Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 0 additions & 35 deletions crates/paimon/src/arrow/filtering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,6 @@ use crate::arrow::schema_evolution::create_index_mapping;
pub(crate) use crate::predicate_stats::{predicates_may_match_with_schema, StatsAccessor};
use crate::spec::{DataField, Predicate, PredicateOperator};

/// Keep only the predicates that the reader-side pruning path can evaluate.
///
/// Takes ownership of `data_predicates` and returns the entries accepted by
/// `predicate_supported_for_reader_pruning`, in their original relative order.
pub(crate) fn reader_pruning_predicates(data_predicates: Vec<Predicate>) -> Vec<Predicate> {
    let mut supported = data_predicates;
    // In-place filtering: unsupported predicates are dropped, the rest keep their order.
    supported.retain(predicate_supported_for_reader_pruning);
    supported
}

/// Remap predicates from table-level indices to file-level indices.
/// Predicates referencing fields not present in the file are resolved based on
/// NULL semantics: the missing column is treated as all-NULL, so `IS NULL`
Expand Down Expand Up @@ -127,34 +120,6 @@ pub(crate) fn build_field_mapping(
)
}

/// Decide whether a single predicate is eligible for reader-side pruning.
///
/// `AlwaysFalse` is always eligible. A `Leaf` is eligible when its operator is
/// one of the operators listed below. `AlwaysTrue` and the compound forms
/// (`And`, `Or`, `Not`) are never eligible.
fn predicate_supported_for_reader_pruning(predicate: &Predicate) -> bool {
    // Resolve the non-leaf variants first; only a leaf needs an operator check.
    let operator = match predicate {
        Predicate::AlwaysTrue | Predicate::And(_) | Predicate::Or(_) | Predicate::Not(_) => {
            return false
        }
        Predicate::AlwaysFalse => return true,
        Predicate::Leaf { op, .. } => op,
    };

    matches!(
        operator,
        PredicateOperator::IsNull
            | PredicateOperator::IsNotNull
            | PredicateOperator::Eq
            | PredicateOperator::NotEq
            | PredicateOperator::Lt
            | PredicateOperator::LtEq
            | PredicateOperator::Gt
            | PredicateOperator::GtEq
            | PredicateOperator::In
            | PredicateOperator::NotIn
            | PredicateOperator::StartsWith
            | PredicateOperator::EndsWith
            | PredicateOperator::Contains
            | PredicateOperator::Like
            | PredicateOperator::Between
            | PredicateOperator::NotBetween
    )
}

/// Build the identity mapping for `num_fields` columns.
///
/// Position `i` of the result holds `Some(i)`; the result is empty when
/// `num_fields` is zero.
fn identity_field_mapping(num_fields: usize) -> Vec<Option<usize>> {
    let mut mapping = Vec::with_capacity(num_fields);
    for position in 0..num_fields {
        mapping.push(Some(position));
    }
    mapping
}
Expand Down
253 changes: 249 additions & 4 deletions crates/paimon/src/arrow/format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,26 @@ impl FormatFileReader for AvroFormatReader {
reader: Box<dyn FileRead>,
file_size: u64,
read_fields: &[DataField],
_predicates: Option<&FilePredicates>,
predicates: Option<&FilePredicates>,
batch_size: Option<usize>,
row_selection: Option<Vec<RowRange>>,
) -> crate::Result<ArrowRecordBatchStream> {
// NOTE: Avro OCF requires sequential reading, so we load the entire file into memory.
// This is fine for typical Paimon data files but may be problematic for very large files.
let file_bytes = reader.read(0..file_size).await?;

let read_fields = read_fields.to_vec();
let target_schema = build_target_arrow_schema(&read_fields)?;
// Widen the decoded schema to include any predicate columns that are not
// in the projection, so residual filtering can see them. DataFileReader
// projects the returned batch to `read_fields` by name, dropping extras.
let scan_fields = crate::arrow::residual::widen_scan_fields(read_fields, predicates);
let target_schema = build_target_arrow_schema(&scan_fields)?;
let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
// Own the predicates so the returned 'static stream does not borrow the
// caller's `&FilePredicates` (FilePredicates is not `Clone`; rebuild it).
let predicates = predicates.map(|fp| FilePredicates {
predicates: fp.predicates.clone(),
file_fields: fp.file_fields.clone(),
});

// Collect Avro records directly as apache_avro::Value, avoiding intermediate conversion.
let all_records: Vec<Value> = Reader::new(&file_bytes[..])
Expand Down Expand Up @@ -87,7 +96,15 @@ impl FormatFileReader for AvroFormatReader {

Ok(try_stream! {
for chunk in records.chunks(batch_size) {
let batch = records_to_batch(chunk, &read_fields, &target_schema)?;
let batch = records_to_batch(chunk, &scan_fields, &target_schema)?;
let batch = match predicates.as_ref() {
Some(fp) => crate::arrow::residual::filter_record_batch_by_predicates(
batch,
fp,
&scan_fields,
)?,
None => batch,
};
yield batch;
}
}
Expand Down Expand Up @@ -943,4 +960,232 @@ mod tests {
let batch = records_to_batch(&records, &fields, &schema).unwrap();
assert_eq!(batch.num_rows(), 0);
}

// -----------------------------------------------------------------------
// Exact residual predicate filtering on read
// -----------------------------------------------------------------------

use crate::spec::{Datum, Predicate, PredicateOperator};

/// Test helper: build a `Predicate::Leaf` for the field at `index`.
///
/// The column name is synthesized as `c{index}`; the remaining arguments are
/// stored in the leaf unchanged.
fn leaf(
    index: usize,
    data_type: DataType,
    op: PredicateOperator,
    literals: Vec<Datum>,
) -> Predicate {
    let column = format!("c{index}");
    Predicate::Leaf {
        column,
        index,
        data_type,
        op,
        literals,
    }
}

/// Encode `rows` as an in-memory Avro OCF file and return the raw bytes.
///
/// Each `(age, name)` tuple becomes one record of the schema
/// `age: long, name: string`; the writer uses the null codec.
fn write_avro_age_name(rows: &[(i64, &str)]) -> Vec<u8> {
    use apache_avro::{Codec, Schema, Writer};

    let row_schema = Schema::parse_str(
        r#"{"type": "record", "name": "row", "fields": [
{"name": "age", "type": "long"},
{"name": "name", "type": "string"}
]}"#,
    )
    .unwrap();

    let mut ocf_writer = Writer::with_codec(&row_schema, Vec::new(), Codec::Null);
    for &(age, name) in rows {
        let mut entry = apache_avro::types::Record::new(&row_schema).unwrap();
        entry.put("age", age);
        entry.put("name", name);
        ocf_writer.append(entry).unwrap();
    }
    ocf_writer.into_inner().unwrap()
}

/// Reading with `age > 25` over a fully projected file must emit exactly the
/// rows whose age is 30, 40 and 50.
#[tokio::test]
async fn avro_reader_applies_exact_residual_filter_int() {
    use crate::btree::test_util::BytesFileRead;
    use crate::spec::{BigIntType, VarCharType};
    use futures::TryStreamExt;

    let encoded = write_avro_age_name(&[(10, "a"), (20, "b"), (30, "c"), (40, "d"), (50, "e")]);
    let encoded_len = encoded.len() as u64;

    let age_field = DataField::new(0, "age".to_string(), DataType::BigInt(BigIntType::new()));
    let name_field = DataField::new(
        1,
        "name".to_string(),
        DataType::VarChar(VarCharType::new(50).unwrap()),
    );
    // Both columns are projected, and the file schema has the same two columns.
    let read_fields = vec![age_field.clone(), name_field.clone()];

    // age > 25 → [30, 40, 50].
    let age_gt_25 = leaf(
        0,
        DataType::BigInt(BigIntType::new()),
        PredicateOperator::Gt,
        vec![Datum::Long(25)],
    );
    let predicates = FilePredicates {
        predicates: vec![age_gt_25],
        file_fields: vec![age_field, name_field],
    };

    let stream = AvroFormatReader
        .read_batch_stream(
            Box::new(BytesFileRead(encoded.into())),
            encoded_len,
            &read_fields,
            Some(&predicates),
            None,
            None,
        )
        .await
        .unwrap();
    let batches: Vec<_> = stream.try_collect().await.unwrap();

    let mut total_rows = 0usize;
    for batch in &batches {
        total_rows += batch.num_rows();
    }
    assert_eq!(total_rows, 3);

    // Column 0 is `age`; gather its values across every emitted batch.
    let mut ages: Vec<i64> = Vec::new();
    for batch in &batches {
        let age_column = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        ages.extend(age_column.values().iter().copied());
    }
    assert_eq!(ages, vec![30, 40, 50]);
}

/// Reading with a predicate on `age` while projecting only `name` must still
/// filter rows by `age`.
#[tokio::test]
async fn avro_reader_filters_on_non_projected_predicate_column() {
    use crate::btree::test_util::BytesFileRead;
    use crate::spec::{BigIntType, VarCharType};
    use futures::TryStreamExt;

    let encoded = write_avro_age_name(&[(10, "a"), (20, "b"), (30, "c"), (40, "d"), (50, "e")]);
    let encoded_len = encoded.len() as u64;

    let age_field = DataField::new(0, "age".to_string(), DataType::BigInt(BigIntType::new()));
    let name_field = DataField::new(
        1,
        "name".to_string(),
        DataType::VarChar(VarCharType::new(50).unwrap()),
    );
    // Project ONLY `name`; the predicate is on `age`, which is NOT projected.
    let read_fields = vec![name_field.clone()];

    // age > 25 → rows c, d, e (age is a BigInt/long, so literal is Datum::Long).
    let age_gt_25 = leaf(
        0,
        DataType::BigInt(BigIntType::new()),
        PredicateOperator::Gt,
        vec![Datum::Long(25)],
    );
    let predicates = FilePredicates {
        predicates: vec![age_gt_25],
        file_fields: vec![age_field, name_field],
    };

    let stream = AvroFormatReader
        .read_batch_stream(
            Box::new(BytesFileRead(encoded.into())),
            encoded_len,
            &read_fields,
            Some(&predicates),
            None,
            None,
        )
        .await
        .unwrap();
    let batches: Vec<_> = stream.try_collect().await.unwrap();

    // Assert on the FILTERED rows/values, not an exact column set (the batch
    // may contain the extra `age` column — DataFileReader projects it away).
    let mut total_rows = 0usize;
    for batch in &batches {
        total_rows += batch.num_rows();
    }
    assert_eq!(total_rows, 3);

    // Look `name` up by column name, since its position in the batch is not asserted here.
    let mut names: Vec<String> = Vec::new();
    for batch in &batches {
        let name_column = batch
            .column_by_name("name")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        for row in 0..name_column.len() {
            names.push(name_column.value(row).to_string());
        }
    }
    assert_eq!(names, vec!["c", "d", "e"]);
}

/// Reading with `name LIKE 'a%'` must emit exactly the rows whose name starts
/// with `a`.
#[tokio::test]
async fn avro_reader_applies_exact_residual_filter_like() {
    use crate::btree::test_util::BytesFileRead;
    use crate::spec::{BigIntType, VarCharType};
    use futures::TryStreamExt;

    let encoded = write_avro_age_name(&[
        (10, "apple"),
        (20, "banana"),
        (30, "apricot"),
        (40, "cherry"),
    ]);
    let encoded_len = encoded.len() as u64;

    let age_field = DataField::new(0, "age".to_string(), DataType::BigInt(BigIntType::new()));
    let name_field = DataField::new(
        1,
        "name".to_string(),
        DataType::VarChar(VarCharType::new(50).unwrap()),
    );
    let read_fields = vec![age_field.clone(), name_field.clone()];

    // name LIKE 'a%' → ["apple", "apricot"].
    let name_like_a = leaf(
        1,
        DataType::VarChar(VarCharType::new(50).unwrap()),
        PredicateOperator::Like,
        vec![Datum::String("a%".to_string())],
    );
    let predicates = FilePredicates {
        predicates: vec![name_like_a],
        file_fields: vec![age_field, name_field],
    };

    let stream = AvroFormatReader
        .read_batch_stream(
            Box::new(BytesFileRead(encoded.into())),
            encoded_len,
            &read_fields,
            Some(&predicates),
            None,
            None,
        )
        .await
        .unwrap();
    let batches: Vec<_> = stream.try_collect().await.unwrap();

    let mut total_rows = 0usize;
    for batch in &batches {
        total_rows += batch.num_rows();
    }
    assert_eq!(total_rows, 2);

    // Column 1 is `name` because both fields are projected in file order.
    let mut names: Vec<String> = Vec::new();
    for batch in &batches {
        let name_column = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        for row in 0..name_column.len() {
            names.push(name_column.value(row).to_string());
        }
    }
    assert_eq!(names, vec!["apple", "apricot"]);
}
}
15 changes: 13 additions & 2 deletions crates/paimon/src/arrow/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,19 @@ pub(crate) struct FilePredicates {
/// - Row range selection
#[async_trait]
pub(crate) trait FormatFileReader: Send + Sync {
/// Read a single data file, returning a stream of RecordBatches
/// containing only the projected columns (using names from the file's schema).
/// Read a single data file, returning a stream of RecordBatches containing
/// at least the projected columns (using names from the file's schema). A
/// reader MAY include extra columns it needed to scan (e.g. predicate columns
/// for residual filtering); the caller (`DataFileReader`) projects to the
/// requested output by name, so extra columns are harmless.
///
/// Predicate exactness is per-format, NOT a blanket guarantee:
/// - Parquet, ORC, Avro, Row, and Vortex apply the predicate **exactly** —
/// each emitted batch contains only rows matching the pushed-down predicate
/// (native pushdown for pruning + a row-level residual pass for the rest).
/// - Blob does not evaluate predicates at all; Mosaic applies only
/// stats-level (row-group) pruning. For those, non-matching rows may
/// survive and the caller must not assume exactness.
///
/// `row_selection` is a pre-merged list of 0-based inclusive row ranges
/// (DV + row_ranges already combined by the caller).
Expand Down
Loading
Loading