Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 86 additions & 1 deletion bindings/python/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use paimon::table::{DataSplit, Table};
use paimon_datafusion::runtime::runtime;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyDict};
use pyo3::types::{PyBytes, PyDict, PyList};

use crate::error::to_py_err;
use crate::predicate::dict_to_predicate;
Expand Down Expand Up @@ -309,6 +309,91 @@ impl PySplit {
self.inner.row_count()
}

/// Expose this planned split as a plain dict so a non-Rust reader (e.g.
/// pypaimon) can rebuild a split and read the files without re-planning.
/// `partition` is a serialized `BinaryRow` (as in a manifest `_PARTITION`);
/// `row_ranges` `{from, to}` is inclusive; a deletion `path` is fully
/// resolved; planning-only stats are omitted.
fn to_dict<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
let s = &self.inner;
let d = PyDict::new(py);
d.set_item("snapshot_id", s.snapshot_id())?;
d.set_item("bucket", s.bucket())?;
d.set_item("bucket_path", s.bucket_path())?;
d.set_item("total_buckets", s.total_buckets())?;
d.set_item("raw_convertible", s.raw_convertible())?;
let partition_bytes = s.partition().to_serialized_bytes();
d.set_item("partition", PyBytes::new(py, &partition_bytes))?;

// Row ranges (sub-file slicing) must not be dropped, else a consumer that
// rebuilds this split would over-read the whole file. None for plain splits.
match s.row_ranges() {
Some(ranges) => {
let out = PyList::empty(py);
for r in ranges {
let rd = PyDict::new(py);
rd.set_item("from", r.from())?;
rd.set_item("to", r.to())?;
out.append(rd)?;
}
d.set_item("row_ranges", out)?;
}
None => d.set_item("row_ranges", py.None())?,
}

let files = PyList::empty(py);
let deletions = PyList::empty(py);
let has_deletions = s.data_deletion_files().is_some();
for (i, f) in s.data_files().iter().enumerate() {
let fd = PyDict::new(py);
fd.set_item("file_name", f.file_name.as_str())?;
fd.set_item("file_path", s.data_file_path(f))?;
fd.set_item("file_size", f.file_size)?;
fd.set_item("row_count", f.row_count)?;
fd.set_item("min_sequence_number", f.min_sequence_number)?;
fd.set_item("max_sequence_number", f.max_sequence_number)?;
fd.set_item("schema_id", f.schema_id)?;
fd.set_item("level", f.level)?;
fd.set_item("extra_files", f.extra_files.clone())?;
fd.set_item(
"creation_time",
f.creation_time.map(|t| t.timestamp_millis()),
)?;
fd.set_item("delete_row_count", f.delete_row_count)?;
fd.set_item(
"embedded_index",
f.embedded_index.as_deref().map(|b| PyBytes::new(py, b)),
)?;
fd.set_item("file_source", f.file_source)?;
fd.set_item("value_stats_cols", f.value_stats_cols.clone())?;
fd.set_item("external_path", f.external_path.clone())?;
fd.set_item("first_row_id", f.first_row_id)?;
fd.set_item("write_cols", f.write_cols.clone())?;
files.append(fd)?;

if has_deletions {
match s.deletion_file_for_data_file_index(i) {
Some(df) => {
let dd = PyDict::new(py);
dd.set_item("path", df.path())?;
dd.set_item("offset", df.offset())?;
dd.set_item("length", df.length())?;
dd.set_item("cardinality", df.cardinality())?;
deletions.append(dd)?;
}
None => deletions.append(py.None())?,
}
}
}
d.set_item("data_files", files)?;
if has_deletions {
d.set_item("data_deletion_files", deletions)?;
} else {
d.set_item("data_deletion_files", py.None())?;
}
Ok(d)
}

/// Reduce to `Split(bytes)` for pickle/copy. The bytes are an opaque,
/// implementation-detail encoding; only same/compatible-version round-trip
/// is guaranteed.
Expand Down
69 changes: 69 additions & 0 deletions bindings/python/tests/test_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,3 +657,72 @@ def test_time_travel_conflicting_selectors_raises():
# both offending keys are named
assert "scan.snapshot-id" in str(exc.value)
assert "scan.tag-name" in str(exc.value)


def test_split_to_dict_exposes_fields():
with tempfile.TemporaryDirectory() as warehouse:
table = _make_table_with_data(warehouse)
splits = table.new_read_builder().new_scan().plan().splits()
assert splits
d = splits[0].to_dict()
assert set(d) >= {"bucket", "bucket_path", "total_buckets", "partition",
"raw_convertible", "snapshot_id", "data_files",
"data_deletion_files", "row_ranges"}
assert isinstance(d["bucket"], int)
assert isinstance(d["partition"], (bytes, bytearray))
assert d["data_deletion_files"] is None # no deletion vectors on an append table
assert d["row_ranges"] is None # plain split reads whole files
files = d["data_files"]
assert files
f = files[0]
assert set(f) >= {"file_name", "file_path", "file_size", "row_count",
"schema_id", "level", "first_row_id", "write_cols"}
assert f["file_path"].startswith(d["bucket_path"])
assert f["file_path"].endswith(f["file_name"])
assert sum(x["row_count"] for x in files) == 3


def test_split_to_dict_partition_and_reads():
with tempfile.TemporaryDirectory() as warehouse:
table = _make_partitioned_table(warehouse)
b = table.new_read_builder().with_filter(
{"method": "equal", "field": "dt", "literals": ["p1"]})
splits = b.new_scan().plan().splits()
assert splits
d = splits[0].to_dict()
# byte-identical to a manifest _PARTITION: 4-byte big-endian arity + body
assert d["partition"][:4] == b"\x00\x00\x00\x01" # one partition column (dt)
assert b"p1" in d["partition"] # the partition value is encoded
# partition filter pruned to p1
t = pa.Table.from_batches(b.new_read().read(splits))
assert sorted(t.column("id").to_pylist()) == [1, 2]


def test_split_to_dict_deletions_row_ranges_external_path():
# plan() can't produce these; inject them via the serde payload.
import json

with tempfile.TemporaryDirectory() as warehouse:
table = _make_string_table(warehouse) # two files
split = table.new_read_builder().new_scan().plan().splits()[0]
cls, (raw,) = split.__reduce__()
payload = json.loads(bytes(raw))
n = len(payload["data_files"])
assert n >= 2
deletion = {"path": "dv/idx-0", "offset": 4, "length": 20, "cardinality": 3}
payload["data_deletion_files"] = [deletion] + [None] * (n - 1)
payload["row_ranges"] = [{"from": 0, "to": 5}, {"from": 8, "to": 9}]
payload["data_files"][0]["_EXTERNAL_PATH"] = "s3://ext/data-0.parquet"
payload["data_files"][0]["_EMBEDDED_FILE_INDEX"] = [1, 2, 3]

d = cls(json.dumps(payload).encode()).to_dict()

ddf = d["data_deletion_files"] # index-aligned with data_files
assert isinstance(ddf, list) and len(ddf) == n
assert ddf[0] == deletion
assert all(x is None for x in ddf[1:])
assert d["row_ranges"] == [{"from": 0, "to": 5}, {"from": 8, "to": 9}]
# external path wins
assert d["data_files"][0]["file_path"] == "s3://ext/data-0.parquet"
assert d["data_files"][0]["external_path"] == "s3://ext/data-0.parquet"
assert d["data_files"][0]["embedded_index"] == b"\x01\x02\x03"