Merged
29 commits
5c41215
Passes CometFuzzTestSuite, CometNativeShuffleSuite, CometExecSuite.
mbutrovich May 29, 2026
8c99fc5
Passes CometFuzzTestSuite, CometNativeShuffleSuite, CometExecSuite.
mbutrovich May 29, 2026
443a1c7
Cleanup, update docs.
mbutrovich May 29, 2026
cc7c5be
handle arrow type mismatches on child stream in native shuffle.
mbutrovich May 29, 2026
c76a263
stash
mbutrovich May 29, 2026
1b55b97
refactor to handle on JVM side.
mbutrovich May 29, 2026
d736dd5
remove instrumentation.
mbutrovich May 29, 2026
214a75b
cleanup.
mbutrovich May 29, 2026
e8e438f
cleanup.
mbutrovich May 29, 2026
29d9e19
Merge branch 'main' into opt_native_shuffle
mbutrovich May 29, 2026
cdbb0e6
Merge branch 'main' into opt_native_shuffle
mbutrovich May 29, 2026
1913208
fix aggregation wrapping now that we don't have an extra CometExecIte…
mbutrovich May 29, 2026
327a653
Remove archeology comments.
mbutrovich May 29, 2026
0b71de4
Undo stricter tests since they're not happy on Spark 3.x.
mbutrovich May 29, 2026
a6744eb
Merge branch 'main' into opt_native_shuffle
mbutrovich May 29, 2026
1ecfd8a
Remove unintended change.
mbutrovich May 29, 2026
51d4d42
Merge remote-tracking branch 'apache/main' into opt_native_shuffle
mbutrovich May 30, 2026
4dba7ea
Merge branch 'main' into opt_native_shuffle
mbutrovich Jun 2, 2026
b43b5da
Merge branch 'main' into opt_native_shuffle
mbutrovich Jun 2, 2026
bfbae18
Merge branch 'main' into opt_native_shuffle
mbutrovich Jun 2, 2026
a6a6c77
Address PR feedback.
mbutrovich Jun 2, 2026
5cfb888
Merge branch 'main' into opt_native_shuffle
mbutrovich Jun 2, 2026
f987601
Merge branch 'main' into opt_native_shuffle
mbutrovich Jun 3, 2026
fe0c949
Merge branch 'main' into opt_native_shuffle
mbutrovich Jun 3, 2026
f909136
Fix batch size calculation in native shuffle writer, and task metrics…
mbutrovich Jun 4, 2026
51b034c
Merge branch 'main' into opt_native_shuffle
mbutrovich Jun 4, 2026
7690ee3
Merge branch 'main' into opt_native_shuffle
mbutrovich Jun 8, 2026
71dc97d
Merge branch 'main' into opt_native_shuffle
mbutrovich Jun 9, 2026
5138bc0
add tests to schema_align.rs
mbutrovich Jun 10, 2026
38 changes: 24 additions & 14 deletions docs/source/contributor-guide/native_shuffle.md
@@ -69,8 +69,9 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
┌─────────────────────────────────────────────────────────────────────────────┐
│ CometNativeShuffleWriter │
-│ - Constructs protobuf operator plan │
-│ - Invokes native execution via CometExec.getCometIterator() │
+│ - Builds protobuf operator plan: ShuffleWriter(child = childNativeOp) │
+│ - Reads per-partition leaf iterators from CometNativeShuffleInputIterator │
+│ - Drives one CometExecIterator per partition │
└─────────────────────────────────────────────────────────────────────────────┘
▼ (JNI)
@@ -103,13 +104,14 @@ Native shuffle (`CometExchange`) is selected when all of the following condition

### Scala Side

-| Class | Location | Description |
-| ------------------------------ | ------------------------------------------------ | --------------------------------------------------------------------------------------------- |
-| `CometShuffleExchangeExec` | `.../shuffle/CometShuffleExchangeExec.scala` | Physical plan node. Validates types and partitioning, creates `CometShuffleDependency`. |
-| `CometNativeShuffleWriter` | `.../shuffle/CometNativeShuffleWriter.scala` | Implements `ShuffleWriter`. Builds protobuf plan and invokes native execution. |
-| `CometShuffleDependency` | `.../shuffle/CometShuffleDependency.scala` | Extends `ShuffleDependency`. Holds shuffle type, schema, and range partition bounds. |
-| `CometBlockStoreShuffleReader` | `.../shuffle/CometBlockStoreShuffleReader.scala` | Reads shuffle blocks via `ShuffleBlockFetcherIterator`. Decodes Arrow IPC to `ColumnarBatch`. |
-| `NativeBatchDecoderIterator` | `.../shuffle/NativeBatchDecoderIterator.scala` | Reads compressed Arrow IPC from input stream. Calls native decode via JNI. |
+| Class | Location | Description |
+| ------------------------------ | ------------------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------- |
+| `CometShuffleExchangeExec` | `.../shuffle/CometShuffleExchangeExec.scala` | Physical plan node. Validates types and partitioning, creates `CometShuffleDependency`. |
+| `CometNativeShuffleWriter` | `.../shuffle/CometNativeShuffleWriter.scala` | Implements `ShuffleWriter`. Builds the unified `ShuffleWriter(child = childNativeOp)` plan and runs it in one `CometExecIterator` per partition. |
+| `CometShuffleDependency` | `.../shuffle/CometShuffleDependency.scala` | Extends `ShuffleDependency`. Holds shuffle type, schema, range partition bounds, and (native shuffle only) a `NativeShuffleSpec`. |
+| `CometNativeShuffleInputRDD` | `.../shuffle/CometNativeShuffleInputRDD.scala` | Thin scheduling-anchor RDD on the native-shuffle path. `compute` returns a `CometNativeShuffleInputIterator` carrying per-partition leaf iterators. |
+| `CometBlockStoreShuffleReader` | `.../shuffle/CometBlockStoreShuffleReader.scala` | Reads shuffle blocks via `ShuffleBlockFetcherIterator`. Decodes Arrow IPC to `ColumnarBatch`. |
+| `NativeBatchDecoderIterator` | `.../shuffle/NativeBatchDecoderIterator.scala` | Reads compressed Arrow IPC from input stream. Calls native decode via JNI. |

### Rust Side

@@ -123,11 +125,19 @@ Native shuffle (`CometExchange`) is selected when all of the following condition

### Write Path

-1. **Plan construction**: `CometNativeShuffleWriter` builds a protobuf operator plan containing:
-   - A scan operator reading from the input iterator
-   - A `ShuffleWriter` operator with partitioning config and compression codec
-
-2. **Native execution**: `CometExec.getCometIterator()` executes the plan in Rust.
+1. **Plan construction**: `CometNativeShuffleWriter` builds a protobuf operator tree with a
+   `ShuffleWriter` operator at the root and `childNativeOp` as its child. `childNativeOp` takes
+   one of two shapes:
+   - The child plan's `nativeOp` directly, when `CometShuffleExchangeExec`'s child is a
+     `CometNativeExec` subtree. The upstream operators run inside the same `CometExecIterator`
+     as the writer, with no JVM-to-native batch boundary between them.
+   - A synthetic `Scan("ShuffleWriterInput")` placeholder, when the dependency was built via the
+     convenience `prepareShuffleDependency(rdd, ...)` overload (used by
+     `CometCollectLimitExec` and `CometTakeOrderedAndProjectExec`), or when the
+     exchange's child is a non-native `CometPlan` such as `CometSparkToColumnarExec`. Native
+     code reads `ColumnarBatch`es from the JVM input iterator via the Arrow C Stream Interface.
+
+2. **Native execution**: A single `CometExecIterator` per partition runs the unified plan.

3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner:
- `MultiPartitionShuffleRepartitioner`: For hash/range/round-robin partitioning
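
The two shapes of `childNativeOp` described in the new Write Path text can be sketched as a small model. This is only an illustration, not Comet's code: the `Operator` and `ShuffleInput` enums and both functions are hypothetical stand-ins for the protobuf operator messages, and only the names `ShuffleWriter`, `Scan`, and `"ShuffleWriterInput"` come from the diff above.

```rust
/// Hypothetical, simplified stand-in for a protobuf operator message.
#[derive(Debug, Clone, PartialEq)]
pub enum Operator {
    /// Placeholder leaf that is fed batches by a JVM input iterator.
    Scan { source: String },
    /// Any other native operator together with its children.
    Native { name: String, children: Vec<Operator> },
    /// Root of the unified plan; wraps exactly one child.
    ShuffleWriter { child: Box<Operator> },
}

/// Hypothetical description of what sits below the exchange.
pub enum ShuffleInput {
    /// The child is a native subtree, so its operator tree is reused directly.
    NativeSubtree(Operator),
    /// The input arrives as a JVM iterator, so a placeholder scan is needed.
    JvmIterator,
}

/// Builds ShuffleWriter(child = childNativeOp) for either input shape.
pub fn build_shuffle_writer_plan(input: ShuffleInput) -> Operator {
    let child_native_op = match input {
        ShuffleInput::NativeSubtree(native_op) => native_op,
        ShuffleInput::JvmIterator => Operator::Scan {
            source: "ShuffleWriterInput".to_string(),
        },
    };
    Operator::ShuffleWriter {
        child: Box::new(child_native_op),
    }
}

/// Returns true if any leaf of the modeled plan is a Scan placeholder,
/// i.e. the plan pulls batches across the JVM-to-native boundary.
pub fn reads_from_jvm(op: &Operator) -> bool {
    match op {
        Operator::Scan { .. } => true,
        Operator::Native { children, .. } => children.iter().any(reads_from_jvm),
        Operator::ShuffleWriter { child } => reads_from_jvm(child),
    }
}
```

In this model, the fused case keeps the upstream operators under the same root as the writer, while the placeholder case is the only one where `reads_from_jvm` reports a scan leaf directly below the writer.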
2 changes: 2 additions & 0 deletions native/core/src/execution/operators/mod.rs
@@ -32,6 +32,8 @@ pub use parquet_writer::ParquetWriterExec;
mod csv_scan;
pub mod projection;
mod scan;
+mod schema_align;
mod shuffle_scan;
pub use csv_scan::init_csv_datasource_exec;
+pub use schema_align::SchemaAlignExec;
pub use shuffle_scan::ShuffleScanExec;