diff --git a/docs/source/_static/images/comet-dataflow.excalidraw b/docs/source/_static/images/comet-dataflow.excalidraw
index dd12099834..28c81d40b0 100644
--- a/docs/source/_static/images/comet-dataflow.excalidraw
+++ b/docs/source/_static/images/comet-dataflow.excalidraw
@@ -234,10 +234,6 @@
"type": "text",
"id": "yHFb7s7QYOWZst8xXlFG2"
},
- {
- "id": "Jd5Fqfx6eFl_OJ6x0TUki",
- "type": "arrow"
- },
{
"id": "7KEns52XY_jok50o5G5op",
"type": "arrow"
@@ -387,10 +383,10 @@
{
"id": "twg3z-vK6jWmVl4xySGde",
"type": "text",
- "x": 396.18428047371066,
- "y": 341.33772616179004,
- "width": 203.32500000000002,
- "height": 25,
+ "x": 402.39679115486297,
+ "y": 328.83772616179004,
+ "width": 190.8999786376953,
+ "height": 50,
"angle": 0.003703686768755432,
"strokeColor": "#1e1e1e",
"backgroundColor": "#a5d8ff",
@@ -404,20 +400,20 @@
"index": "b0h",
"roundness": null,
"seed": 34654423,
- "version": 631,
- "versionNonce": 1121311223,
+ "version": 633,
+ "versionNonce": 1471420929,
"isDeleted": false,
"boundElements": [],
- "updated": 1733167372551,
+ "updated": 1781542105529,
"link": null,
"locked": false,
- "text": "CometBatchIterator",
+ "text": "ArrowReader\n(ArrowArrayStream)",
"fontSize": 20,
"fontFamily": 5,
"textAlign": "center",
"verticalAlign": "middle",
"containerId": "Ro2R78aPw-luRF_bB2EKU",
- "originalText": "CometBatchIterator",
+ "originalText": "ArrowReader\n(ArrowArrayStream)",
"autoResize": true,
"lineHeight": 1.25
},
@@ -854,16 +850,20 @@
],
"lastCommittedPoint": null,
"startBinding": {
+ "mode": "orbit",
"elementId": "GHKyE6o_at1-J0KO1mWpt",
- "focus": -0.48947127224675757,
- "gap": 1,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.5155015866677681,
+ 0.5155015866677681
+ ]
},
"endBinding": {
+ "mode": "orbit",
"elementId": "C3-eUJazhRorbXp9Um-Mo",
- "focus": -1.9941089907787155,
- "gap": 12.73052391874603,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.8253968253968254,
+ 1.5092209567498411
+ ]
},
"startArrowhead": null,
"endArrowhead": "arrow",
@@ -910,16 +910,20 @@
],
"lastCommittedPoint": null,
"startBinding": {
+ "mode": "orbit",
"elementId": "aiAipugp154jY5IgHqjTm",
- "focus": 0.1262072643242283,
- "gap": 1,
- "fixedPoint": null
+ "fixedPoint": [
+ 0,
+ 0.5001
+ ]
},
"endBinding": {
+ "mode": "orbit",
"elementId": "GHKyE6o_at1-J0KO1mWpt",
- "focus": 0.37801089214584216,
- "gap": 1,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.6741118143201608,
+ 0.6741118143201603
+ ]
},
"startArrowhead": null,
"endArrowhead": "arrow",
@@ -950,7 +954,7 @@
"version": 139,
"versionNonce": 2098561815,
"isDeleted": false,
- "boundElements": null,
+ "boundElements": [],
"updated": 1733167372551,
"link": null,
"locked": false,
@@ -966,16 +970,20 @@
],
"lastCommittedPoint": null,
"startBinding": {
+ "mode": "orbit",
"elementId": "7VTYHzsqQvUuKMy0ShKZn",
- "focus": 0.261904761904762,
- "gap": 3.5,
- "fixedPoint": null
+ "fixedPoint": [
+ 0,
+ 0.5001
+ ]
},
"endBinding": {
+ "mode": "orbit",
"elementId": "Ro2R78aPw-luRF_bB2EKU",
- "focus": -0.3765315568105985,
- "gap": 2.2509344960505473,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.721060978197207,
+ 0.2789390218027937
+ ]
},
"startArrowhead": null,
"endArrowhead": "arrow",
@@ -1006,7 +1014,7 @@
"version": 131,
"versionNonce": 1606617143,
"isDeleted": false,
- "boundElements": null,
+ "boundElements": [],
"updated": 1733167372551,
"link": null,
"locked": false,
@@ -1022,16 +1030,20 @@
],
"lastCommittedPoint": null,
"startBinding": {
+ "mode": "orbit",
"elementId": "Ro2R78aPw-luRF_bB2EKU",
- "focus": 0.31181090317651905,
- "gap": 1.3694590603334404,
- "fixedPoint": null
+ "fixedPoint": [
+ 1.0000000000000002,
+ 0.5001
+ ]
},
"endBinding": {
+ "mode": "orbit",
"elementId": "7VTYHzsqQvUuKMy0ShKZn",
- "focus": -0.5000000000000002,
- "gap": 1.5,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.22549507027974353,
+ 0.7745049297202583
+ ]
},
"startArrowhead": null,
"endArrowhead": "arrow",
@@ -1062,7 +1074,7 @@
"version": 252,
"versionNonce": 1005623161,
"isDeleted": false,
- "boundElements": null,
+ "boundElements": [],
"updated": 1733167373032,
"link": null,
"locked": false,
@@ -1078,16 +1090,20 @@
],
"lastCommittedPoint": null,
"startBinding": {
+ "mode": "inside",
"elementId": "Ro2R78aPw-luRF_bB2EKU",
- "focus": 0.46419678699387723,
- "gap": 1,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.27180835287146266,
+ 0.9969178948258344
+ ]
},
"endBinding": {
+ "mode": "orbit",
"elementId": "GHKyE6o_at1-J0KO1mWpt",
- "focus": -0.3880655447790852,
- "gap": 2.3015909311958467,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.25764776855868854,
+ 0.2576477685586875
+ ]
},
"startArrowhead": null,
"endArrowhead": "arrow",
@@ -1118,7 +1134,7 @@
"version": 236,
"versionNonce": 55168313,
"isDeleted": false,
- "boundElements": null,
+ "boundElements": [],
"updated": 1733167373032,
"link": null,
"locked": false,
@@ -1134,16 +1150,20 @@
],
"lastCommittedPoint": null,
"startBinding": {
+ "mode": "orbit",
"elementId": "GHKyE6o_at1-J0KO1mWpt",
- "focus": 0.4579838276843583,
- "gap": 2.723810257547825,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.7654127769256563,
+ 0.23458722307434404
+ ]
},
"endBinding": {
+ "mode": "orbit",
"elementId": "Ro2R78aPw-luRF_bB2EKU",
- "focus": -0.43910468547182224,
- "gap": 1,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.754483118562463,
+ 0.7544831185624622
+ ]
},
"startArrowhead": null,
"endArrowhead": "arrow",
@@ -1174,7 +1194,7 @@
"version": 132,
"versionNonce": 1271110743,
"isDeleted": false,
- "boundElements": null,
+ "boundElements": [],
"updated": 1733167393399,
"link": null,
"locked": false,
@@ -1190,16 +1210,20 @@
],
"lastCommittedPoint": null,
"startBinding": {
+ "mode": "orbit",
"elementId": "macb6DKtgx8DhcqjKk6no",
- "focus": -0.4652381310069499,
- "gap": 3.4138894826694752,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.5288290914529353,
+ 0.4711709085470648
+ ]
},
"endBinding": {
+ "mode": "orbit",
"elementId": "OFwuou30qsm3aMZ96ASUO",
- "focus": -1.5667144994299218,
- "gap": 7.5,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.8333333333333334,
+ 1.3
+ ]
},
"startArrowhead": null,
"endArrowhead": "arrow",
@@ -1292,8 +1316,8 @@
{
"id": "2ZoBSXI-amAjEfzxoQ17b",
"type": "text",
- "x": 749.1634633724364,
- "y": 308.8333435058594,
+ "x": 687.6673696224364,
+ "y": 305.5325622558594,
"width": 111,
"height": 25,
"angle": 0,
@@ -1309,20 +1333,20 @@
"index": "b18",
"roundness": null,
"seed": 1238305721,
- "version": 97,
- "versionNonce": 1368434199,
+ "version": 219,
+ "versionNonce": 1166686625,
"isDeleted": false,
"boundElements": [],
- "updated": 1733167188224,
+ "updated": 1781542080581,
"link": null,
"locked": false,
- "text": "next()",
+ "text": "get_next()",
"fontSize": 20,
"fontFamily": 5,
"textAlign": "left",
"verticalAlign": "top",
"containerId": null,
- "originalText": "next()",
+ "originalText": "get_next()",
"autoResize": false,
"lineHeight": 1.25
},
@@ -1366,8 +1390,8 @@
{
"id": "VFm7kotI1oNa1rIxLMh6W",
"type": "text",
- "x": 676.6634633724364,
- "y": 376.3333435058594,
+ "x": 694.2806508724364,
+ "y": 370.1302185058594,
"width": 147,
"height": 25,
"angle": 0,
@@ -1383,20 +1407,20 @@
"index": "b1A",
"roundness": null,
"seed": 1623222905,
- "version": 76,
- "versionNonce": 1030050969,
+ "version": 161,
+ "versionNonce": 189588193,
"isDeleted": false,
- "boundElements": null,
- "updated": 1733167271120,
+ "boundElements": [],
+ "updated": 1781542085214,
"link": null,
"locked": false,
- "text": "exportBatch()",
+ "text": "batch",
"fontSize": 20,
"fontFamily": 5,
"textAlign": "left",
"verticalAlign": "top",
"containerId": null,
- "originalText": "exportBatch()",
+ "originalText": "batch",
"autoResize": false,
"lineHeight": 1.25
},
@@ -1423,7 +1447,7 @@
"version": 127,
"versionNonce": 1917573399,
"isDeleted": false,
- "boundElements": null,
+ "boundElements": [],
"updated": 1733167377483,
"link": null,
"locked": false,
@@ -1462,7 +1486,7 @@
"version": 35,
"versionNonce": 652639415,
"isDeleted": false,
- "boundElements": null,
+ "boundElements": [],
"updated": 1733167385065,
"link": null,
"locked": false,
@@ -1478,16 +1502,20 @@
],
"lastCommittedPoint": null,
"startBinding": {
+ "mode": "inside",
"elementId": "lSUrwgLq2W49ULouPfm0h",
- "focus": -0.2114558118557865,
- "gap": 1,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.002027658615173893,
+ 0.6130952380952382
+ ]
},
"endBinding": {
+ "mode": "orbit",
"elementId": "macb6DKtgx8DhcqjKk6no",
- "focus": 0.3654122251883526,
- "gap": 1,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.7030751689956286,
+ 0.7030751689956288
+ ]
},
"startArrowhead": null,
"endArrowhead": "arrow",
@@ -1555,7 +1583,7 @@
"version": 17,
"versionNonce": 1070980535,
"isDeleted": false,
- "boundElements": null,
+ "boundElements": [],
"updated": 1733167413149,
"link": null,
"locked": false,
@@ -1571,16 +1599,20 @@
],
"lastCommittedPoint": null,
"startBinding": {
+ "mode": "orbit",
"elementId": "lSUrwgLq2W49ULouPfm0h",
- "focus": 0.4183574316825765,
- "gap": 11.500000000000014,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.22193792575017277,
+ 0.7780620742498273
+ ]
},
"endBinding": {
+ "mode": "orbit",
"elementId": "7VTYHzsqQvUuKMy0ShKZn",
- "focus": -0.37462537462537454,
- "gap": 8,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.23385173574258802,
+ 0.23385173574258805
+ ]
},
"startArrowhead": null,
"endArrowhead": "arrow",
@@ -1611,7 +1643,7 @@
"version": 15,
"versionNonce": 1869550297,
"isDeleted": false,
- "boundElements": null,
+ "boundElements": [],
"updated": 1733167417649,
"link": null,
"locked": false,
@@ -1627,16 +1659,20 @@
],
"lastCommittedPoint": null,
"startBinding": {
+ "mode": "orbit",
"elementId": "7VTYHzsqQvUuKMy0ShKZn",
- "focus": 0.4912563895614742,
- "gap": 14,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.8306354865497666,
+ 0.169364513450234
+ ]
},
"endBinding": {
+ "mode": "orbit",
"elementId": "lSUrwgLq2W49ULouPfm0h",
- "focus": -0.4789893168742339,
- "gap": 7.500000000000014,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.8128526092800505,
+ 0.8128526092800512
+ ]
},
"startArrowhead": null,
"endArrowhead": "arrow",
@@ -1831,7 +1867,7 @@
"version": 108,
"versionNonce": 91341817,
"isDeleted": false,
- "boundElements": null,
+ "boundElements": [],
"updated": 1733167501667,
"link": null,
"locked": false,
@@ -2092,7 +2128,7 @@
"version": 40,
"versionNonce": 1347291095,
"isDeleted": false,
- "boundElements": null,
+ "boundElements": [],
"updated": 1733167585167,
"link": null,
"locked": false,
@@ -2108,16 +2144,20 @@
],
"lastCommittedPoint": null,
"startBinding": {
+ "mode": "orbit",
"elementId": "lSUrwgLq2W49ULouPfm0h",
- "focus": 0.04618975520292551,
- "gap": 1.496776360717604,
- "fixedPoint": null
+ "fixedPoint": [
+ 0.9999999999999998,
+ 0.5001
+ ]
},
"endBinding": {
+ "mode": "orbit",
"elementId": "xdQ0w3-b5BGEpSvQ2Uc8A",
- "focus": -0.13841786234942072,
- "gap": 1,
- "fixedPoint": null
+ "fixedPoint": [
+ 0,
+ 0.5001
+ ]
},
"startArrowhead": null,
"endArrowhead": "arrow",
@@ -2128,7 +2168,8 @@
"gridSize": 20,
"gridStep": 5,
"gridModeEnabled": false,
- "viewBackgroundColor": "#ffffff"
+ "viewBackgroundColor": "#ffffff",
+ "lockedMultiSelections": {}
},
"files": {}
}
\ No newline at end of file
diff --git a/docs/source/_static/images/comet-dataflow.svg b/docs/source/_static/images/comet-dataflow.svg
index 20a573c1fa..2fc8e186a1 100644
--- a/docs/source/_static/images/comet-dataflow.svg
+++ b/docs/source/_static/images/comet-dataflow.svg
@@ -1,10 +1,4 @@
-
\ No newline at end of file
+
+
+
\ No newline at end of file
diff --git a/docs/source/contributor-guide/ffi.md b/docs/source/contributor-guide/ffi.md
index 24e6843ba1..f45fb9beaf 100644
--- a/docs/source/contributor-guide/ffi.md
+++ b/docs/source/contributor-guide/ffi.md
@@ -21,10 +21,14 @@ under the License.
## Overview
-Comet uses the [Arrow C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) for zero-copy data transfer in two directions:
+Comet transfers Arrow data across the JVM/native boundary in two directions:
-1. **JVM → Native**: Native code pulls batches from JVM using `CometBatchIterator`
-2. **Native → JVM**: JVM pulls batches from native code using `CometExecIterator`
+1. **JVM → Native**: Native code pulls batches from the JVM over the
+ [Arrow C Stream Interface](https://arrow.apache.org/docs/format/CStreamInterface.html). The JVM exports each
+ per-partition iterator once as an `ArrowArrayStream`, and native pulls every batch through a single C callback.
+2. **Native → JVM**: The JVM pulls batches from native code using `CometExecIterator`, via the
+ [Arrow C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) (one `ArrowArray`/`ArrowSchema`
+ pair per batch).
The following diagram shows an example of the end-to-end flow for a query stage.
@@ -39,6 +43,11 @@ The Arrow C Data Interface defines two C structures:
- `ArrowArray`: Contains pointers to data buffers and metadata
- `ArrowSchema`: Contains type information
+The Arrow C Stream Interface builds on these with a third structure:
+
+- `ArrowArrayStream`: A stream of `ArrowArray`s sharing one `ArrowSchema`, pulled one at a time through a
+ `get_next` C callback. This is how Comet transfers JVM-sourced input (see below).
+
### Key Characteristics
- **Zero-copy**: Data buffers can be shared across language boundaries without copying
@@ -49,24 +58,27 @@ The Arrow C Data Interface defines two C structures:
### Architecture
-When native code needs data from the JVM, it uses `ScanExec` which calls into `CometBatchIterator`:
+When native code needs data from the JVM, it uses `ScanExec`, which is backed by an Arrow C Stream that the JVM
+exports once per partition:
```
┌─────────────────┐
│ Spark/Scala │
-│ CometExecIter │
+│ Iterator of │
+│ batches or rows │
└────────┬────────┘
- │ produces batches
+ │ wrapped in an ArrowReader, exported once
+ │ via Data.exportArrayStream
▼
┌─────────────────┐
-│ CometBatchIter │ ◄─── JNI call from native
-│ (JVM side) │
+│ ArrowArrayStream│ ── JVM side: one C stream struct per partition
+│ (C struct) │
└────────┬────────┘
- │ Arrow FFI
- │ (transfers ArrowArray/ArrowSchema pointers)
+ │ Arrow C Stream Interface
+ │ (native pulls each batch via the get_next callback)
▼
┌─────────────────┐
-│ ScanExec │
+│ ScanExec │ ── owns an AlignedArrowStreamReader
│ (Rust/native) │
└────────┬────────┘
│
@@ -77,32 +89,38 @@ When native code needs data from the JVM, it uses `ScanExec` which calls into `C
└─────────────────┘
```
-### FFI Transfer Process
+### Stream Export and Import
-The data transfer happens in `ScanExec::get_next()`:
+On the JVM side, `CometArrowStream` (in `execution/arrow/CometNativeArrowSource.scala`) wraps each per-partition
+input in an `org.apache.arrow.vector.ipc.ArrowReader` and exports it once with `Data.exportArrayStream`. The reader
+implementation depends on the source of the data:
-```rust
-// 1. Allocate FFI structures on native side (Rust heap)
-for _ in 0..num_cols {
- let arrow_array = Rc::new(FFI_ArrowArray::empty());
- let arrow_schema = Rc::new(FFI_ArrowSchema::empty());
- let array_ptr = Rc::into_raw(arrow_array) as i64;
- let schema_ptr = Rc::into_raw(arrow_schema) as i64;
- // Store pointers...
-}
+- `RowArrowReader`: a Spark `Iterator[InternalRow]` (row input)
+- `SparkColumnarArrowReader`: a non-Arrow Spark `ColumnarBatch`
+- `ColumnarBatchArrowReader`: an Arrow-backed `ColumnarBatch` (transfers `VectorSchemaRoot` ownership)
-// 2. Call JVM to populate FFI structures
-let num_rows: i32 = unsafe {
- jni_call!(env, comet_batch_iterator(iter).next(array_obj, schema_obj) -> i32)?
-};
+The exported `ArrowArrayStream`s are boxed into the `Array[Object]` that `CometExecIterator` / `CometExecRDD` pass
+to native `createPlan` (one slot per scan input; shuffle inputs pass a `CometShuffleBlockIterator` instead).
-// 3. Import data from FFI structures
-for i in 0..num_cols {
- let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?;
- let array = make_array(array_data);
- // ... process array
-}
-```
+On the native side, `planner.rs` reads each stream's `memoryAddress` and takes ownership through
+`AlignedArrowStreamReader::from_raw`, importing the schema once. `ScanExec::get_next_batch` then pulls each batch
+through the stream's `get_next` callback. There is no per-batch JNI call and no per-column FFI export.
+
+### Buffer Alignment (AlignedArrowStreamReader)
+
+`AlignedArrowStreamReader` (in `execution/operators/aligned_stream_reader.rs`) wraps the imported stream and calls
+`align_buffers` on every batch before constructing typed arrays. This works around the fact that Java's allocator
+hands back `Decimal128` buffers at 8-byte (not 16-byte) alignment, which the stock `ArrowArrayStreamReader` rejects
+([apache/arrow-rs#10028](https://github.com/apache/arrow-rs/issues/10028)). The fix
+([apache/arrow-rs#10030](https://github.com/apache/arrow-rs/pull/10030)) makes import align internally and ships in
+arrow 59.0.0; once Comet is on arrow >= 59, this reader can be replaced with the stock `ArrowArrayStreamReader`.
+
+### Schema Reconciliation
+
+`CometArrowStream.reconcileStreamSchema` advertises the stream's schema from the actual `CometVector` types in the
+first batch rather than the consumer's Spark-declared types. Native `ScanExec` already casts its input to the
+declared scan-input schema in `build_record_batch`, so the truthful first-batch schema lets that cast fire. If the
+advertised and declared schemas differ, one deduplicated warning is logged naming the operator, column, and type drift.
### Memory Layout
@@ -126,68 +144,19 @@ Off-heap Memory: │
└──────────────────┘
```
-**Key Point**: The actual data buffers can be off-heap, but the `ArrowArray` and `ArrowSchema` wrapper objects are **always allocated on the JVM heap**.
-
-### Wrapper Object Lifecycle
-
-When arrays are created in the JVM and passed to native code, the JVM creates the array data off-heap and creates
-wrapper objects `ArrowArray` and `ArrowSchema` on-heap. These wrapper objects can consume significant memory over
-time.
-
-```
-Per batch overhead on JVM heap:
-- ArrowArray object: ~100 bytes
-- ArrowSchema object: ~100 bytes
-- Per column: ~200 bytes
-- 100 columns × 1000 batches = ~20 MB of wrapper objects
-```
-
-When native code pulls batches from the JVM, the JVM wrapper objects are kept alive until the native code drops
-all references to the arrays.
-
-When operators such as `SortExec` fetch many batches and buffer them in native code, the number of wrapper objects
-in Java on-heap memory keeps growing until the batches are released in native code at the end of the sort operation.
-
-### Ownership Transfer
-
-The Arrow C data interface supports ownership transfer by registering callbacks in the C struct that is passed over
-the JNI boundary for the function to delete the array data. For example, the `ArrowArray` struct has:
-
-```c
-// Release callback
-void (*release)(struct ArrowArray*);
-```
-
-Comet currently does not always follow best practice around ownership transfer because there are some cases where
-Comet JVM code will retain references to arrays after passing them to native code and may mutate the underlying
-buffers. There is an `arrow_ffi_safe` flag in the protocol buffer definition of `Scan` that indicates whether
-ownership is being transferred according to the Arrow C data interface specification.
-
-```protobuf
-message Scan {
- repeated spark.spark_expression.DataType fields = 1;
- // The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This
- // is purely for informational purposes when viewing native query plans in
- // debug mode.
- string source = 2;
- // Whether native code can assume ownership of batches that it receives
- bool arrow_ffi_safe = 3;
-}
-```
-
-#### When ownership is NOT transferred to native:
-
-If the data originates from a scan that uses mutable buffers
-then ownership is not transferred to native and the JVM may re-use the underlying buffers in the future.
+**Key Point**: The actual data buffers are shared zero-copy; native only takes pointers to the off-heap buffers.
-It is critical that the native code performs a deep copy of the arrays if the arrays are to be buffered by
-operators such as `SortExec` or `ShuffleWriterExec`, otherwise data corruption is likely to occur.
+### Ownership and Lifecycle
-#### When ownership IS transferred to native:
+The Arrow C Stream Interface transfers ownership by reference count: native takes ownership of each imported batch,
+so it is safe to buffer batches in operators such as `SortExec` or `ShuffleWriterExec` without a deep copy.
-When ownership is transferred, it is safe to buffer batches in native. However, JVM wrapper objects will not be
-released until the native batches are dropped. This can lead to OOM or GC pressure if there is not enough Java
-heap memory configured.
+The whole per-partition stream is exported once, so the JVM allocates one `ArrowArrayStream` per partition rather
+than a per-batch, per-column `ArrowArray`/`ArrowSchema` wrapper object pair. Lifecycle is anchored at the stream: when
+`ScanExec` drops its `AlignedArrowStreamReader`, the stream's release callback fires synchronously back into the JVM
+and closes the `ArrowReader` and its `VectorSchemaRoot`, releasing the off-heap buffers. Because native holds those
+buffers until the reader drops, an operator that buffers many batches keeps the corresponding JVM-side data alive
+until then.
## Native → JVM Data Flow (CometExecIterator)
@@ -342,10 +311,9 @@ arrowBuf.close(); // → calls native release_batch()
### JVM → Native
-| Scenario | `arrow_ffi_safe` | Ownership | Action Required |
-| ------------------ | ---------------- | ----------- | -------------------------------------- |
-| Temporary scan | `false` | JVM keeps | **Must deep copy** to avoid corruption |
-| Ownership transfer | `true` | Native owns | Copy only to unpack dictionaries |
+| Scenario | Ownership | Action Required |
+| --------- | ----------- | -------------------------------------------------------------------------------------------------------------------------------------------- |
+| All cases | Native owns | None; the C Stream transfers ownership by reference count (copy only to unpack dictionaries). Dropping the reader releases the JVM-side data |
### Native → JVM
@@ -356,5 +324,6 @@ arrowBuf.close(); // → calls native release_batch()
## Further Reading
- [Arrow C Data Interface Specification](https://arrow.apache.org/docs/format/CDataInterface.html)
+- [Arrow C Stream Interface Specification](https://arrow.apache.org/docs/format/CStreamInterface.html)
- [Arrow Java FFI Implementation](https://github.com/apache/arrow/tree/main/java/c)
- [Arrow Rust FFI Implementation](https://docs.rs/arrow/latest/arrow/ffi/)
diff --git a/docs/source/contributor-guide/plugin_overview.md b/docs/source/contributor-guide/plugin_overview.md
index 444c9ccb60..c3330a65d6 100644
--- a/docs/source/contributor-guide/plugin_overview.md
+++ b/docs/source/contributor-guide/plugin_overview.md
@@ -96,9 +96,10 @@ In the native code there is a `PhysicalPlanner` struct (in `planner.rs`) which c
Apache DataFusion `ExecutionPlan`. In some cases, Comet provides specialized physical operators and expressions to
override the DataFusion versions to ensure compatibility with Apache Spark.
-The leaf nodes in the physical plan are always `ScanExec` and each of these operators will make a JNI call to
-`CometBatchIterator.next()` to fetch the next input batch. The input could be a Comet native Parquet scan,
-a Spark exchange, or another native plan.
+The leaf nodes in the physical plan are always `ScanExec`. Each JVM-sourced input is exported once as an
+[Arrow C Stream](https://arrow.apache.org/docs/format/CStreamInterface.html) (`org.apache.arrow.c.ArrowArrayStream`),
+and `ScanExec` pulls each input batch through a single C callback rather than making a JNI call per batch. The
+input could be a Comet native Parquet scan, a Spark exchange, or another native plan.
`CometNativeExec` creates a `CometExecIterator` and applies this iterator to the input RDD
partitions. Each call to `CometExecIterator.next()` will invoke `Native.executePlan`. Once the plan finishes
diff --git a/docs/source/contributor-guide/profiling.md b/docs/source/contributor-guide/profiling.md
index 67729a235e..1c4faa5a86 100644
--- a/docs/source/contributor-guide/profiling.md
+++ b/docs/source/contributor-guide/profiling.md
@@ -276,8 +276,8 @@ objects are created during data transfer between JVM and native code:
$ASYNC_PROFILER_HOME/bin/asprof -e alloc -d 60 -f alloc-profile.html
```
-Look for allocations in `CometExecIterator`, `CometBatchIterator`, and Arrow vector
-classes.
+Look for allocations in `CometExecIterator` (native-to-JVM output), `CometArrowStream` (JVM-to-native
+input export), and Arrow vector classes.
### Isolate Rust-only performance issues