Skip to content

Apply exact residual filtering for ORC, Avro, and Row reads#448

Open
JunRuiLee wants to merge 4 commits into
apache:mainfrom
JunRuiLee:feat/exact-residual-filter
Open

Apply exact residual filtering for ORC, Avro, and Row reads#448
JunRuiLee wants to merge 4 commits into
apache:mainfrom
JunRuiLee:feat/exact-residual-filter

Conversation

@JunRuiLee

@JunRuiLee JunRuiLee commented Jul 3, 2026

Copy link
Copy Markdown
Contributor

Purpose

Make predicate evaluation exact across all file formats, so a consumer that returns scan batches verbatim gets exactly the rows matching the pushed-down predicate. Motivated by pypaimon delegating its read kernel to the Rust core: it returns the scan's Arrow batches directly without re-filtering, which was only safe for Parquet.

Before this change:

  • ORC pruned by stripe stats only — non-matching rows in a selected stripe survived.
  • Avro / Row ignored the predicate entirely.

These caused silent over-reads (wrong data, no error). Now every format applies the predicate exactly. Exact LIMIT remains the caller's responsibility (matching with_limit's documented hint semantics).

Changes

  • Consolidate the two duplicate Arrow-batch predicate evaluators (Parquet's and Vortex's) into a shared arrow::residual module; the Vortex copy previously fell open on startsWith/like/between, now evaluated exactly everywhere.
  • Apply the residual filter in ORC (after stripe pruning), Avro, and Row.
  • Readers scan predicate columns (widen_scan_fields) so a predicate on a non-projected column is applied; DataFileReader projects to the requested output by name. Residual columns are resolved by name, not batch position (ORC emits file-schema order).
  • Parquet keeps its zero-overhead RowFilter fast path when all predicates are fully enforced, else applies the residual pass (covers Or/Not/unsupported leaves).
  • Reject _ROW_ID projection combined with a data predicate (Unsupported) — positional row-id assignment desyncs when rows are filtered.

Tests

cargo test -p paimon --lib (+--features vortex) all pass. Covers per-format exactness incl. non-projected predicate columns, Or/Not, misordered batch columns, and the _ROW_ID guard.

No public API or storage-format change.

@JunRuiLee JunRuiLee force-pushed the feat/exact-residual-filter branch 2 times, most recently from c314315 to 69a6a87 Compare July 3, 2026 19:34
Comment thread crates/paimon/src/arrow/format/parquet.rs
Comment thread crates/paimon/src/arrow/residual.rs
Comment thread crates/paimon/src/arrow/format/mod.rs Outdated
Ok(combined)
}

fn evaluate_column_predicate(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This uses Arrow's generic comparison kernels for Binary/VarBinary as well, but Paimon's Datum::Bytes ordering is Java signed-byte order (0xFF < 0x00). Arrow compares byte slices unsigned, so binary range predicates can return the wrong rows. Please add a Binary/VarBinary-specific comparison path that matches Paimon's comparator, with a residual test.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Binary/VarBinary comparisons (ordering and Between) now go through Paimon's signed-byte comparator (java_bytes_cmp), matching Datum::Bytes (0xFF < 0x00) instead of Arrow's unsigned order. Between delegates to the shared comparison path so it can't diverge. Regression tests added for both ordering and Between.

Comment thread crates/paimon/src/arrow/residual.rs Outdated
}
_ => return Ok(None),
},
DataType::Decimal(decimal) => match literal {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rejects decimal literals whose scale differs from the file column scale, but Paimon Datum compares decimals by mathematical value across scales. A valid predicate like DECIMAL 1.0 against a DECIMAL(…, 2) column can now fail during residual filtering. Please rescale exactly when possible, or evaluate decimal predicates with the same semantics as Datum.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Decimal value predicates are now evaluated row-wise via datum_cmp, so cross-scale comparison is by mathematical value (e.g. DECIMAL 1.0 matches a DECIMAL(,2) column, and d > 1.05 on a DECIMAL(,1) column is exactly d >= 1.1) — no rescale requirement and no spurious error. Tests cover the equal-value cross-scale and finer-scale-literal cases.

read_type: &[DataField],
predicates: &[Predicate],
) -> crate::Result<()> {
let projects_row_id = read_type

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This guard treats any predicate as row-dropping. Constant predicates such as AlwaysTrue cannot desync _ROW_ID but are rejected here. Consider normalizing away no-op predicates, or only rejecting predicates that can actually filter rows.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. The guard now only rejects predicates that can actually filter rows — a constant AlwaysTrue is allowed.

Broader note: the residual filter this PR adds makes format-reader reads exact, but the data-evolution read path still passes no predicate to the reader, so index/fallback row-ranges aren't residual-filtered there. That gap is orthogonal to this PR and to #449's fallback-coverage fix; I'll follow up on it separately.

JunRuiLee added 4 commits July 4, 2026 16:56
Consolidate the two near-duplicate Arrow-batch predicate evaluators (one private
to the Parquet reader, one in the Vortex reader) into a shared arrow::residual
module, and make it faithful to Paimon's Datum comparison semantics:

- Complete leaf dispatch incl. startsWith/endsWith/contains/like/between, so
  those operators are evaluated exactly wherever the residual runs (the Vortex
  copy previously deferred them).
- Resolve each predicate column by name against the batch schema (a reader may
  emit columns in file-schema order, e.g. ORC ProjectionMask::named_roots), with
  a debug_assert guarding a predicate column missing from the scanned batch.
- Binary/VarBinary comparisons (ordering and Between) use Java signed-byte order
  (java_bytes_cmp) to match Datum::Bytes (0xFF < 0x00), not Arrow's unsigned
  comparison. Between delegates to the shared comparison path so it cannot
  diverge.
- Decimal value comparisons are evaluated row-wise via datum_cmp, exact across
  scales (e.g. d > 1.05 on a DECIMAL(_,1) column == d >= 1.1).
- Casts a decoded column up to the predicate's declared type before comparison,
  so a promoted INT->BIGINT read with an out-of-range literal yields no rows
  rather than an error.
- An unconvertible literal errors rather than fail-open to all-rows.
- Parquet keeps its RowFilter fast path and adds a residual backstop when a
  predicate is not fully enforced (Or/Not/unsupported leaves); Vortex reuses the
  shared helpers. Narrows the FormatFileReader contract to per-format exactness.
ORC pruned by stripe stats only; Avro and Row ignored predicates entirely, so a
filtered read silently returned non-matching rows. Each reader now scans the
predicate columns and applies the shared residual filter; DataFileReader projects
to the requested output by name. Row keeps its full physical (positional) read.
…dual

ReadBuilder/TableRead pruned And/Or/Not before the reader, so the residual pass
never saw compound predicates on the public path and an Or/Not filter returned
all rows. Stop pruning: native pushdown and stats already skip compound nodes
they cannot use, and the residual pass enforces the full predicate exactly.
Removes the now-unused reader_pruning_predicates.
_ROW_ID is materialized positionally from each batch's row count, which desyncs
once the residual filter drops rows. Reject projecting _ROW_ID together with a
predicate that can actually filter rows (a constant AlwaysTrue is allowed). The
guard lives in read_single_file_stream, the true _ROW_ID generation site.
@JunRuiLee JunRuiLee force-pushed the feat/exact-residual-filter branch from e029fc8 to 509fce1 Compare July 4, 2026 09:06
self.table,
read_type,
reader_pruning_predicates(self.filter.data_predicates.clone()),
self.filter.data_predicates.clone(),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still does not make the public read path exact for data_evolution_enabled tables. new_read() now preserves the full data predicate here, but TableRead::read_with_evolution() builds DataEvolutionReader without passing self.data_predicates, and DataEvolutionReader creates every DataFileReader with Vec::new(). TableScan also disables normal data-predicate pushdown for data-evolution tables (pushdown_data_predicates = &[]), so a filtered scan without a usable global index, or with only coarse row ranges, will still return non-matching rows from the data-evolution reader. This regresses the PR goal that scan batches are exactly filtered; the evolution reader needs to receive and apply the same residual predicates after column merge (or before when it is safe).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants