diff --git a/docs/source/_static/images/comet-dataflow.excalidraw b/docs/source/_static/images/comet-dataflow.excalidraw index dd12099834..28c81d40b0 100644 --- a/docs/source/_static/images/comet-dataflow.excalidraw +++ b/docs/source/_static/images/comet-dataflow.excalidraw @@ -234,10 +234,6 @@ "type": "text", "id": "yHFb7s7QYOWZst8xXlFG2" }, - { - "id": "Jd5Fqfx6eFl_OJ6x0TUki", - "type": "arrow" - }, { "id": "7KEns52XY_jok50o5G5op", "type": "arrow" @@ -387,10 +383,10 @@ { "id": "twg3z-vK6jWmVl4xySGde", "type": "text", - "x": 396.18428047371066, - "y": 341.33772616179004, - "width": 203.32500000000002, - "height": 25, + "x": 402.39679115486297, + "y": 328.83772616179004, + "width": 190.8999786376953, + "height": 50, "angle": 0.003703686768755432, "strokeColor": "#1e1e1e", "backgroundColor": "#a5d8ff", @@ -404,20 +400,20 @@ "index": "b0h", "roundness": null, "seed": 34654423, - "version": 631, - "versionNonce": 1121311223, + "version": 633, + "versionNonce": 1471420929, "isDeleted": false, "boundElements": [], - "updated": 1733167372551, + "updated": 1781542105529, "link": null, "locked": false, - "text": "CometBatchIterator", + "text": "ArrowReader\n(ArrowArrayStream)", "fontSize": 20, "fontFamily": 5, "textAlign": "center", "verticalAlign": "middle", "containerId": "Ro2R78aPw-luRF_bB2EKU", - "originalText": "CometBatchIterator", + "originalText": "ArrowReader\n(ArrowArrayStream)", "autoResize": true, "lineHeight": 1.25 }, @@ -854,16 +850,20 @@ ], "lastCommittedPoint": null, "startBinding": { + "mode": "orbit", "elementId": "GHKyE6o_at1-J0KO1mWpt", - "focus": -0.48947127224675757, - "gap": 1, - "fixedPoint": null + "fixedPoint": [ + 0.5155015866677681, + 0.5155015866677681 + ] }, "endBinding": { + "mode": "orbit", "elementId": "C3-eUJazhRorbXp9Um-Mo", - "focus": -1.9941089907787155, - "gap": 12.73052391874603, - "fixedPoint": null + "fixedPoint": [ + 0.8253968253968254, + 1.5092209567498411 + ] }, "startArrowhead": null, "endArrowhead": "arrow", @@ -910,16 +910,20 @@ ], "lastCommittedPoint": null, "startBinding": { + "mode": "orbit", "elementId": "aiAipugp154jY5IgHqjTm", - "focus": 0.1262072643242283, - "gap": 1, - "fixedPoint": null + "fixedPoint": [ + 0, + 0.5001 + ] }, "endBinding": { + "mode": "orbit", "elementId": "GHKyE6o_at1-J0KO1mWpt", - "focus": 0.37801089214584216, - "gap": 1, - "fixedPoint": null + "fixedPoint": [ + 0.6741118143201608, + 0.6741118143201603 + ] }, "startArrowhead": null, "endArrowhead": "arrow", @@ -950,7 +954,7 @@ "version": 139, "versionNonce": 2098561815, "isDeleted": false, - "boundElements": null, + "boundElements": [], "updated": 1733167372551, "link": null, "locked": false, @@ -966,16 +970,20 @@ ], "lastCommittedPoint": null, "startBinding": { + "mode": "orbit", "elementId": "7VTYHzsqQvUuKMy0ShKZn", - "focus": 0.261904761904762, - "gap": 3.5, - "fixedPoint": null + "fixedPoint": [ + 0, + 0.5001 + ] }, "endBinding": { + "mode": "orbit", "elementId": "Ro2R78aPw-luRF_bB2EKU", - "focus": -0.3765315568105985, - "gap": 2.2509344960505473, - "fixedPoint": null + "fixedPoint": [ + 0.721060978197207, + 0.2789390218027937 + ] }, "startArrowhead": null, "endArrowhead": "arrow", @@ -1006,7 +1014,7 @@ "version": 131, "versionNonce": 1606617143, "isDeleted": false, - "boundElements": null, + "boundElements": [], "updated": 1733167372551, "link": null, "locked": false, @@ -1022,16 +1030,20 @@ ], "lastCommittedPoint": null, "startBinding": { + "mode": "orbit", "elementId": "Ro2R78aPw-luRF_bB2EKU", - "focus": 0.31181090317651905, - "gap": 1.3694590603334404, - "fixedPoint": null + "fixedPoint": [ + 1.0000000000000002, + 0.5001 + ] }, "endBinding": { + "mode": "orbit", "elementId": "7VTYHzsqQvUuKMy0ShKZn", - "focus": -0.5000000000000002, - "gap": 1.5, - "fixedPoint": null + "fixedPoint": [ + 0.22549507027974353, + 0.7745049297202583 + ] }, "startArrowhead": null, "endArrowhead": "arrow", @@ -1062,7 +1074,7 @@ "version": 252, "versionNonce": 1005623161, "isDeleted": false, - "boundElements": null, + "boundElements": [], "updated": 1733167373032, "link": null, "locked": false, @@ -1078,16 +1090,20 @@ ], "lastCommittedPoint": null, "startBinding": { + "mode": "inside", "elementId": "Ro2R78aPw-luRF_bB2EKU", - "focus": 0.46419678699387723, - "gap": 1, - "fixedPoint": null + "fixedPoint": [ + 0.27180835287146266, + 0.9969178948258344 + ] }, "endBinding": { + "mode": "orbit", "elementId": "GHKyE6o_at1-J0KO1mWpt", - "focus": -0.3880655447790852, - "gap": 2.3015909311958467, - "fixedPoint": null + "fixedPoint": [ + 0.25764776855868854, + 0.2576477685586875 + ] }, "startArrowhead": null, "endArrowhead": "arrow", @@ -1118,7 +1134,7 @@ "version": 236, "versionNonce": 55168313, "isDeleted": false, - "boundElements": null, + "boundElements": [], "updated": 1733167373032, "link": null, "locked": false, @@ -1134,16 +1150,20 @@ ], "lastCommittedPoint": null, "startBinding": { + "mode": "orbit", "elementId": "GHKyE6o_at1-J0KO1mWpt", - "focus": 0.4579838276843583, - "gap": 2.723810257547825, - "fixedPoint": null + "fixedPoint": [ + 0.7654127769256563, + 0.23458722307434404 + ] }, "endBinding": { + "mode": "orbit", "elementId": "Ro2R78aPw-luRF_bB2EKU", - "focus": -0.43910468547182224, - "gap": 1, - "fixedPoint": null + "fixedPoint": [ + 0.754483118562463, + 0.7544831185624622 + ] }, "startArrowhead": null, "endArrowhead": "arrow", @@ -1174,7 +1194,7 @@ "version": 132, "versionNonce": 1271110743, "isDeleted": false, - "boundElements": null, + "boundElements": [], "updated": 1733167393399, "link": null, "locked": false, @@ -1190,16 +1210,20 @@ ], "lastCommittedPoint": null, "startBinding": { + "mode": "orbit", "elementId": "macb6DKtgx8DhcqjKk6no", - "focus": -0.4652381310069499, - "gap": 3.4138894826694752, - "fixedPoint": null + "fixedPoint": [ + 0.5288290914529353, + 0.4711709085470648 + ] }, "endBinding": { + "mode": "orbit", "elementId": "OFwuou30qsm3aMZ96ASUO", - "focus": -1.5667144994299218, - "gap": 7.5, - "fixedPoint": null + "fixedPoint": [ + 0.8333333333333334, + 1.3 + ] }, "startArrowhead": null, "endArrowhead": "arrow", @@ -1292,8 +1316,8 @@ { "id": "2ZoBSXI-amAjEfzxoQ17b", "type": "text", - "x": 749.1634633724364, - "y": 308.8333435058594, + "x": 687.6673696224364, + "y": 305.5325622558594, "width": 111, "height": 25, "angle": 0, @@ -1309,20 +1333,20 @@ "index": "b18", "roundness": null, "seed": 1238305721, - "version": 97, - "versionNonce": 1368434199, + "version": 219, + "versionNonce": 1166686625, "isDeleted": false, "boundElements": [], - "updated": 1733167188224, + "updated": 1781542080581, "link": null, "locked": false, - "text": "next()", + "text": "get_next()", "fontSize": 20, "fontFamily": 5, "textAlign": "left", "verticalAlign": "top", "containerId": null, - "originalText": "next()", + "originalText": "get_next()", "autoResize": false, "lineHeight": 1.25 }, @@ -1366,8 +1390,8 @@ { "id": "VFm7kotI1oNa1rIxLMh6W", "type": "text", - "x": 676.6634633724364, - "y": 376.3333435058594, + "x": 694.2806508724364, + "y": 370.1302185058594, "width": 147, "height": 25, "angle": 0, @@ -1383,20 +1407,20 @@ "index": "b1A", "roundness": null, "seed": 1623222905, - "version": 76, - "versionNonce": 1030050969, + "version": 161, + "versionNonce": 189588193, "isDeleted": false, - "boundElements": null, - "updated": 1733167271120, + "boundElements": [], + "updated": 1781542085214, "link": null, "locked": false, - "text": "exportBatch()", + "text": "batch", "fontSize": 20, "fontFamily": 5, "textAlign": "left", "verticalAlign": "top", "containerId": null, - "originalText": "exportBatch()", + "originalText": "batch", "autoResize": false, "lineHeight": 1.25 }, @@ -1423,7 +1447,7 @@ "version": 127, "versionNonce": 1917573399, "isDeleted": false, - "boundElements": null, + "boundElements": [], "updated": 1733167377483, "link": null, "locked": false, @@ -1462,7 +1486,7 @@ "version": 35, "versionNonce": 652639415, "isDeleted": false, - "boundElements": null, + "boundElements": [], "updated": 1733167385065, "link": null, "locked": false, @@ -1478,16 +1502,20 @@ ], "lastCommittedPoint": null, "startBinding": { + "mode": "inside", "elementId": "lSUrwgLq2W49ULouPfm0h", - "focus": -0.2114558118557865, - "gap": 1, - "fixedPoint": null + "fixedPoint": [ + 0.002027658615173893, + 0.6130952380952382 + ] }, "endBinding": { + "mode": "orbit", "elementId": "macb6DKtgx8DhcqjKk6no", - "focus": 0.3654122251883526, - "gap": 1, - "fixedPoint": null + "fixedPoint": [ + 0.7030751689956286, + 0.7030751689956288 + ] }, "startArrowhead": null, "endArrowhead": "arrow", @@ -1555,7 +1583,7 @@ "version": 17, "versionNonce": 1070980535, "isDeleted": false, - "boundElements": null, + "boundElements": [], "updated": 1733167413149, "link": null, "locked": false, @@ -1571,16 +1599,20 @@ ], "lastCommittedPoint": null, "startBinding": { + "mode": "orbit", "elementId": "lSUrwgLq2W49ULouPfm0h", - "focus": 0.4183574316825765, - "gap": 11.500000000000014, - "fixedPoint": null + "fixedPoint": [ + 0.22193792575017277, + 0.7780620742498273 + ] }, "endBinding": { + "mode": "orbit", "elementId": "7VTYHzsqQvUuKMy0ShKZn", - "focus": -0.37462537462537454, - "gap": 8, - "fixedPoint": null + "fixedPoint": [ + 0.23385173574258802, + 0.23385173574258805 + ] }, "startArrowhead": null, "endArrowhead": "arrow", @@ -1611,7 +1643,7 @@ "version": 15, "versionNonce": 1869550297, "isDeleted": false, - "boundElements": null, + "boundElements": [], "updated": 1733167417649, "link": null, "locked": false, @@ -1627,16 +1659,20 @@ ], "lastCommittedPoint": null, "startBinding": { + "mode": "orbit", "elementId": "7VTYHzsqQvUuKMy0ShKZn", - "focus": 0.4912563895614742, - "gap": 14, - "fixedPoint": null + "fixedPoint": [ + 0.8306354865497666, + 0.169364513450234 + ] }, "endBinding": { + "mode": "orbit", "elementId": "lSUrwgLq2W49ULouPfm0h", - "focus": -0.4789893168742339, - "gap": 7.500000000000014, - "fixedPoint": null + "fixedPoint": [ + 0.8128526092800505, + 0.8128526092800512 + ] }, "startArrowhead": null, "endArrowhead": "arrow", @@ -1831,7 +1867,7 @@ "version": 108, "versionNonce": 91341817, "isDeleted": false, - "boundElements": null, + "boundElements": [], "updated": 1733167501667, "link": null, "locked": false, @@ -2092,7 +2128,7 @@ "version": 40, "versionNonce": 1347291095, "isDeleted": false, - "boundElements": null, + "boundElements": [], "updated": 1733167585167, "link": null, "locked": false, @@ -2108,16 +2144,20 @@ ], "lastCommittedPoint": null, "startBinding": { + "mode": "orbit", "elementId": "lSUrwgLq2W49ULouPfm0h", - "focus": 0.04618975520292551, - "gap": 1.496776360717604, - "fixedPoint": null + "fixedPoint": [ + 0.9999999999999998, + 0.5001 + ] }, "endBinding": { + "mode": "orbit", "elementId": "xdQ0w3-b5BGEpSvQ2Uc8A", - "focus": -0.13841786234942072, - "gap": 1, - "fixedPoint": null + "fixedPoint": [ + 0, + 0.5001 + ] }, "startArrowhead": null, "endArrowhead": "arrow", @@ -2128,7 +2168,8 @@ "gridSize": 20, "gridStep": 5, "gridModeEnabled": false, - "viewBackgroundColor": "#ffffff" + "viewBackgroundColor": "#ffffff", + "lockedMultiSelections": {} }, "files": {} } \ No newline at end of file diff --git a/docs/source/_static/images/comet-dataflow.svg b/docs/source/_static/images/comet-dataflow.svg index 20a573c1fa..2fc8e186a1 100644 --- a/docs/source/_static/images/comet-dataflow.svg +++ b/docs/source/_static/images/comet-dataflow.svg @@ -1,10 +1,4 @@ - - - - - - - - JVMNativeShuffleWriterExecCometExecIteratorNative PlanCometBatchIteratorCometExecIteratorScanExecProjectExecNative Plan...executePlan()executePlan()next()next()exportBatch()importVectors()importVectors()next()next()batchbatchbatchShuffle FilesArrow IPC Batch \ No newline at end of file + + +JVMNativeShuffleWriterExecCometExecIteratorNative PlanArrowReader(ArrowArrayStream)CometExecIteratorScanExecProjectExecNative Plan...executePlan()executePlan()get_next()next()batchimportVectors()importVectors()next()next()batchbatchbatchShuffle FilesArrow IPC Batch \ No newline at end of file diff --git a/docs/source/contributor-guide/ffi.md b/docs/source/contributor-guide/ffi.md index 24e6843ba1..f45fb9beaf 100644 --- a/docs/source/contributor-guide/ffi.md +++ b/docs/source/contributor-guide/ffi.md @@ -21,10 +21,14 @@ under the License. ## Overview -Comet uses the [Arrow C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) for zero-copy data transfer in two directions: +Comet transfers Arrow data across the JVM/native boundary in two directions: -1. **JVM → Native**: Native code pulls batches from JVM using `CometBatchIterator` -2. **Native → JVM**: JVM pulls batches from native code using `CometExecIterator` +1. **JVM → Native**: Native code pulls batches from the JVM over the + [Arrow C Stream Interface](https://arrow.apache.org/docs/format/CStreamInterface.html). The JVM exports each + per-partition iterator once as an `ArrowArrayStream`, and native pulls every batch through a single C callback. +2. **Native → JVM**: JVM pulls batches from native code using `CometExecIterator`, via the + [Arrow C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) (one `ArrowArray`/`ArrowSchema` + pair per batch). The following diagram shows an example of the end-to-end flow for a query stage. @@ -39,6 +43,11 @@ The Arrow C Data Interface defines two C structures: - `ArrowArray`: Contains pointers to data buffers and metadata - `ArrowSchema`: Contains type information +The Arrow C Stream Interface builds on these with a third structure: + +- `ArrowArrayStream`: A stream of `ArrowArray`s sharing one `ArrowSchema`, pulled one at a time through a + `get_next` C callback. This is how Comet transfers JVM-sourced input (see below). + ### Key Characteristics - **Zero-copy**: Data buffers can be shared across language boundaries without copying @@ -49,24 +58,27 @@ The Arrow C Data Interface defines two C structures: ### Architecture -When native code needs data from the JVM, it uses `ScanExec` which calls into `CometBatchIterator`: +When native code needs data from the JVM, it uses `ScanExec`, which is backed by an Arrow C Stream that the JVM +exports once per partition: ``` ┌─────────────────┐ │ Spark/Scala │ -│ CometExecIter │ +│ Iterator of │ +│ batches or rows │ └────────┬────────┘ - │ produces batches + │ wrapped in an ArrowReader, exported once + │ via Data.exportArrayStream ▼ ┌─────────────────┐ -│ CometBatchIter │ ◄─── JNI call from native -│ (JVM side) │ +│ ArrowArrayStream│ ── JVM side: one C stream struct per partition +│ (C struct) │ └────────┬────────┘ - │ Arrow FFI - │ (transfers ArrowArray/ArrowSchema pointers) + │ Arrow C Stream Interface + │ (native pulls each batch via the get_next callback) ▼ ┌─────────────────┐ -│ ScanExec │ +│ ScanExec │ ── owns an AlignedArrowStreamReader │ (Rust/native) │ └────────┬────────┘ │ @@ -77,32 +89,38 @@ When native code needs data from the JVM, it uses `ScanExec` which calls into `C └─────────────────┘ ``` -### FFI Transfer Process +### Stream Export and Import -The data transfer happens in `ScanExec::get_next()`: +On the JVM side, `CometArrowStream` (in `execution/arrow/CometNativeArrowSource.scala`) wraps each per-partition +input in an `org.apache.arrow.vector.ipc.ArrowReader` and exports it once with `Data.exportArrayStream`. The reader +implementation depends on the source of the data: -```rust -// 1. Allocate FFI structures on native side (Rust heap) -for _ in 0..num_cols { - let arrow_array = Rc::new(FFI_ArrowArray::empty()); - let arrow_schema = Rc::new(FFI_ArrowSchema::empty()); - let array_ptr = Rc::into_raw(arrow_array) as i64; - let schema_ptr = Rc::into_raw(arrow_schema) as i64; - // Store pointers... -} +- `RowArrowReader`: a Spark `Iterator[InternalRow]` (row input) +- `SparkColumnarArrowReader`: a non-Arrow Spark `ColumnarBatch` +- `ColumnarBatchArrowReader`: an Arrow-backed `ColumnarBatch` (transfers `VectorSchemaRoot` ownership) -// 2. Call JVM to populate FFI structures -let num_rows: i32 = unsafe { - jni_call!(env, comet_batch_iterator(iter).next(array_obj, schema_obj) -> i32)? -}; +The exported `ArrowArrayStream`s are boxed into the `Array[Object]` that `CometExecIterator` / `CometExecRDD` pass +to native `createPlan` (one slot per scan input; shuffle inputs pass a `CometShuffleBlockIterator` instead). -// 3. Import data from FFI structures -for i in 0..num_cols { - let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?; - let array = make_array(array_data); - // ... process array -} -``` +On the native side, `planner.rs` reads each stream's `memoryAddress` and takes ownership through +`AlignedArrowStreamReader::from_raw`, importing the schema once. `ScanExec::get_next_batch` then pulls each batch +through the stream's `get_next` callback. There is no per-batch JNI call and no per-column FFI export. + +### Buffer Alignment (AlignedArrowStreamReader) + +`AlignedArrowStreamReader` (in `execution/operators/aligned_stream_reader.rs`) wraps the imported stream and calls +`align_buffers` on every batch before constructing typed arrays. This works around the fact that Java's allocator +hands back `Decimal128` buffers at 8-byte (not 16-byte) alignment, which the stock `ArrowArrayStreamReader` rejects +([apache/arrow-rs#10028](https://github.com/apache/arrow-rs/issues/10028)). The fix +([apache/arrow-rs#10030](https://github.com/apache/arrow-rs/pull/10030)) makes import align internally and ships in +arrow 59.0.0; once Comet is on arrow >= 59 this reader can be dropped for the stock `ArrowArrayStreamReader`. + +### Schema Reconciliation + +`CometArrowStream.reconcileStreamSchema` advertises the stream's schema from the actual `CometVector` types in the +first batch rather than the consumer's Spark-declared types. Native `ScanExec` already casts its input to the +declared scan-input schema in `build_record_batch`, so the truthful first-batch schema lets that cast fire; if the +two differ, it logs one deduplicated warning naming the operator, column, and type drift. ### Memory Layout @@ -126,68 +144,19 @@ Off-heap Memory: │ └──────────────────┘ ``` -**Key Point**: The actual data buffers can be off-heap, but the `ArrowArray` and `ArrowSchema` wrapper objects are **always allocated on the JVM heap**. - -### Wrapper Object Lifecycle - -When arrays are created in the JVM and passed to native code, the JVM creates the array data off-heap and creates -wrapper objects `ArrowArray` and `ArrowSchema` on-heap. These wrapper objects can consume significant memory over -time. - -``` -Per batch overhead on JVM heap: -- ArrowArray object: ~100 bytes -- ArrowSchema object: ~100 bytes -- Per column: ~200 bytes -- 100 columns × 1000 batches = ~20 MB of wrapper objects -``` - -When native code pulls batches from the JVM, the JVM wrapper objects are kept alive until the native code drops -all references to the arrays. - -When operators such as `SortExec` fetch many batches and buffer them in native code, the number of wrapper objects -in Java on-heap memory keeps growing until the batches are released in native code at the end of the sort operation. - -### Ownership Transfer - -The Arrow C data interface supports ownership transfer by registering callbacks in the C struct that is passed over -the JNI boundary for the function to delete the array data. For example, the `ArrowArray` struct has: - -```c -// Release callback -void (*release)(struct ArrowArray*); -``` - -Comet currently does not always follow best practice around ownership transfer because there are some cases where -Comet JVM code will retain references to arrays after passing them to native code and may mutate the underlying -buffers. There is an `arrow_ffi_safe` flag in the protocol buffer definition of `Scan` that indicates whether -ownership is being transferred according to the Arrow C data interface specification. - -```protobuf -message Scan { - repeated spark.spark_expression.DataType fields = 1; - // The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This - // is purely for informational purposes when viewing native query plans in - // debug mode. - string source = 2; - // Whether native code can assume ownership of batches that it receives - bool arrow_ffi_safe = 3; -} -``` - -#### When ownership is NOT transferred to native: - -If the data originates from a scan that uses mutable buffers -then ownership is not transferred to native and the JVM may re-use the underlying buffers in the future. +**Key Point**: The actual data buffers are shared zero-copy; native only takes pointers to the off-heap buffers. -It is critical that the native code performs a deep copy of the arrays if the arrays are to be buffered by -operators such as `SortExec` or `ShuffleWriterExec`, otherwise data corruption is likely to occur. +### Ownership and Lifecycle -#### When ownership IS transferred to native: +The Arrow C Stream Interface transfers ownership by reference count: native takes ownership of each imported batch, +so it is safe to buffer batches in operators such as `SortExec` or `ShuffleWriterExec` without a deep copy. -When ownership is transferred, it is safe to buffer batches in native. However, JVM wrapper objects will not be -released until the native batches are dropped. This can lead to OOM or GC pressure if there is not enough Java -heap memory configured. +The whole per-partition stream is exported once, so the JVM allocates one `ArrowArrayStream` per partition rather +than a per-batch, per-column `ArrowArray`/`ArrowSchema` wrapper object pair. Lifecycle is anchored at the stream: when +`ScanExec` drops its `AlignedArrowStreamReader`, the stream's release callback fires synchronously back into the JVM +and closes the `ArrowReader` and its `VectorSchemaRoot`, releasing the off-heap buffers. Because native holds those +buffers until the reader drops, an operator that buffers many batches keeps the corresponding JVM-side data alive +until then. ## Native → JVM Data Flow (CometExecIterator) @@ -342,10 +311,9 @@ arrowBuf.close(); // → calls native release_batch() ### JVM → Native -| Scenario | `arrow_ffi_safe` | Ownership | Action Required | -| ------------------ | ---------------- | ----------- | -------------------------------------- | -| Temporary scan | `false` | JVM keeps | **Must deep copy** to avoid corruption | -| Ownership transfer | `true` | Native owns | Copy only to unpack dictionaries | +| Scenario | Ownership | Action Required | +| --------- | ----------- | -------------------------------------------------------------------------------------------------------------------------------------------- | +| All cases | Native owns | None; the C Stream transfers ownership by reference count (copy only to unpack dictionaries). Dropping the reader releases the JVM-side data | ### Native → JVM @@ -356,5 +324,6 @@ arrowBuf.close(); // → calls native release_batch() ## Further Reading - [Arrow C Data Interface Specification](https://arrow.apache.org/docs/format/CDataInterface.html) +- [Arrow C Stream Interface Specification](https://arrow.apache.org/docs/format/CStreamInterface.html) - [Arrow Java FFI Implementation](https://github.com/apache/arrow/tree/main/java/c) - [Arrow Rust FFI Implementation](https://docs.rs/arrow/latest/arrow/ffi/) diff --git a/docs/source/contributor-guide/plugin_overview.md b/docs/source/contributor-guide/plugin_overview.md index 444c9ccb60..c3330a65d6 100644 --- a/docs/source/contributor-guide/plugin_overview.md +++ b/docs/source/contributor-guide/plugin_overview.md @@ -96,9 +96,10 @@ In the native code there is a `PhysicalPlanner` struct (in `planner.rs`) which c Apache DataFusion `ExecutionPlan`. In some cases, Comet provides specialized physical operators and expressions to override the DataFusion versions to ensure compatibility with Apache Spark. -The leaf nodes in the physical plan are always `ScanExec` and each of these operators will make a JNI call to -`CometBatchIterator.next()` to fetch the next input batch. The input could be a Comet native Parquet scan, -a Spark exchange, or another native plan. +The leaf nodes in the physical plan are always `ScanExec`. Each JVM-sourced input is exported once as an +[Arrow C Stream](https://arrow.apache.org/docs/format/CStreamInterface.html) (`org.apache.arrow.c.ArrowArrayStream`), +and `ScanExec` pulls each input batch through a single C callback rather than making a JNI call per batch. The +input could be a Comet native Parquet scan, a Spark exchange, or another native plan. `CometNativeExec` creates a `CometExecIterator` and applies this iterator to the input RDD partitions. Each call to `CometExecIterator.next()` will invoke `Native.executePlan`. Once the plan finishes diff --git a/docs/source/contributor-guide/profiling.md b/docs/source/contributor-guide/profiling.md index 67729a235e..1c4faa5a86 100644 --- a/docs/source/contributor-guide/profiling.md +++ b/docs/source/contributor-guide/profiling.md @@ -276,8 +276,8 @@ objects are created during data transfer between JVM and native code: $ASYNC_PROFILER_HOME/bin/asprof -e alloc -d 60 -f alloc-profile.html ``` -Look for allocations in `CometExecIterator`, `CometBatchIterator`, and Arrow vector -classes. +Look for allocations in `CometExecIterator` (native to JVM output), `CometArrowStream` (JVM to native +input export), and Arrow vector classes. ### Isolate Rust-only performance issues