From 2b534b342b87fe76c2eecf512fb30e8042b6c914 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 4 Jul 2026 20:53:19 -0700 Subject: [PATCH] [python] Add Split.to_dict() to expose planned split contents Let a non-Rust reader (e.g. pypaimon) rebuild its own DataSplit from a Rust-planned split and read the files directly, without re-running its own scan planning. to_dict() exposes bucket / bucket_path / total_buckets / partition / raw_convertible and, per data file, the fully-resolved file_path plus scalar metadata (schema_id, level, sequence numbers, first_row_id, write_cols, creation_time, ...) and aligned per-file deletion files. partition is the serialized BinaryRow, byte-identical to a manifest _PARTITION. Planning-only statistics (key/value stats, min/max key) are omitted since planning already happened. Tested via test_split_to_dict_exposes_fields and test_split_to_dict_partition_and_reads. --- bindings/python/src/read.rs | 87 +++++++++++++++++++++++++++++- bindings/python/tests/test_read.py | 69 ++++++++++++++++++++++++ 2 files changed, 155 insertions(+), 1 deletion(-) diff --git a/bindings/python/src/read.rs b/bindings/python/src/read.rs index 3cd662c5..30a392a6 100644 --- a/bindings/python/src/read.rs +++ b/bindings/python/src/read.rs @@ -25,7 +25,7 @@ use paimon::table::{DataSplit, Table}; use paimon_datafusion::runtime::runtime; use pyo3::exceptions::{PyTypeError, PyValueError}; use pyo3::prelude::*; -use pyo3::types::{PyBytes, PyDict}; +use pyo3::types::{PyBytes, PyDict, PyList}; use crate::error::to_py_err; use crate::predicate::dict_to_predicate; @@ -309,6 +309,91 @@ impl PySplit { self.inner.row_count() } + /// Expose this planned split as a plain dict so a non-Rust reader (e.g. + /// pypaimon) can rebuild a split and read the files without re-planning. + /// `partition` is a serialized `BinaryRow` (as in a manifest `_PARTITION`); + /// `row_ranges` `{from, to}` is inclusive; a deletion `path` is fully + /// resolved; planning-only stats are omitted. + fn to_dict<'py>(&self, py: Python<'py>) -> PyResult> { + let s = &self.inner; + let d = PyDict::new(py); + d.set_item("snapshot_id", s.snapshot_id())?; + d.set_item("bucket", s.bucket())?; + d.set_item("bucket_path", s.bucket_path())?; + d.set_item("total_buckets", s.total_buckets())?; + d.set_item("raw_convertible", s.raw_convertible())?; + let partition_bytes = s.partition().to_serialized_bytes(); + d.set_item("partition", PyBytes::new(py, &partition_bytes))?; + + // Row ranges (sub-file slicing) must not be dropped, else a consumer that + // rebuilds this split would over-read the whole file. None for plain splits. + match s.row_ranges() { + Some(ranges) => { + let out = PyList::empty(py); + for r in ranges { + let rd = PyDict::new(py); + rd.set_item("from", r.from())?; + rd.set_item("to", r.to())?; + out.append(rd)?; + } + d.set_item("row_ranges", out)?; + } + None => d.set_item("row_ranges", py.None())?, + } + + let files = PyList::empty(py); + let deletions = PyList::empty(py); + let has_deletions = s.data_deletion_files().is_some(); + for (i, f) in s.data_files().iter().enumerate() { + let fd = PyDict::new(py); + fd.set_item("file_name", f.file_name.as_str())?; + fd.set_item("file_path", s.data_file_path(f))?; + fd.set_item("file_size", f.file_size)?; + fd.set_item("row_count", f.row_count)?; + fd.set_item("min_sequence_number", f.min_sequence_number)?; + fd.set_item("max_sequence_number", f.max_sequence_number)?; + fd.set_item("schema_id", f.schema_id)?; + fd.set_item("level", f.level)?; + fd.set_item("extra_files", f.extra_files.clone())?; + fd.set_item( + "creation_time", + f.creation_time.map(|t| t.timestamp_millis()), + )?; + fd.set_item("delete_row_count", f.delete_row_count)?; + fd.set_item( + "embedded_index", + f.embedded_index.as_deref().map(|b| PyBytes::new(py, b)), + )?; + fd.set_item("file_source", f.file_source)?; + fd.set_item("value_stats_cols", f.value_stats_cols.clone())?; + fd.set_item("external_path", f.external_path.clone())?; + fd.set_item("first_row_id", f.first_row_id)?; + fd.set_item("write_cols", f.write_cols.clone())?; + files.append(fd)?; + + if has_deletions { + match s.deletion_file_for_data_file_index(i) { + Some(df) => { + let dd = PyDict::new(py); + dd.set_item("path", df.path())?; + dd.set_item("offset", df.offset())?; + dd.set_item("length", df.length())?; + dd.set_item("cardinality", df.cardinality())?; + deletions.append(dd)?; + } + None => deletions.append(py.None())?, + } + } + } + d.set_item("data_files", files)?; + if has_deletions { + d.set_item("data_deletion_files", deletions)?; + } else { + d.set_item("data_deletion_files", py.None())?; + } + Ok(d) + } + /// Reduce to `Split(bytes)` for pickle/copy. The bytes are an opaque, /// implementation-detail encoding; only same/compatible-version round-trip /// is guaranteed. diff --git a/bindings/python/tests/test_read.py b/bindings/python/tests/test_read.py index c37f564b..93ee37a7 100644 --- a/bindings/python/tests/test_read.py +++ b/bindings/python/tests/test_read.py @@ -657,3 +657,72 @@ def test_time_travel_conflicting_selectors_raises(): # both offending keys are named assert "scan.snapshot-id" in str(exc.value) assert "scan.tag-name" in str(exc.value) + + +def test_split_to_dict_exposes_fields(): + with tempfile.TemporaryDirectory() as warehouse: + table = _make_table_with_data(warehouse) + splits = table.new_read_builder().new_scan().plan().splits() + assert splits + d = splits[0].to_dict() + assert set(d) >= {"bucket", "bucket_path", "total_buckets", "partition", + "raw_convertible", "snapshot_id", "data_files", + "data_deletion_files", "row_ranges"} + assert isinstance(d["bucket"], int) + assert isinstance(d["partition"], (bytes, bytearray)) + assert d["data_deletion_files"] is None # no deletion vectors on an append table + assert d["row_ranges"] is None # plain split reads whole files + files = d["data_files"] + assert files + f = files[0] + assert set(f) >= {"file_name", "file_path", "file_size", "row_count", + "schema_id", "level", "first_row_id", "write_cols"} + assert f["file_path"].startswith(d["bucket_path"]) + assert f["file_path"].endswith(f["file_name"]) + assert sum(x["row_count"] for x in files) == 3 + + +def test_split_to_dict_partition_and_reads(): + with tempfile.TemporaryDirectory() as warehouse: + table = _make_partitioned_table(warehouse) + b = table.new_read_builder().with_filter( + {"method": "equal", "field": "dt", "literals": ["p1"]}) + splits = b.new_scan().plan().splits() + assert splits + d = splits[0].to_dict() + # byte-identical to a manifest _PARTITION: 4-byte big-endian arity + body + assert d["partition"][:4] == b"\x00\x00\x00\x01" # one partition column (dt) + assert b"p1" in d["partition"] # the partition value is encoded + # partition filter pruned to p1 + t = pa.Table.from_batches(b.new_read().read(splits)) + assert sorted(t.column("id").to_pylist()) == [1, 2] + + +def test_split_to_dict_deletions_row_ranges_external_path(): + # plan() can't produce these; inject them via the serde payload. + import json + + with tempfile.TemporaryDirectory() as warehouse: + table = _make_string_table(warehouse) # two files + split = table.new_read_builder().new_scan().plan().splits()[0] + cls, (raw,) = split.__reduce__() + payload = json.loads(bytes(raw)) + n = len(payload["data_files"]) + assert n >= 2 + deletion = {"path": "dv/idx-0", "offset": 4, "length": 20, "cardinality": 3} + payload["data_deletion_files"] = [deletion] + [None] * (n - 1) + payload["row_ranges"] = [{"from": 0, "to": 5}, {"from": 8, "to": 9}] + payload["data_files"][0]["_EXTERNAL_PATH"] = "s3://ext/data-0.parquet" + payload["data_files"][0]["_EMBEDDED_FILE_INDEX"] = [1, 2, 3] + + d = cls(json.dumps(payload).encode()).to_dict() + + ddf = d["data_deletion_files"] # index-aligned with data_files + assert isinstance(ddf, list) and len(ddf) == n + assert ddf[0] == deletion + assert all(x is None for x in ddf[1:]) + assert d["row_ranges"] == [{"from": 0, "to": 5}, {"from": 8, "to": 9}] + # external path wins + assert d["data_files"][0]["file_path"] == "s3://ext/data-0.parquet" + assert d["data_files"][0]["external_path"] == "s3://ext/data-0.parquet" + assert d["data_files"][0]["embedded_index"] == b"\x01\x02\x03"