From 7850eae7689401a7511879cb51489a7f873346f2 Mon Sep 17 00:00:00 2001 From: Arnav-Panjla Date: Wed, 27 May 2026 00:35:26 +0100 Subject: [PATCH 1/2] [client] Implement BatchScanner with limit-based scan Implements a one-shot bounded BatchScanner backed by a single LimitScanRequest RPC (fixes #316): - adds TableScan::limit and create_batch_scanner - eager RPC with leader resolution (mirrors Lookuper) - Arrow IPC (log) and KV -> RecordBatch (PK) decoding - projection support Squashed from PR #515. --- .../fluss/src/client/table/batch_scanner.rs | 435 ++++++++++++++++++ crates/fluss/src/client/table/mod.rs | 2 + crates/fluss/src/client/table/scanner.rs | 51 ++ .../fluss/tests/integration/batch_scanner.rs | 162 +++++++ crates/fluss/tests/test_fluss.rs | 1 + 5 files changed, 651 insertions(+) create mode 100644 crates/fluss/src/client/table/batch_scanner.rs create mode 100644 crates/fluss/tests/integration/batch_scanner.rs diff --git a/crates/fluss/src/client/table/batch_scanner.rs b/crates/fluss/src/client/table/batch_scanner.rs new file mode 100644 index 00000000..f0319859 --- /dev/null +++ b/crates/fluss/src/client/table/batch_scanner.rs @@ -0,0 +1,435 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
One-shot bounded scanner backed by a single `LimitScanRequest` RPC. +//! +//! Unlike [`crate::client::table::LogScanner`], a `BatchScanner` does not +//! subscribe to bucket offsets or stream from the server. It performs a single +//! eager request for up to `limit` rows from one `TableBucket` and exposes the +//! result as a single Arrow [`RecordBatch`] on the first call to +//! [`BatchScanner::poll_batch`]; subsequent calls return `None`. + +use crate::client::metadata::Metadata; +use crate::error::{ApiError, Error, FlussError, Result}; +use crate::metadata::{TableBucket, TableInfo}; +use crate::proto::ErrorResponse; +use crate::record::kv::{KvRecordBatch, KvRecordReadContext, ReadContext as KvReadContext, SchemaGetter}; +use crate::record::{LogRecordsBatches, ReadContext as ArrowReadContext, ScanBatch, RowAppendRecordBatchBuilder, to_arrow_schema}; +use crate::rpc::RpcClient; +use crate::rpc::message::LimitScanRequest; +use arrow::array::RecordBatch; +use arrow_schema::SchemaRef; +use bytes::Bytes; +use std::sync::Arc; + +/// Adapter over a [`TableInfo`] that satisfies [`SchemaGetter`] for a single +/// table. KV lookups always carry the same schema id, so we just hand back +/// the embedded schema. +struct TableInfoSchemaGetter { + schema: Arc, +} + +impl SchemaGetter for TableInfoSchemaGetter { + fn get_schema(&self, _schema_id: i16) -> Result> { + Ok(Arc::clone(&self.schema)) + } +} + +/// One-shot bounded scanner. +/// +/// The scanner sends a single `LimitScanRequest` on construction and caches +/// the resulting Arrow `RecordBatch`. The first `poll_batch()` returns the +/// batch (wrapped in a [`ScanBatch`]); the second returns `None`. +pub struct BatchScanner { + bucket: TableBucket, + /// Pre-fetched batch, taken out on the first `poll_batch` call. + batch: Option, + /// Base log offset of the pre-fetched batch. For log tables, this is the + /// `base_log_offset` of the first underlying `LogRecordBatch`. 
For KV + /// tables (limit scan on a primary-key table) there is no log offset, so + /// this is `0`. + base_offset: i64, +} + +impl BatchScanner { + pub(super) async fn new( + rpc_client: Arc, + metadata: Arc, + table_info: TableInfo, + projected_fields: Option>, + bucket: TableBucket, + limit: i32, + ) -> Result { + // Resolve leader for the target bucket (mirrors Lookuper's pattern). + let leader = metadata + .leader_for(&table_info.table_path, &bucket) + .await? + .ok_or_else(|| { + Error::leader_not_available(format!( + "No leader found for table bucket: {bucket}" + )) + })?; + let connection = rpc_client.get_connection(&leader).await?; + + // Fire the single LimitScanRequest RPC. + let request = LimitScanRequest::new( + table_info.table_id, + bucket.partition_id(), + bucket.bucket_id(), + limit, + ); + let response = connection.request(request).await?; + + // Surface server-side errors using the same shape as Lookuper. + if let Some(error_code) = response.error_code + && error_code != FlussError::None.code() + { + let err: ApiError = ErrorResponse { + error_code, + error_message: response.error_message.clone(), + } + .into(); + return Err(Error::FlussAPIError { api_error: err }); + } + + let is_log_table = response.is_log_table.unwrap_or(false); + let raw = response.records.unwrap_or_default(); + + let (batch, base_offset) = if is_log_table { + decode_log_batch(&table_info, projected_fields.as_deref(), raw)? + } else { + (decode_kv_batch(&table_info, projected_fields.as_deref(), raw)?, 0) + }; + + Ok(Self { + bucket, + batch: Some(batch), + base_offset, + }) + } + + /// Returns the pre-fetched batch on the first call, then `None`. + pub async fn poll_batch(&mut self) -> Result> { + let base_offset = self.base_offset; + Ok(self + .batch + .take() + .map(|b| ScanBatch::new(self.bucket.clone(), b, base_offset))) + } + + /// The bucket scanned by this `BatchScanner`. 
+ pub fn bucket(&self) -> &TableBucket { + &self.bucket + } +} + +/// Decode an Arrow-IPC encoded `LogRecordBatch` payload into a single Arrow +/// `RecordBatch`. Multiple inner batches (rare for a `LimitScanRequest`) are +/// concatenated. +fn decode_log_batch( + table_info: &TableInfo, + projected_fields: Option<&[usize]>, + raw: Vec, +) -> Result<(RecordBatch, i64)> { + let row_type = Arc::new(table_info.get_row_type().clone()); + let full_schema = to_arrow_schema(table_info.get_row_type())?; + let read_context = match projected_fields { + None => ArrowReadContext::new(full_schema.clone(), row_type.clone(), false), + Some(fields) => ArrowReadContext::with_projection_pushdown( + full_schema.clone(), + row_type.clone(), + fields.to_vec(), + false, + )?, + }; + + let target_schema: SchemaRef = match projected_fields { + None => full_schema, + Some(fields) => ArrowReadContext::project_schema( + to_arrow_schema(table_info.get_row_type())?, + fields, + )?, + }; + + if raw.is_empty() { + return Ok((RecordBatch::new_empty(target_schema), 0)); + } + + let mut batches: Vec = Vec::new(); + let mut base_offset: Option = None; + for log_batch in LogRecordsBatches::new(raw) { + let log_batch = log_batch?; + if base_offset.is_none() { + base_offset = Some(log_batch.base_log_offset()); + } + let rb = log_batch.record_batch(&read_context)?; + batches.push(rb); + } + + let base_offset = base_offset.unwrap_or(0); + if batches.is_empty() { + return Ok((RecordBatch::new_empty(target_schema), base_offset)); + } + if batches.len() == 1 { + return Ok((batches.into_iter().next().unwrap(), base_offset)); + } + let merged = arrow::compute::concat_batches(&target_schema, batches.iter()).map_err(|e| { + Error::UnexpectedError { + message: format!("Failed to concatenate log record batches: {e}"), + source: None, + } + })?; + Ok((merged, base_offset)) +} + +/// Decode a KV-format payload into a single Arrow `RecordBatch`. 
Each +/// `CompactedRow` is appended through [`RowAppendRecordBatchBuilder`]; deletion +/// records (no value) are skipped because primary key tables don't return +/// tombstones from a limit scan. +fn decode_kv_batch( + table_info: &TableInfo, + projected_fields: Option<&[usize]>, + raw: Vec, +) -> Result { + let row_type = table_info.get_row_type(); + let full_arrow_schema = to_arrow_schema(row_type)?; + + if raw.is_empty() { + let schema: SchemaRef = match projected_fields { + None => full_arrow_schema, + Some(fields) => ArrowReadContext::project_schema(full_arrow_schema, fields)?, + }; + return Ok(RecordBatch::new_empty(schema)); + } + + let kv_format = table_info.table_config.get_kv_format()?; + let schema_getter = Arc::new(TableInfoSchemaGetter { + schema: Arc::new(table_info.get_schema().clone()), + }); + let read_context = KvRecordReadContext::new(kv_format, schema_getter); + + // The KV records payload may be a single batch or a sequence of batches. + // The server-side `LimitScanResponse` returns one batch in practice, but + // we walk the buffer defensively. 
+ let bytes = Bytes::from(raw); + let mut builder = RowAppendRecordBatchBuilder::new(row_type)?; + let mut position = 0usize; + + while position < bytes.len() { + let kv_batch = KvRecordBatch::new(bytes.clone(), position); + let size = kv_batch.size_in_bytes().map_err(|e| Error::UnexpectedError { + message: format!("Invalid KvRecordBatch length: {e}"), + source: None, + })?; + + let records = kv_batch.records_unchecked(&read_context as &dyn KvReadContext)?; + let decoder = records.decoder_arc(); + for record in records { + let record = record.map_err(|e| Error::UnexpectedError { + message: format!("Failed to read KV record: {e}"), + source: None, + })?; + if let Some(row) = record.row(&*decoder) { + builder.append(&row)?; + } + } + + position = position.checked_add(size).ok_or_else(|| Error::UnexpectedError { + message: "KvRecordBatch position overflow".to_string(), + source: None, + })?; + } + + let full_batch = Arc::unwrap_or_clone(builder.build_arrow_record_batch()?); + + match projected_fields { + None => Ok(full_batch), + Some(fields) => { + let projected_schema = + ArrowReadContext::project_schema(full_arrow_schema, fields)?; + let columns: Vec<_> = fields + .iter() + .map(|&idx| full_batch.column(idx).clone()) + .collect(); + Ok(RecordBatch::try_new(projected_schema, columns)?) 
+ } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::client::WriteRecord; + use crate::compression::{ + ArrowCompressionInfo, ArrowCompressionRatioEstimator, ArrowCompressionType, + DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }; + use crate::metadata::{ + DataField, DataTypes, PhysicalTablePath, Schema, TableDescriptor, TableInfo, + TablePath, + }; + use crate::record::MemoryLogRecordsArrowBuilder; + use crate::row::GenericRow; + + fn build_two_col_table_info() -> TableInfo { + let row_type = DataTypes::row(vec![ + DataField::new("id", DataTypes::int(), None), + DataField::new("name", DataTypes::string(), None), + ]); + let schema = Schema::builder() + .with_row_type(&row_type) + .build() + .expect("schema build"); + let descriptor = TableDescriptor::builder() + .schema(schema) + .distributed_by(Some(1), vec![]) + .build() + .expect("descriptor build"); + TableInfo::of( + TablePath::new("db".to_string(), "tbl".to_string()), + 42, + 1, + descriptor, + 0, + 0, + ) + } + + fn build_log_records(table_info: &TableInfo, base_offset: i64, rows: &[(i32, &str)]) -> Vec { + let row_type = table_info.get_row_type(); + let table_path = table_info.table_path.clone(); + let table_info_arc = Arc::new(table_info.clone()); + let physical = Arc::new(PhysicalTablePath::of(Arc::new(table_path))); + let mut builder = MemoryLogRecordsArrowBuilder::new( + 1, + row_type, + false, + ArrowCompressionInfo { + compression_type: ArrowCompressionType::None, + compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }, + usize::MAX, + Arc::new(ArrowCompressionRatioEstimator::default()), + ) + .expect("builder"); + + for (i, (id, name)) in rows.iter().enumerate() { + let mut row = GenericRow::new(2); + row.set_field(0, *id); + row.set_field(1, *name); + let record = WriteRecord::for_append( + Arc::clone(&table_info_arc), + physical.clone(), + (i + 1) as i32, + &row, + ); + builder.append(&record).expect("append"); + } + let mut data = builder.build().expect("build log batch"); + // 
Builder always writes base_log_offset=0; patch it so tests can verify + // BatchScanner faithfully propagates whatever offset the server returned. + let bytes = base_offset.to_le_bytes(); + data[..bytes.len()].copy_from_slice(&bytes); + data + } + + #[test] + fn decode_log_batch_empty_returns_empty_record_batch() { + let table_info = build_two_col_table_info(); + let (batch, base_offset) = + decode_log_batch(&table_info, None, Vec::new()).expect("decode empty"); + assert_eq!(batch.num_rows(), 0); + assert_eq!(batch.num_columns(), 2); + assert_eq!(base_offset, 0); + } + + #[test] + fn decode_log_batch_empty_with_projection() { + let table_info = build_two_col_table_info(); + let (batch, base_offset) = + decode_log_batch(&table_info, Some(&[1usize]), Vec::new()).expect("decode empty"); + assert_eq!(batch.num_rows(), 0); + assert_eq!(batch.num_columns(), 1); + assert_eq!(batch.schema().field(0).name(), "name"); + assert_eq!(base_offset, 0); + } + + #[test] + fn decode_log_batch_extracts_base_offset_and_rows() { + let table_info = build_two_col_table_info(); + let raw = build_log_records(&table_info, 17, &[(1, "alice"), (2, "bob"), (3, "carol")]); + + let (batch, base_offset) = + decode_log_batch(&table_info, None, raw).expect("decode populated"); + assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_columns(), 2); + assert_eq!(base_offset, 17); + } + + #[test] + fn decode_log_batch_projection_keeps_requested_columns() { + let table_info = build_two_col_table_info(); + let raw = build_log_records(&table_info, 0, &[(7, "x"), (8, "y")]); + + let (batch, _) = + decode_log_batch(&table_info, Some(&[0usize]), raw).expect("decode projected"); + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 1); + assert_eq!(batch.schema().field(0).name(), "id"); + } + + #[test] + fn decode_kv_batch_empty_returns_empty_record_batch() { + let table_info = build_two_col_table_info(); + let batch = decode_kv_batch(&table_info, None, Vec::new()).expect("decode empty kv"); 
+ assert_eq!(batch.num_rows(), 0); + assert_eq!(batch.num_columns(), 2); + } + + #[test] + fn decode_kv_batch_empty_with_projection() { + let table_info = build_two_col_table_info(); + let batch = decode_kv_batch(&table_info, Some(&[0usize]), Vec::new()) + .expect("decode projected empty kv"); + assert_eq!(batch.num_rows(), 0); + assert_eq!(batch.num_columns(), 1); + assert_eq!(batch.schema().field(0).name(), "id"); + } + + #[tokio::test] + async fn poll_batch_returns_batch_then_none() { + let table_info = build_two_col_table_info(); + let raw = build_log_records(&table_info, 5, &[(1, "alice"), (2, "bob")]); + let (batch, base_offset) = decode_log_batch(&table_info, None, raw).expect("decode"); + + let bucket = TableBucket::new(table_info.table_id, 0); + let mut scanner = BatchScanner { + bucket: bucket.clone(), + batch: Some(batch), + base_offset, + }; + + let first = scanner.poll_batch().await.expect("poll").expect("some"); + assert_eq!(first.bucket(), &bucket); + assert_eq!(first.num_records(), 2); + assert_eq!(first.base_offset(), 5); + assert_eq!(first.last_offset(), 6); + + let second = scanner.poll_batch().await.expect("poll"); + assert!(second.is_none()); + } +} diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index e116bbb4..5ef5d179 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -25,6 +25,7 @@ use std::sync::Arc; pub const EARLIEST_OFFSET: i64 = -2; mod append; +mod batch_scanner; mod lookup; mod log_fetch_buffer; @@ -35,6 +36,7 @@ mod scanner; mod upsert; pub use append::{AppendWriter, TableAppend}; +pub use batch_scanner::BatchScanner; pub use lookup::{LookupResult, Lookuper, PrefixKeyLookuper, TableLookup, TablePrefixLookup}; pub use reader::{RecordBatchLogReader, SyncRecordBatchLogReader}; pub use remote_log::{ diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index d7e2dd2c..031975b3 100644 --- 
a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -16,6 +16,7 @@ // under the License. use crate::client::connection::FlussConnection; +use crate::client::table::batch_scanner::BatchScanner; use crate::client::credentials::SecurityTokenManager; use crate::client::metadata::Metadata; use crate::client::table::log_fetch_buffer::{ @@ -55,6 +56,8 @@ pub struct TableScan<'a> { metadata: Arc, /// Column indices to project. None means all columns, Some(vec) means only the specified columns (non-empty). projected_fields: Option>, + /// Optional row limit. When set, callers may construct a [`BatchScanner`] for a one-shot bounded scan. + limit: Option, } impl<'a> TableScan<'a> { @@ -64,9 +67,57 @@ impl<'a> TableScan<'a> { table_info, metadata, projected_fields: None, + limit: None, } } + /// Sets a row limit for the scan, enabling [`Self::create_batch_scanner`]. + /// + /// The limit must be positive. Callers configure a limit prior to + /// constructing a `BatchScanner` for a one-shot bounded read. + pub fn limit(mut self, n: i32) -> Result { + if n <= 0 { + return Err(Error::IllegalArgument { + message: format!("Scan limit must be positive, got {n}"), + }); + } + self.limit = Some(n); + Ok(self) + } + + /// Creates a `BatchScanner` that performs a single bounded scan of `table_bucket`. + /// + /// Requires a previously-configured limit via [`Self::limit`]. The scanner sends + /// a `LimitScanRequest` eagerly and exposes the resulting batch through + /// [`BatchScanner::poll_batch`]. 
+ pub async fn create_batch_scanner( + self, + table_bucket: TableBucket, + ) -> Result { + let limit = self.limit.ok_or_else(|| Error::IllegalArgument { + message: "create_batch_scanner requires a limit configured via .limit(n)" + .to_string(), + })?; + if table_bucket.table_id() != self.table_info.table_id { + return Err(Error::IllegalArgument { + message: format!( + "Bucket table_id {} does not match scan table_id {}", + table_bucket.table_id(), + self.table_info.table_id + ), + }); + } + BatchScanner::new( + self.conn.get_connections(), + self.metadata.clone(), + self.table_info, + self.projected_fields, + table_bucket, + limit, + ) + .await + } + /// Projects the scan to only include specified columns by their indices. /// /// # Arguments diff --git a/crates/fluss/tests/integration/batch_scanner.rs b/crates/fluss/tests/integration/batch_scanner.rs new file mode 100644 index 00000000..26235281 --- /dev/null +++ b/crates/fluss/tests/integration/batch_scanner.rs @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#[cfg(test)] +mod batch_scanner_test { + use crate::integration::utils::{create_table, get_shared_cluster}; + use arrow::array::record_batch; + use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath}; + use std::time::Duration; + + /// End-to-end check that BatchScanner returns the appended rows on first + /// poll and `None` on the next, honoring the configured limit. + #[tokio::test] + async fn batch_scanner_returns_appended_rows_then_none() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_batch_scanner_log"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("c1", DataTypes::int()) + .column("c2", DataTypes::string()) + .build() + .expect("schema"), + ) + // Single bucket so a single BatchScanner sees every row. + .distributed_by(Some(1), vec!["c1".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + let writer = table + .new_append() + .expect("append") + .create_writer() + .expect("writer"); + + let batch = record_batch!( + ("c1", Int32, [1, 2, 3, 4, 5]), + ("c2", Utf8, ["a", "b", "c", "d", "e"]) + ) + .unwrap(); + writer.append_arrow_batch(batch).expect("append batch"); + writer.flush().await.expect("flush"); + + // Give the server a moment to commit and make the records readable. 
+ tokio::time::sleep(Duration::from_secs(1)).await; + + let table_info = table.get_table_info(); + let bucket = TableBucket::new(table_info.table_id, 0); + + let mut batch_scanner = table + .new_scan() + .limit(3) + .expect("limit") + .create_batch_scanner(bucket.clone()) + .await + .expect("create batch scanner"); + + let first = batch_scanner + .poll_batch() + .await + .expect("poll") + .expect("first batch should be Some"); + + assert_eq!(first.bucket(), &bucket); + // The server may return fewer rows than the limit on the first call, + // but must never exceed it. + assert!( + first.num_records() > 0 && first.num_records() <= 3, + "expected 1..=3 records, got {}", + first.num_records() + ); + + let second = batch_scanner + .poll_batch() + .await + .expect("second poll succeeds"); + assert!(second.is_none(), "second poll must return None"); + } + + /// A bucket id outside the table's bucket range should be rejected by the + /// scanner before any RPC is made. + #[tokio::test] + async fn batch_scanner_requires_matching_table_id() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_batch_scanner_table_id"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("c1", DataTypes::int()) + .build() + .expect("schema"), + ) + .distributed_by(Some(1), vec!["c1".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + + // Bucket with a wrong table_id — must fail without hitting the server. 
+ let bogus_bucket = TableBucket::new(table.get_table_info().table_id + 9999, 0); + + let result = table + .new_scan() + .limit(1) + .expect("limit") + .create_batch_scanner(bogus_bucket) + .await; + assert!( + result.is_err(), + "batch scanner must reject mismatched table_id" + ); + } + + /// `.limit(n)` must reject non-positive values before any scanner is built. + #[tokio::test] + async fn batch_scanner_rejects_non_positive_limit() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_batch_scanner_bad_limit"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("c1", DataTypes::int()) + .build() + .expect("schema"), + ) + .distributed_by(Some(1), vec!["c1".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + assert!(table.new_scan().limit(0).is_err()); + assert!(table.new_scan().limit(-5).is_err()); + } +} diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs index 792b68f0..2d2bd152 100644 --- a/crates/fluss/tests/test_fluss.rs +++ b/crates/fluss/tests/test_fluss.rs @@ -21,6 +21,7 @@ extern crate fluss; #[cfg(feature = "integration_tests")] mod integration { mod admin; + mod batch_scanner; mod fluss_cluster; mod kv_table; mod log_table; From e101ee24a3efaf4bd3e9485ca1e5185c74faa3fa Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Wed, 27 May 2026 01:31:55 +0100 Subject: [PATCH 2/2] [client] Fix KV limit-scan decode + enforce limit, support schema evolution --- .../fluss/src/client/table/batch_scanner.rs | 710 +++++++++++++----- crates/fluss/src/client/table/mod.rs | 2 +- crates/fluss/src/client/table/scanner.rs | 74 +- crates/fluss/src/record/kv/mod.rs | 2 + .../fluss/src/record/kv/value_record_batch.rs | 188 +++++ 
.../fluss/tests/integration/batch_scanner.rs | 236 +++++- crates/fluss/tests/integration/log_table.rs | 2 - .../integration/record_batch_log_reader.rs | 7 +- website/docs/user-guide/rust/api-reference.md | 15 + .../user-guide/rust/example/log-tables.md | 17 + .../rust/example/primary-key-tables.md | 15 + 11 files changed, 1025 insertions(+), 243 deletions(-) create mode 100644 crates/fluss/src/record/kv/value_record_batch.rs diff --git a/crates/fluss/src/client/table/batch_scanner.rs b/crates/fluss/src/client/table/batch_scanner.rs index f0319859..cc0585f3 100644 --- a/crates/fluss/src/client/table/batch_scanner.rs +++ b/crates/fluss/src/client/table/batch_scanner.rs @@ -15,135 +15,165 @@ // specific language governing permissions and limitations // under the License. -//! One-shot bounded scanner backed by a single `LimitScanRequest` RPC. +//! Bounded batch scanner backed by a single `LimitScanRequest`, polled with +//! `next_batch` until it returns `None` (like `RecordBatchLogReader`). //! -//! Unlike [`crate::client::table::LogScanner`], a `BatchScanner` does not -//! subscribe to bucket offsets or stream from the server. It performs a single -//! eager request for up to `limit` rows from one `TableBucket` and exposes the -//! result as a single Arrow [`RecordBatch`] on the first call to -//! [`BatchScanner::poll_batch`]; subsequent calls return `None`. +//! The KV branch decodes a [`ValueRecordBatch`], decoding each record against +//! its own schema id via [`FixedSchemaDecoder`] so older records are projected +//! onto the current schema (the same path as lookup). 
+use crate::client::ClientSchemaGetter; use crate::client::metadata::Metadata; use crate::error::{ApiError, Error, FlussError, Result}; -use crate::metadata::{TableBucket, TableInfo}; +use crate::metadata::{KvFormat, RowType, Schema, TableBucket, TableInfo}; use crate::proto::ErrorResponse; -use crate::record::kv::{KvRecordBatch, KvRecordReadContext, ReadContext as KvReadContext, SchemaGetter}; -use crate::record::{LogRecordsBatches, ReadContext as ArrowReadContext, ScanBatch, RowAppendRecordBatchBuilder, to_arrow_schema}; +use crate::record::kv::{SCHEMA_ID_LENGTH, ValueRecordBatch}; +use crate::record::{ + LogRecordsBatches, ReadContext as ArrowReadContext, RowAppendRecordBatchBuilder, ScanBatch, + to_arrow_schema, +}; +use crate::row::FixedSchemaDecoder; use crate::rpc::RpcClient; use crate::rpc::message::LimitScanRequest; use arrow::array::RecordBatch; +use arrow::compute::concat_batches; use arrow_schema::SchemaRef; +use byteorder::{ByteOrder, LittleEndian}; use bytes::Bytes; +use std::collections::HashMap; +use std::ops::Range; use std::sync::Arc; -/// Adapter over a [`TableInfo`] that satisfies [`SchemaGetter`] for a single -/// table. KV lookups always carry the same schema id, so we just hand back -/// the embedded schema. -struct TableInfoSchemaGetter { - schema: Arc, +/// One-shot bounded scanner: a single `LimitScanRequest` yielded as one +/// [`ScanBatch`]. Creation is cheap; the request runs on the first +/// [`next_batch`](Self::next_batch), which returns the batch once, then `None`. +pub struct LimitBatchScanner { + bucket: TableBucket, + /// Taken on the first `next_batch` to run the scan; `None` afterward. + pending: Option, } -impl SchemaGetter for TableInfoSchemaGetter { - fn get_schema(&self, _schema_id: i16) -> Result> { - Ok(Arc::clone(&self.schema)) - } +/// Request inputs captured at creation, consumed by the first `next_batch`. 
+struct PendingScan { + rpc_client: Arc, + metadata: Arc, + table_info: TableInfo, + schema_getter: Arc, + projected_fields: Option>, + limit: i32, } -/// One-shot bounded scanner. -/// -/// The scanner sends a single `LimitScanRequest` on construction and caches -/// the resulting Arrow `RecordBatch`. The first `poll_batch()` returns the -/// batch (wrapped in a [`ScanBatch`]); the second returns `None`. -pub struct BatchScanner { - bucket: TableBucket, - /// Pre-fetched batch, taken out on the first `poll_batch` call. - batch: Option, - /// Base log offset of the pre-fetched batch. For log tables, this is the - /// `base_log_offset` of the first underlying `LogRecordBatch`. For KV - /// tables (limit scan on a primary-key table) there is no log offset, so - /// this is `0`. - base_offset: i64, -} - -impl BatchScanner { - pub(super) async fn new( +impl LimitBatchScanner { + pub(super) fn new( rpc_client: Arc, metadata: Arc, table_info: TableInfo, + schema_getter: Arc, projected_fields: Option>, bucket: TableBucket, limit: i32, - ) -> Result { - // Resolve leader for the target bucket (mirrors Lookuper's pattern). - let leader = metadata - .leader_for(&table_info.table_path, &bucket) - .await? - .ok_or_else(|| { - Error::leader_not_available(format!( - "No leader found for table bucket: {bucket}" - )) - })?; - let connection = rpc_client.get_connection(&leader).await?; - - // Fire the single LimitScanRequest RPC. - let request = LimitScanRequest::new( - table_info.table_id, - bucket.partition_id(), - bucket.bucket_id(), - limit, - ); - let response = connection.request(request).await?; - - // Surface server-side errors using the same shape as Lookuper. 
- if let Some(error_code) = response.error_code - && error_code != FlussError::None.code() - { - let err: ApiError = ErrorResponse { - error_code, - error_message: response.error_message.clone(), - } - .into(); - return Err(Error::FlussAPIError { api_error: err }); + ) -> Self { + Self { + bucket, + pending: Some(PendingScan { + rpc_client, + metadata, + table_info, + schema_getter, + projected_fields, + limit, + }), } + } - let is_log_table = response.is_log_table.unwrap_or(false); - let raw = response.records.unwrap_or_default(); - - let (batch, base_offset) = if is_log_table { - decode_log_batch(&table_info, projected_fields.as_deref(), raw)? - } else { - (decode_kv_batch(&table_info, projected_fields.as_deref(), raw)?, 0) + /// Runs the scan on the first call and returns its batch, then `None`. Not + /// retried — an error leaves the scanner spent; create a new one to retry. + pub async fn next_batch(&mut self) -> Result> { + let Some(pending) = self.pending.take() else { + return Ok(None); }; - - Ok(Self { - bucket, - batch: Some(batch), - base_offset, - }) + run_limit_scan(&pending, &self.bucket).await.map(Some) } - /// Returns the pre-fetched batch on the first call, then `None`. - pub async fn poll_batch(&mut self) -> Result> { - let base_offset = self.base_offset; - Ok(self - .batch - .take() - .map(|b| ScanBatch::new(self.bucket.clone(), b, base_offset))) + /// Drains the scanner into all of its batches. + pub async fn collect_all_batches(&mut self) -> Result> { + let mut batches = Vec::new(); + while let Some(batch) = self.next_batch().await? { + batches.push(batch); + } + Ok(batches) } - /// The bucket scanned by this `BatchScanner`. + /// The bucket scanned by this `LimitBatchScanner`. pub fn bucket(&self) -> &TableBucket { &self.bucket } } -/// Decode an Arrow-IPC encoded `LogRecordBatch` payload into a single Arrow -/// `RecordBatch`. Multiple inner batches (rare for a `LimitScanRequest`) are -/// concatenated. 
+/// Resolves the leader, sends the `LimitScanRequest`, and decodes the response +/// into one [`ScanBatch`]. +async fn run_limit_scan(pending: &PendingScan, bucket: &TableBucket) -> Result { + let leader = pending + .metadata + .leader_for(&pending.table_info.table_path, bucket) + .await? + .ok_or_else(|| { + Error::leader_not_available(format!("No leader found for table bucket: {bucket}")) + })?; + let connection = pending.rpc_client.get_connection(&leader).await?; + + let request = LimitScanRequest::new( + pending.table_info.table_id, + bucket.partition_id(), + bucket.bucket_id(), + pending.limit, + ); + let response = connection.request(request).await?; + + if let Some(error_code) = response.error_code + && error_code != FlussError::None.code() + { + let err: ApiError = ErrorResponse { + error_code, + error_message: response.error_message.clone(), + } + .into(); + return Err(Error::FlussAPIError { api_error: err }); + } + + let raw = response.records.unwrap_or_default(); + // `limit` is validated positive by `TableScan::limit`. + let limit = pending.limit.max(0) as usize; + let projected = pending.projected_fields.as_deref(); + + // Choose the payload format from table metadata, not the response's advisory + // `is_log_table` flag. + let (batch, base_offset) = if !pending.table_info.has_primary_key() { + decode_log_batch(&pending.table_info, projected, raw, limit)? + } else { + // KV (primary-key) limit scan: no log offset, so base_offset is 0. + let batch = decode_kv_batch( + &pending.table_info, + &pending.schema_getter, + projected, + raw, + limit, + ) + .await?; + (batch, 0) + }; + + Ok(ScanBatch::new(bucket.clone(), batch, base_offset)) +} + +/// Decode the log payload into a single Arrow `RecordBatch`, concatenating any +/// inner batches. If more than `limit` rows are returned, the last `limit` are +/// kept and `base_offset` is advanced by the number dropped. 
fn decode_log_batch( table_info: &TableInfo, projected_fields: Option<&[usize]>, raw: Vec, + limit: usize, ) -> Result<(RecordBatch, i64)> { let row_type = Arc::new(table_info.get_row_type().clone()); let full_schema = to_arrow_schema(table_info.get_row_type())?; @@ -159,10 +189,9 @@ fn decode_log_batch( let target_schema: SchemaRef = match projected_fields { None => full_schema, - Some(fields) => ArrowReadContext::project_schema( - to_arrow_schema(table_info.get_row_type())?, - fields, - )?, + Some(fields) => { + ArrowReadContext::project_schema(to_arrow_schema(table_info.get_row_type())?, fields)? + } }; if raw.is_empty() { @@ -181,89 +210,189 @@ fn decode_log_batch( } let base_offset = base_offset.unwrap_or(0); - if batches.is_empty() { - return Ok((RecordBatch::new_empty(target_schema), base_offset)); - } - if batches.len() == 1 { - return Ok((batches.into_iter().next().unwrap(), base_offset)); - } - let merged = arrow::compute::concat_batches(&target_schema, batches.iter()).map_err(|e| { - Error::UnexpectedError { + let merged = if batches.is_empty() { + RecordBatch::new_empty(target_schema) + } else if batches.len() == 1 { + batches.into_iter().next().unwrap() + } else { + concat_batches(&target_schema, batches.iter()).map_err(|e| Error::UnexpectedError { message: format!("Failed to concatenate log record batches: {e}"), source: None, - } - })?; - Ok((merged, base_offset)) + })? + }; + + Ok(take_last_rows(merged, base_offset, limit)) } -/// Decode a KV-format payload into a single Arrow `RecordBatch`. Each -/// `CompactedRow` is appended through [`RowAppendRecordBatchBuilder`]; deletion -/// records (no value) are skipped because primary key tables don't return -/// tombstones from a limit scan. -fn decode_kv_batch( +/// Decode a KV limit-scan [`ValueRecordBatch`] into a single Arrow +/// `RecordBatch`, decoding each record by its own schema id and projecting onto +/// the current schema. 
+async fn decode_kv_batch( table_info: &TableInfo, + schema_getter: &ClientSchemaGetter, projected_fields: Option<&[usize]>, raw: Vec, + limit: usize, ) -> Result { - let row_type = table_info.get_row_type(); - let full_arrow_schema = to_arrow_schema(row_type)?; - + // No records: return an empty (projected) batch. if raw.is_empty() { - let schema: SchemaRef = match projected_fields { - None => full_arrow_schema, - Some(fields) => ArrowReadContext::project_schema(full_arrow_schema, fields)?, - }; - return Ok(RecordBatch::new_empty(schema)); + return empty_record_batch(table_info.get_row_type(), projected_fields); } let kv_format = table_info.table_config.get_kv_format()?; - let schema_getter = Arc::new(TableInfoSchemaGetter { - schema: Arc::new(table_info.get_schema().clone()), - }); - let read_context = KvRecordReadContext::new(kv_format, schema_getter); - - // The KV records payload may be a single batch or a sequence of batches. - // The server-side `LimitScanResponse` returns one batch in practice, but - // we walk the buffer defensively. 
- let bytes = Bytes::from(raw); - let mut builder = RowAppendRecordBatchBuilder::new(row_type)?; - let mut position = 0usize; - - while position < bytes.len() { - let kv_batch = KvRecordBatch::new(bytes.clone(), position); - let size = kv_batch.size_in_bytes().map_err(|e| Error::UnexpectedError { - message: format!("Invalid KvRecordBatch length: {e}"), + let target_schema = table_info.get_schema(); + let target_schema_id = + i16::try_from(table_info.get_schema_id()).map_err(|_| Error::UnexpectedError { + message: format!( + "Schema id {} does not fit in 16 bits — wire format violated", + table_info.get_schema_id() + ), source: None, })?; - let records = kv_batch.records_unchecked(&read_context as &dyn KvReadContext)?; - let decoder = records.decoder_arc(); - for record in records { - let record = record.map_err(|e| Error::UnexpectedError { - message: format!("Failed to read KV record: {e}"), + let batch = ValueRecordBatch::new(Bytes::from(raw)); + let ranges = batch.value_ranges()?; + + // Collect the distinct schema ids present, then build one decoder per id + // (fetching older schemas via the coordinator as needed). + let mut schema_ids: Vec = Vec::new(); + for range in &ranges { + let id = read_schema_id(&batch.data()[range.clone()])?; + if !schema_ids.contains(&id) { + schema_ids.push(id); + } + } + let decoders = build_kv_decoders( + schema_getter, + target_schema, + target_schema_id, + kv_format, + &schema_ids, + ) + .await?; + + value_records_to_record_batch( + &batch, + &ranges, + &decoders, + table_info.get_row_type(), + projected_fields, + limit, + ) +} + +/// Build one [`FixedSchemaDecoder`] per distinct schema id. The current schema +/// decodes without projection; older schemas are fetched and projected onto the +/// current schema. 
+async fn build_kv_decoders( + schema_getter: &ClientSchemaGetter, + target_schema: &Schema, + target_schema_id: i16, + kv_format: KvFormat, + schema_ids: &[i16], +) -> Result> { + let mut decoders = HashMap::with_capacity(schema_ids.len()); + for &id in schema_ids { + if decoders.contains_key(&id) { + continue; + } + let decoder = if id == target_schema_id { + FixedSchemaDecoder::new_no_projection(kv_format, target_schema)? + } else { + let source = schema_getter.get_schema(id as i32).await?; + FixedSchemaDecoder::new(kv_format, source.as_ref(), target_schema)? + }; + decoders.insert(id, decoder); + } + Ok(decoders) +} + +/// Decode every value record into a row shaped by `target_row_type`, build a +/// single Arrow batch, keep the last `limit` rows, then apply column projection. +fn value_records_to_record_batch( + batch: &ValueRecordBatch, + ranges: &[Range], + decoders: &HashMap, + target_row_type: &RowType, + projected_fields: Option<&[usize]>, + limit: usize, +) -> Result { + let mut builder = RowAppendRecordBatchBuilder::new(target_row_type)?; + for range in ranges { + let payload = &batch.data()[range.clone()]; + let schema_id = read_schema_id(payload)?; + let decoder = decoders + .get(&schema_id) + .ok_or_else(|| Error::UnexpectedError { + message: format!("No decoder built for schema id {schema_id}"), source: None, })?; - if let Some(row) = record.row(&*decoder) { - builder.append(&row)?; - } - } + let row = decoder.decode(payload)?; + builder.append(&row)?; + } + + let full = Arc::unwrap_or_clone(builder.build_arrow_record_batch()?); + let (full, _) = take_last_rows(full, 0, limit); + project_batch(full, target_row_type, projected_fields) +} - position = position.checked_add(size).ok_or_else(|| Error::UnexpectedError { - message: "KvRecordBatch position overflow".to_string(), +/// Read the leading little-endian schema id from a `[schema_id | row]` payload. 
+fn read_schema_id(payload: &[u8]) -> Result { + if payload.len() < SCHEMA_ID_LENGTH { + return Err(Error::UnexpectedError { + message: format!( + "Value record payload too short: {} bytes, need {} for schema id", + payload.len(), + SCHEMA_ID_LENGTH + ), source: None, - })?; + }); + } + let schema_id = LittleEndian::read_i16(&payload[..SCHEMA_ID_LENGTH]); + if schema_id < 0 { + return Err(Error::UnexpectedError { + message: format!("Invalid negative schema id {schema_id}; payload is corrupt"), + source: None, + }); } + Ok(schema_id) +} - let full_batch = Arc::unwrap_or_clone(builder.build_arrow_record_batch()?); +/// Keep the last `limit` rows of `batch`, advancing `base_offset` by the number +/// of dropped leading rows. A `batch` at or under the limit is returned as-is. +fn take_last_rows(batch: RecordBatch, base_offset: i64, limit: usize) -> (RecordBatch, i64) { + let rows = batch.num_rows(); + if rows > limit { + let dropped = rows - limit; + (batch.slice(dropped, limit), base_offset + dropped as i64) + } else { + (batch, base_offset) + } +} + +/// An empty `RecordBatch` with the (optionally projected) target schema. +fn empty_record_batch( + target_row_type: &RowType, + projected_fields: Option<&[usize]>, +) -> Result { + let empty = RecordBatch::new_empty(to_arrow_schema(target_row_type)?); + project_batch(empty, target_row_type, projected_fields) +} +/// Project `batch` (shaped by `target_row_type`) onto the requested columns. 
+fn project_batch( + batch: RecordBatch, + target_row_type: &RowType, + projected_fields: Option<&[usize]>, +) -> Result { match projected_fields { - None => Ok(full_batch), + None => Ok(batch), Some(fields) => { let projected_schema = - ArrowReadContext::project_schema(full_arrow_schema, fields)?; + ArrowReadContext::project_schema(to_arrow_schema(target_row_type)?, fields)?; let columns: Vec<_> = fields .iter() - .map(|&idx| full_batch.column(idx).clone()) + .map(|&idx| batch.column(idx).clone()) .collect(); Ok(RecordBatch::try_new(projected_schema, columns)?) } @@ -279,11 +408,14 @@ mod tests { DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, }; use crate::metadata::{ - DataField, DataTypes, PhysicalTablePath, Schema, TableDescriptor, TableInfo, - TablePath, + Column, DataField, DataType, DataTypes, PhysicalTablePath, Schema, TableDescriptor, + TableInfo, TablePath, }; use crate::record::MemoryLogRecordsArrowBuilder; use crate::row::GenericRow; + use crate::row::binary::BinaryWriter; + use crate::row::compacted::CompactedRowWriter; + use arrow::array::{Array, Int32Array, Int64Array}; fn build_two_col_table_info() -> TableInfo { let row_type = DataTypes::row(vec![ @@ -309,7 +441,11 @@ mod tests { ) } - fn build_log_records(table_info: &TableInfo, base_offset: i64, rows: &[(i32, &str)]) -> Vec { + fn build_log_records( + table_info: &TableInfo, + base_offset: i64, + rows: &[(i32, &str)], + ) -> Vec { let row_type = table_info.get_row_type(); let table_path = table_info.table_path.clone(); let table_info_arc = Arc::new(table_info.clone()); @@ -347,11 +483,13 @@ mod tests { data } + // ---- log path ---------------------------------------------------------- + #[test] fn decode_log_batch_empty_returns_empty_record_batch() { let table_info = build_two_col_table_info(); let (batch, base_offset) = - decode_log_batch(&table_info, None, Vec::new()).expect("decode empty"); + decode_log_batch(&table_info, None, Vec::new(), usize::MAX).expect("decode empty"); assert_eq!(batch.num_rows(), 
0); assert_eq!(batch.num_columns(), 2); assert_eq!(base_offset, 0); @@ -361,7 +499,8 @@ mod tests { fn decode_log_batch_empty_with_projection() { let table_info = build_two_col_table_info(); let (batch, base_offset) = - decode_log_batch(&table_info, Some(&[1usize]), Vec::new()).expect("decode empty"); + decode_log_batch(&table_info, Some(&[1usize]), Vec::new(), usize::MAX) + .expect("decode empty"); assert_eq!(batch.num_rows(), 0); assert_eq!(batch.num_columns(), 1); assert_eq!(batch.schema().field(0).name(), "name"); @@ -374,7 +513,7 @@ mod tests { let raw = build_log_records(&table_info, 17, &[(1, "alice"), (2, "bob"), (3, "carol")]); let (batch, base_offset) = - decode_log_batch(&table_info, None, raw).expect("decode populated"); + decode_log_batch(&table_info, None, raw, usize::MAX).expect("decode populated"); assert_eq!(batch.num_rows(), 3); assert_eq!(batch.num_columns(), 2); assert_eq!(base_offset, 17); @@ -385,51 +524,244 @@ mod tests { let table_info = build_two_col_table_info(); let raw = build_log_records(&table_info, 0, &[(7, "x"), (8, "y")]); - let (batch, _) = - decode_log_batch(&table_info, Some(&[0usize]), raw).expect("decode projected"); + let (batch, _) = decode_log_batch(&table_info, Some(&[0usize]), raw, usize::MAX) + .expect("decode projected"); assert_eq!(batch.num_rows(), 2); assert_eq!(batch.num_columns(), 1); assert_eq!(batch.schema().field(0).name(), "id"); } #[test] - fn decode_kv_batch_empty_returns_empty_record_batch() { + fn decode_log_batch_truncates_to_last_limit_rows() { let table_info = build_two_col_table_info(); - let batch = decode_kv_batch(&table_info, None, Vec::new()).expect("decode empty kv"); - assert_eq!(batch.num_rows(), 0); - assert_eq!(batch.num_columns(), 2); + // Server returned 4 rows starting at offset 100, but limit is 2. 
+ let raw = build_log_records(&table_info, 100, &[(1, "a"), (2, "b"), (3, "c"), (4, "d")]); + + let (batch, base_offset) = decode_log_batch(&table_info, None, raw, 2).expect("decode"); + assert_eq!(batch.num_rows(), 2); + // The last two rows are kept, so the base offset advances by 2. + assert_eq!(base_offset, 102); + let ids = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ids.value(0), 3); + assert_eq!(ids.value(1), 4); + } + + // ---- KV path ----------------------------------------------------------- + + fn schema_with_ids(columns: &[(i32, &str, DataType)]) -> Schema { + let cols: Vec = columns + .iter() + .map(|(id, name, dt)| Column::new(*name, dt.clone()).with_id(*id)) + .collect(); + Schema::builder().with_columns(cols).build().unwrap() + } + + /// Encode a value-record batch from `(schema_id, compacted-row-bytes)` + /// pairs, matching the Java `DefaultValueRecordBatch` wire layout. + fn value_batch(records: &[(i16, Vec)]) -> ValueRecordBatch { + let mut body = Vec::new(); + for (schema_id, row) in records { + let rec_len = (SCHEMA_ID_LENGTH + row.len()) as i32; + body.extend_from_slice(&rec_len.to_le_bytes()); + body.extend_from_slice(&schema_id.to_le_bytes()); + body.extend_from_slice(row); + } + let mut out = Vec::new(); + out.extend_from_slice(&((1 + 4 + body.len()) as i32).to_le_bytes()); // Length + out.push(0); // Magic + out.extend_from_slice(&(records.len() as i32).to_le_bytes()); // RecordCount + out.extend_from_slice(&body); + ValueRecordBatch::new(Bytes::from(out)) + } + + fn compacted(field_count: usize, write: impl FnOnce(&mut CompactedRowWriter)) -> Vec { + let mut w = CompactedRowWriter::new(field_count); + write(&mut w); + w.to_bytes().as_ref().to_vec() + } + + fn id_name_schema() -> Schema { + schema_with_ids(&[ + (0, "id", DataTypes::int()), + (1, "name", DataTypes::string()), + ]) } #[test] - fn decode_kv_batch_empty_with_projection() { - let table_info = build_two_col_table_info(); - let batch = 
decode_kv_batch(&table_info, Some(&[0usize]), Vec::new()) - .expect("decode projected empty kv"); - assert_eq!(batch.num_rows(), 0); - assert_eq!(batch.num_columns(), 1); - assert_eq!(batch.schema().field(0).name(), "id"); + fn value_records_empty_returns_empty_batch() { + let schema = id_name_schema(); + let batch = value_batch(&[]); + let ranges = batch.value_ranges().unwrap(); + let rb = value_records_to_record_batch( + &batch, + &ranges, + &HashMap::new(), + schema.row_type(), + None, + usize::MAX, + ) + .expect("decode empty kv"); + assert_eq!(rb.num_rows(), 0); + assert_eq!(rb.num_columns(), 2); } - #[tokio::test] - async fn poll_batch_returns_batch_then_none() { - let table_info = build_two_col_table_info(); - let raw = build_log_records(&table_info, 5, &[(1, "alice"), (2, "bob")]); - let (batch, base_offset) = decode_log_batch(&table_info, None, raw).expect("decode"); - - let bucket = TableBucket::new(table_info.table_id, 0); - let mut scanner = BatchScanner { - bucket: bucket.clone(), - batch: Some(batch), - base_offset, - }; + #[test] + fn empty_kv_payload_returns_empty_batch() { + let schema = id_name_schema(); + // Full schema. + let rb = empty_record_batch(schema.row_type(), None).expect("empty"); + assert_eq!(rb.num_rows(), 0); + assert_eq!(rb.num_columns(), 2); + // Projected. 
+ let rb = empty_record_batch(schema.row_type(), Some(&[1usize])).expect("empty projected"); + assert_eq!(rb.num_rows(), 0); + assert_eq!(rb.num_columns(), 1); + assert_eq!(rb.schema().field(0).name(), "name"); + } - let first = scanner.poll_batch().await.expect("poll").expect("some"); - assert_eq!(first.bucket(), &bucket); - assert_eq!(first.num_records(), 2); - assert_eq!(first.base_offset(), 5); - assert_eq!(first.last_offset(), 6); + #[test] + fn value_records_decode_rows() { + let schema = id_name_schema(); + let decoder = FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &schema).unwrap(); + let mut decoders = HashMap::new(); + decoders.insert(0i16, decoder); + + let r0 = compacted(2, |w| { + w.write_int(1); + w.write_string("alice"); + }); + let r1 = compacted(2, |w| { + w.write_int(2); + w.write_string("bob"); + }); + let batch = value_batch(&[(0, r0), (0, r1)]); + let ranges = batch.value_ranges().unwrap(); + + let rb = value_records_to_record_batch( + &batch, + &ranges, + &decoders, + schema.row_type(), + None, + usize::MAX, + ) + .expect("decode kv rows"); + assert_eq!(rb.num_rows(), 2); + let ids = rb.column(0).as_any().downcast_ref::().unwrap(); + assert_eq!(ids.value(0), 1); + assert_eq!(ids.value(1), 2); + } + + #[test] + fn value_records_limit_keeps_last_rows() { + let schema = id_name_schema(); + let decoder = FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &schema).unwrap(); + let mut decoders = HashMap::new(); + decoders.insert(0i16, decoder); + + let records: Vec<(i16, Vec)> = (1..=5) + .map(|i| { + ( + 0i16, + compacted(2, |w| { + w.write_int(i); + w.write_string("x"); + }), + ) + }) + .collect(); + let batch = value_batch(&records); + let ranges = batch.value_ranges().unwrap(); + + let rb = + value_records_to_record_batch(&batch, &ranges, &decoders, schema.row_type(), None, 3) + .expect("decode kv rows"); + assert_eq!(rb.num_rows(), 3); + let ids = rb.column(0).as_any().downcast_ref::().unwrap(); + // Last 3 of [1,2,3,4,5]. 
+ assert_eq!(ids.values(), &[3, 4, 5]); + } - let second = scanner.poll_batch().await.expect("poll"); - assert!(second.is_none()); + #[test] + fn value_records_projection_keeps_requested_columns() { + let schema = id_name_schema(); + let decoder = FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &schema).unwrap(); + let mut decoders = HashMap::new(); + decoders.insert(0i16, decoder); + + let r0 = compacted(2, |w| { + w.write_int(9); + w.write_string("nine"); + }); + let batch = value_batch(&[(0, r0)]); + let ranges = batch.value_ranges().unwrap(); + + let rb = value_records_to_record_batch( + &batch, + &ranges, + &decoders, + schema.row_type(), + Some(&[1usize]), + usize::MAX, + ) + .expect("decode projected kv"); + assert_eq!(rb.num_columns(), 1); + assert_eq!(rb.schema().field(0).name(), "name"); + } + + #[test] + fn value_records_decode_across_schema_evolution() { + // Source schema (older): [id, name]. Target (current): added `age`. + let source = id_name_schema(); + let target = schema_with_ids(&[ + (0, "id", DataTypes::int()), + (1, "name", DataTypes::string()), + (2, "age", DataTypes::bigint()), + ]); + + let mut decoders = HashMap::new(); + // Records with schema id 0 were written under the old schema. + decoders.insert( + 0i16, + FixedSchemaDecoder::new(KvFormat::COMPACTED, &source, &target).unwrap(), + ); + // Records with schema id 1 carry the current schema. 
+ decoders.insert( + 1i16, + FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &target).unwrap(), + ); + + let old_row = compacted(2, |w| { + w.write_int(1); + w.write_string("alice"); + }); + let new_row = compacted(3, |w| { + w.write_int(2); + w.write_string("bob"); + w.write_long(30); + }); + let batch = value_batch(&[(0, old_row), (1, new_row)]); + let ranges = batch.value_ranges().unwrap(); + + let rb = value_records_to_record_batch( + &batch, + &ranges, + &decoders, + target.row_type(), + None, + usize::MAX, + ) + .expect("decode mixed-schema kv"); + + assert_eq!(rb.num_rows(), 2); + assert_eq!(rb.num_columns(), 3); + let age = rb.column(2).as_any().downcast_ref::().unwrap(); + // Old record has no `age` column -> null; new record carries 30. + assert!(age.is_null(0), "old-schema record must read age as null"); + assert_eq!(age.value(1), 30); } } diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index 5ef5d179..657a44bf 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -36,7 +36,7 @@ mod scanner; mod upsert; pub use append::{AppendWriter, TableAppend}; -pub use batch_scanner::BatchScanner; +pub use batch_scanner::LimitBatchScanner; pub use lookup::{LookupResult, Lookuper, PrefixKeyLookuper, TableLookup, TablePrefixLookup}; pub use reader::{RecordBatchLogReader, SyncRecordBatchLogReader}; pub use remote_log::{ diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 031975b3..35cc52e3 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -15,10 +15,11 @@ // specific language governing permissions and limitations // under the License. 
+use crate::client::ClientSchemaGetter; use crate::client::connection::FlussConnection; -use crate::client::table::batch_scanner::BatchScanner; use crate::client::credentials::SecurityTokenManager; use crate::client::metadata::Metadata; +use crate::client::table::batch_scanner::LimitBatchScanner; use crate::client::table::log_fetch_buffer::{ CompletedFetch, DefaultCompletedFetch, FetchErrorAction, FetchErrorContext, FetchErrorLogLevel, LogFetchBuffer, RemotePendingFetch, @@ -27,7 +28,9 @@ use crate::client::table::remote_log::{RemoteLogDownloader, RemoteLogFetchInfo}; use crate::config::Config; use crate::error::Error::UnsupportedOperation; use crate::error::{ApiError, Error, FlussError, Result}; -use crate::metadata::{LogFormat, PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath}; +use crate::metadata::{ + LogFormat, PhysicalTablePath, RowType, SchemaInfo, TableBucket, TableInfo, TablePath, +}; use crate::metrics::ScannerMetrics; use crate::proto::{ ErrorResponse, FetchLogRequest, FetchLogResponse, PbFetchLogReqForBucket, PbFetchLogReqForTable, @@ -71,10 +74,10 @@ impl<'a> TableScan<'a> { } } - /// Sets a row limit for the scan, enabling [`Self::create_batch_scanner`]. + /// Sets a row limit for the scan, enabling [`Self::create_bucket_batch_scanner`]. /// - /// The limit must be positive. Callers configure a limit prior to - /// constructing a `BatchScanner` for a one-shot bounded read. + /// The limit must be positive. A limit is incompatible with the log + /// scanners, which reject it. pub fn limit(mut self, n: i32) -> Result { if n <= 0 { return Err(Error::IllegalArgument { @@ -85,17 +88,31 @@ impl<'a> TableScan<'a> { Ok(self) } - /// Creates a `BatchScanner` that performs a single bounded scan of `table_bucket`. + /// Log scanners don't support limit pushdown; reject a configured limit + /// rather than silently ignoring it. 
+ fn reject_limit(&self, scanner: &str) -> Result<()> { + if let Some(limit) = self.limit { + return Err(Error::UnsupportedOperation { + message: format!( + "{scanner} doesn't support limit pushdown. Table: {}, requested limit: {limit}", + self.table_info.table_path + ), + }); + } + Ok(()) + } + + /// Creates a one-shot bounded scan of `table_bucket`. /// - /// Requires a previously-configured limit via [`Self::limit`]. The scanner sends - /// a `LimitScanRequest` eagerly and exposes the resulting batch through - /// [`BatchScanner::poll_batch`]. - pub async fn create_batch_scanner( + /// Requires a previously-configured limit via [`Self::limit`]. Creation is + /// cheap; the `LimitScanRequest` runs on the first + /// [`LimitBatchScanner::next_batch`]. + pub fn create_bucket_batch_scanner( self, table_bucket: TableBucket, - ) -> Result { + ) -> Result { let limit = self.limit.ok_or_else(|| Error::IllegalArgument { - message: "create_batch_scanner requires a limit configured via .limit(n)" + message: "create_bucket_batch_scanner requires a limit configured via .limit(n)" .to_string(), })?; if table_bucket.table_id() != self.table_info.table_id { @@ -107,15 +124,40 @@ impl<'a> TableScan<'a> { ), }); } - BatchScanner::new( + let num_buckets = self.table_info.get_num_buckets(); + if table_bucket.bucket_id() < 0 || table_bucket.bucket_id() >= num_buckets { + return Err(Error::IllegalArgument { + message: format!( + "Bucket id {} out of range for table with {num_buckets} buckets", + table_bucket.bucket_id() + ), + }); + } + // Log tables decode as Arrow IPC, so only ARROW format is supported (KV + // tables use the value-record path and are exempt). + if !self.table_info.has_primary_key() { + validate_scan_support(&self.table_info.table_path, &self.table_info)?; + } + // Pre-seed the current schema; older versions are fetched lazily during + // KV decode. Mirrors `Table::new_lookup`. 
+ let latest = SchemaInfo::new( + self.table_info.get_schema().clone(), + self.table_info.get_schema_id(), + ); + let schema_getter = Arc::new(ClientSchemaGetter::new( + self.table_info.table_path.clone(), + self.conn.get_admin()?, + latest, + )); + Ok(LimitBatchScanner::new( self.conn.get_connections(), self.metadata.clone(), self.table_info, + schema_getter, self.projected_fields, table_bucket, limit, - ) - .await + )) } /// Projects the scan to only include specified columns by their indices. @@ -270,6 +312,7 @@ impl<'a> TableScan<'a> { } pub fn create_log_scanner(self) -> Result { + self.reject_limit("LogScanner")?; validate_scan_support(&self.table_info.table_path, &self.table_info)?; let inner = LogScannerInner::new( &self.table_info, @@ -284,6 +327,7 @@ impl<'a> TableScan<'a> { } pub fn create_record_batch_log_scanner(self) -> Result { + self.reject_limit("RecordBatchLogScanner")?; validate_scan_support(&self.table_info.table_path, &self.table_info)?; let inner = LogScannerInner::new( &self.table_info, diff --git a/crates/fluss/src/record/kv/mod.rs b/crates/fluss/src/record/kv/mod.rs index 857c5e5f..8f7750a8 100644 --- a/crates/fluss/src/record/kv/mod.rs +++ b/crates/fluss/src/record/kv/mod.rs @@ -22,6 +22,7 @@ mod kv_record_batch; mod kv_record_batch_builder; mod kv_record_read_context; mod read_context; +mod value_record_batch; #[cfg(test)] mod test_util; @@ -31,6 +32,7 @@ pub use kv_record_batch::*; pub use kv_record_batch_builder::*; pub use kv_record_read_context::{KvRecordReadContext, SchemaGetter}; pub use read_context::ReadContext; +pub use value_record_batch::ValueRecordBatch; /// Current KV magic value pub const CURRENT_KV_MAGIC_VALUE: u8 = 0; diff --git a/crates/fluss/src/record/kv/value_record_batch.rs b/crates/fluss/src/record/kv/value_record_batch.rs new file mode 100644 index 00000000..cfcb4a6d --- /dev/null +++ b/crates/fluss/src/record/kv/value_record_batch.rs @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one 
+// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reader for the value-record batch returned by a KV (primary-key) limit +//! scan. This is a distinct wire format from [`super::KvRecordBatch`]: it +//! carries value-only records (no keys, no CRC/writer-id header) and a schema +//! id *per record* rather than per batch. +//! +//! Batch layout (little-endian): +//! - Length => Int32 (size of everything after this field) +//! - Magic => Int8 +//! - RecordCount => Int32 +//! - Records => [ValueRecord] +//! +//! Each `ValueRecord`: +//! - Length => Int32 (size after this field: SchemaId + Value) +//! - SchemaId => Int16 +//! - Value => row bytes +//! +//! Reference: `org.apache.fluss.record.DefaultValueRecordBatch` and +//! `org.apache.fluss.record.DefaultValueRecord`. + +use crate::error::{Error, Result}; +use byteorder::{ByteOrder, LittleEndian}; +use bytes::Bytes; +use std::ops::Range; + +const LENGTH_LENGTH: usize = 4; +const MAGIC_LENGTH: usize = 1; +const RECORD_COUNT_LENGTH: usize = 4; +/// Offset of the record count within the batch header. +const RECORD_COUNT_OFFSET: usize = LENGTH_LENGTH + MAGIC_LENGTH; +/// Size of the batch header (`Length + Magic + RecordCount`). 
+const RECORD_BATCH_HEADER_SIZE: usize = LENGTH_LENGTH + MAGIC_LENGTH + RECORD_COUNT_LENGTH; +/// Size of a `ValueRecord`'s leading length field. +const RECORD_LENGTH_LENGTH: usize = 4; + +/// Read-only view over a serialized value-record batch. +pub struct ValueRecordBatch { + data: Bytes, +} + +impl ValueRecordBatch { + /// Wraps raw batch bytes. The batch is expected to start at offset 0. + pub fn new(data: Bytes) -> Self { + Self { data } + } + + /// Number of records declared in the batch header. + pub fn record_count(&self) -> Result { + if self.data.len() < RECORD_BATCH_HEADER_SIZE { + return Err(corrupt(format!( + "value-record batch too short: {} bytes, need {} for header", + self.data.len(), + RECORD_BATCH_HEADER_SIZE + ))); + } + Ok(LittleEndian::read_i32( + &self.data[RECORD_COUNT_OFFSET..RECORD_COUNT_OFFSET + RECORD_COUNT_LENGTH], + )) + } + + /// Returns one byte range per record, each spanning `[SchemaId | Value]`: + /// the payload [`crate::row::FixedSchemaDecoder::decode`] expects. Index + /// [`Self::data`] with a returned range to get it without copying. 
+ pub fn value_ranges(&self) -> Result>> { + let count = self.record_count()?; + if count < 0 { + return Err(corrupt(format!("invalid record count {count}"))); + } + let mut ranges = Vec::with_capacity(count as usize); + let mut pos = RECORD_BATCH_HEADER_SIZE; + for i in 0..count as usize { + if pos + RECORD_LENGTH_LENGTH > self.data.len() { + return Err(corrupt(format!( + "truncated value-record batch: record {i} length field runs past end" + ))); + } + let rec_len = LittleEndian::read_i32(&self.data[pos..pos + RECORD_LENGTH_LENGTH]); + if rec_len < 0 { + return Err(corrupt(format!("record {i} has negative length {rec_len}"))); + } + let start = pos + RECORD_LENGTH_LENGTH; + let end = start + rec_len as usize; + if end > self.data.len() { + return Err(corrupt(format!( + "truncated value-record batch: record {i} payload runs past end" + ))); + } + ranges.push(start..end); + pos = end; + } + Ok(ranges) + } + + /// The underlying batch bytes. + pub fn data(&self) -> &Bytes { + &self.data + } +} + +fn corrupt(message: String) -> Error { + Error::UnexpectedError { + message, + source: None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::record::kv::SCHEMA_ID_LENGTH; + + /// Build a value-record batch from `(schema_id, row_bytes)` pairs, mirroring + /// the Java `DefaultValueRecordBatch.Builder` wire layout. + fn build_batch(records: &[(i16, &[u8])]) -> Vec { + let mut body = Vec::new(); + for (schema_id, row) in records { + let rec_len = (SCHEMA_ID_LENGTH + row.len()) as i32; + body.extend_from_slice(&rec_len.to_le_bytes()); + body.extend_from_slice(&schema_id.to_le_bytes()); + body.extend_from_slice(row); + } + let mut out = Vec::new(); + // Length covers Magic + RecordCount + body. 
+ let length = (MAGIC_LENGTH + RECORD_COUNT_LENGTH + body.len()) as i32; + out.extend_from_slice(&length.to_le_bytes()); + out.push(0); // magic + out.extend_from_slice(&(records.len() as i32).to_le_bytes()); + out.extend_from_slice(&body); + out + } + + #[test] + fn parses_record_count_and_ranges() { + let raw = build_batch(&[(7, &[1, 2, 3]), (7, &[4, 5])]); + let batch = ValueRecordBatch::new(Bytes::from(raw)); + assert_eq!(batch.record_count().unwrap(), 2); + + let ranges = batch.value_ranges().unwrap(); + assert_eq!(ranges.len(), 2); + // First record payload = [schema_id(2) | row(3)] = 5 bytes. + let r0 = &batch.data()[ranges[0].clone()]; + assert_eq!(r0.len(), 5); + assert_eq!(LittleEndian::read_i16(&r0[..2]), 7); + assert_eq!(&r0[2..], &[1, 2, 3]); + // Second record payload = [schema_id(2) | row(2)] = 4 bytes. + let r1 = &batch.data()[ranges[1].clone()]; + assert_eq!(r1.len(), 4); + assert_eq!(&r1[2..], &[4, 5]); + } + + #[test] + fn empty_batch_has_no_ranges() { + let raw = build_batch(&[]); + let batch = ValueRecordBatch::new(Bytes::from(raw)); + assert_eq!(batch.record_count().unwrap(), 0); + assert!(batch.value_ranges().unwrap().is_empty()); + } + + #[test] + fn truncated_payload_errors() { + let mut raw = build_batch(&[(7, &[1, 2, 3])]); + raw.truncate(raw.len() - 2); // chop into the row payload + let batch = ValueRecordBatch::new(Bytes::from(raw)); + assert!(batch.value_ranges().is_err()); + } + + #[test] + fn short_header_errors() { + let batch = ValueRecordBatch::new(Bytes::from(vec![0u8, 1, 2])); + assert!(batch.record_count().is_err()); + } +} diff --git a/crates/fluss/tests/integration/batch_scanner.rs b/crates/fluss/tests/integration/batch_scanner.rs index 26235281..0b484a8c 100644 --- a/crates/fluss/tests/integration/batch_scanner.rs +++ b/crates/fluss/tests/integration/batch_scanner.rs @@ -19,12 +19,13 @@ #[cfg(test)] mod batch_scanner_test { use crate::integration::utils::{create_table, get_shared_cluster}; - use arrow::array::record_batch; 
- use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath}; - use std::time::Duration; + use arrow::array::{Int32Array, StringArray, record_batch}; + use fluss::metadata::{DataTypes, LogFormat, Schema, TableBucket, TableDescriptor, TablePath}; + use fluss::row::GenericRow; + use std::collections::HashMap; - /// End-to-end check that BatchScanner returns the appended rows on first - /// poll and `None` on the next, honoring the configured limit. + /// End-to-end check that the scanner yields the appended rows once and then + /// `None`, honoring the configured limit. #[tokio::test] async fn batch_scanner_returns_appended_rows_then_none() { let cluster = get_shared_cluster(); @@ -61,22 +62,18 @@ mod batch_scanner_test { writer.append_arrow_batch(batch).expect("append batch"); writer.flush().await.expect("flush"); - // Give the server a moment to commit and make the records readable. - tokio::time::sleep(Duration::from_secs(1)).await; - let table_info = table.get_table_info(); let bucket = TableBucket::new(table_info.table_id, 0); - let mut batch_scanner = table + let mut scanner = table .new_scan() .limit(3) .expect("limit") - .create_batch_scanner(bucket.clone()) - .await + .create_bucket_batch_scanner(bucket.clone()) .expect("create batch scanner"); - let first = batch_scanner - .poll_batch() + let first = scanner + .next_batch() .await .expect("poll") .expect("first batch should be Some"); @@ -90,17 +87,109 @@ mod batch_scanner_test { first.num_records() ); - let second = batch_scanner - .poll_batch() + assert!( + scanner.next_batch().await.expect("poll").is_none(), + "scanner must end after one batch" + ); + } + + /// Limit scan on a primary-key table: decodes the value-record batch and + /// honors the limit. Exercises the KV wire path (distinct from the log one). 
+ #[tokio::test] + async fn batch_scanner_reads_primary_key_table() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_batch_scanner_pk"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .primary_key(vec!["id"]) + .build() + .expect("schema"), + ) + // Single bucket so one BatchScanner sees every row. + .distributed_by(Some(1), vec!["id".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + let writer = table + .new_upsert() + .expect("upsert") + .create_writer() + .expect("writer"); + + let expected: HashMap = + [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")].into(); + for (id, name) in &expected { + let mut row = GenericRow::new(2); + row.set_field(0, *id); + row.set_field(1, *name); + writer.upsert(&row).expect("upsert row"); + } + writer.flush().await.expect("flush"); + + let table_info = table.get_table_info(); + let bucket = TableBucket::new(table_info.table_id, 0); + + let mut scanner = table + .new_scan() + .limit(3) + .expect("limit") + .create_bucket_batch_scanner(bucket.clone()) + .expect("create batch scanner"); + + let first = scanner + .next_batch() .await - .expect("second poll succeeds"); - assert!(second.is_none(), "second poll must return None"); + .expect("poll") + .expect("first batch should be Some"); + + assert_eq!(first.bucket(), &bucket); + let rows = first.batch(); + assert_eq!(rows.num_columns(), 2, "id + name"); + assert!( + rows.num_rows() > 0 && rows.num_rows() <= 3, + "expected 1..=3 records, got {}", + rows.num_rows() + ); + + // Every returned (id, name) must match what we upserted. 
+ let ids = rows + .column(0) + .as_any() + .downcast_ref::() + .expect("id column Int32"); + let names = rows + .column(1) + .as_any() + .downcast_ref::() + .expect("name column Utf8"); + for i in 0..rows.num_rows() { + let id = ids.value(i); + let name = names.value(i); + assert_eq!( + expected.get(&id), + Some(&name), + "decoded row ({id}, {name}) does not match upserted data" + ); + } + + assert!( + scanner.next_batch().await.expect("poll").is_none(), + "scanner must end after one batch" + ); } - /// A bucket id outside the table's bucket range should be rejected by the - /// scanner before any RPC is made. + /// A bucket with the wrong table_id or an out-of-range bucket_id must be + /// rejected before any RPC is made. #[tokio::test] - async fn batch_scanner_requires_matching_table_id() { + async fn batch_scanner_rejects_invalid_bucket() { let cluster = get_shared_cluster(); let connection = cluster.get_fluss_connection().await; let admin = connection.get_admin().expect("admin"); @@ -119,19 +208,64 @@ mod batch_scanner_test { create_table(&admin, &table_path, &descriptor).await; let table = connection.get_table(&table_path).await.expect("table"); + let table_id = table.get_table_info().table_id; + + // Wrong table_id. + assert!( + table + .new_scan() + .limit(1) + .expect("limit") + .create_bucket_batch_scanner(TableBucket::new(table_id + 9999, 0)) + .is_err(), + "must reject mismatched table_id" + ); - // Bucket with a wrong table_id — must fail without hitting the server. - let bogus_bucket = TableBucket::new(table.get_table_info().table_id + 9999, 0); + // Bucket id past the single bucket of this table. + assert!( + table + .new_scan() + .limit(1) + .expect("limit") + .create_bucket_batch_scanner(TableBucket::new(table_id, 99)) + .is_err(), + "must reject out-of-range bucket_id" + ); + } + + /// A limit scan over a non-ARROW log table must be rejected (the log path + /// decodes Arrow IPC). 
+ #[tokio::test] + async fn batch_scanner_rejects_non_arrow_log_format() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_batch_scanner_indexed"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("c1", DataTypes::int()) + .build() + .expect("schema"), + ) + .log_format(LogFormat::INDEXED) + .distributed_by(Some(1), vec!["c1".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + let bucket = TableBucket::new(table.get_table_info().table_id, 0); - let result = table - .new_scan() - .limit(1) - .expect("limit") - .create_batch_scanner(bogus_bucket) - .await; assert!( - result.is_err(), - "batch scanner must reject mismatched table_id" + table + .new_scan() + .limit(1) + .expect("limit") + .create_bucket_batch_scanner(bucket) + .is_err(), + "must reject INDEXED log format" ); } @@ -159,4 +293,46 @@ mod batch_scanner_test { assert!(table.new_scan().limit(0).is_err()); assert!(table.new_scan().limit(-5).is_err()); } + + /// A configured limit must be rejected by the log scanners rather than + /// silently ignored. 
+ #[tokio::test] + async fn limit_is_rejected_by_log_scanners() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_batch_scanner_limit_logscan"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("c1", DataTypes::int()) + .build() + .expect("schema"), + ) + .distributed_by(Some(1), vec!["c1".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + assert!( + table + .new_scan() + .limit(5) + .expect("limit") + .create_log_scanner() + .is_err(), + "create_log_scanner must reject a configured limit" + ); + assert!( + table + .new_scan() + .limit(5) + .expect("limit") + .create_record_batch_log_scanner() + .is_err(), + "create_record_batch_log_scanner must reject a configured limit" + ); + } } diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index e2377e1d..f8323df7 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -217,8 +217,6 @@ mod table_test { // Flush to ensure all writes are acknowledged append_writer.flush().await.expect("Failed to flush"); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - // Test latest offset after appending (should be 3) let latest_offsets_after = admin .list_offsets(&table_path, &[0], OffsetSpec::Latest) diff --git a/crates/fluss/tests/integration/record_batch_log_reader.rs b/crates/fluss/tests/integration/record_batch_log_reader.rs index cf89c065..6c8d5392 100644 --- a/crates/fluss/tests/integration/record_batch_log_reader.rs +++ b/crates/fluss/tests/integration/record_batch_log_reader.rs @@ -21,7 +21,7 @@ mod reader_test { use crate::integration::utils::{ create_partitions, create_table, 
extract_ids_from_batches, get_shared_cluster, - wait_for_partitions_ready, wait_for_table_buckets_ready, wait_for_table_ready, + wait_for_partitions_ready, }; use arrow::array::record_batch; use fluss::client::{EARLIEST_OFFSET, FlussConnection, RecordBatchLogReader}; @@ -49,7 +49,6 @@ mod reader_test { .build() .expect("Failed to build table"); create_table(&admin, &table_path, &table_descriptor).await; - wait_for_table_ready(&admin, &table_path).await; let table = connection .get_table(&table_path) @@ -122,7 +121,6 @@ mod reader_test { .build() .expect("Failed to build table"); create_table(&admin, &table_path, &table_descriptor).await; - wait_for_table_ready(&admin, &table_path).await; let table = connection .get_table(&table_path) @@ -190,7 +188,6 @@ mod reader_test { .build() .expect("Failed to build table"); create_table(&admin, &table_path, &table_descriptor).await; - wait_for_table_ready(&admin, &table_path).await; let table = connection .get_table(&table_path) @@ -285,7 +282,6 @@ mod reader_test { .build() .expect("Failed to build table"); create_table(&admin, &table_path, &table_descriptor).await; - wait_for_table_buckets_ready(&admin, &table_path, &[0, 1]).await; let table = connection .get_table(&table_path) @@ -381,7 +377,6 @@ mod reader_test { .expect("Failed to build table"); create_table(&admin, &table_path, &table_descriptor).await; - wait_for_table_ready(&admin, &table_path).await; let table = connection .get_table(&table_path) diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index 5d983030..bb2ec3e8 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -134,8 +134,10 @@ Complete API reference for the Fluss Rust client. 
|-----------------------------------------------------------------------------|-----------------------------------------| | `fn project(self, indices: &[usize]) -> Result` | Project columns by index | | `fn project_by_name(self, names: &[&str]) -> Result` | Project columns by name | +| `fn limit(self, n: i32) -> Result` | Set a row limit (enables `create_bucket_batch_scanner`; rejected by log scanners) | | `fn create_log_scanner(self) -> Result` | Create a record-based log scanner | | `fn create_record_batch_log_scanner(self) -> Result` | Create an Arrow batch-based log scanner | +| `fn create_bucket_batch_scanner(self, bucket: TableBucket) -> Result` | Bounded scan of one bucket (requires `limit`; runs on first `next_batch`) | ## `LogScanner` @@ -211,6 +213,19 @@ bucket identity per batch, use `next_batch` instead. | `fn next(&mut self) -> Option>` | Iterator: next batch, or `None` when caught up | | `fn schema(&self) -> SchemaRef` | Arrow schema for produced batches | +## `LimitBatchScanner` + +One-shot bounded scanner from `TableScan::limit(n).create_bucket_batch_scanner(bucket)`. +Poll it with `next_batch` until it returns `None` (mirrors `RecordBatchLogReader`). +Supports both log and primary-key tables (the latter returns the current, +server-deduplicated state); yields a single batch of at most `n` rows. 
+ +| Method | Description | +|---------------------------------------------------------------|--------------------------------------| +| `async fn next_batch(&mut self) -> Result>` | Rows on the first call, `None` after | +| `async fn collect_all_batches(&mut self) -> Result>` | Drain into all batches | +| `fn bucket(&self) -> &TableBucket` | The scanned bucket | + ## `ScanRecord` | Method | Description | diff --git a/website/docs/user-guide/rust/example/log-tables.md b/website/docs/user-guide/rust/example/log-tables.md index 04857796..e77c8c6c 100644 --- a/website/docs/user-guide/rust/example/log-tables.md +++ b/website/docs/user-guide/rust/example/log-tables.md @@ -153,3 +153,20 @@ let scanner = table.new_scan() .project_by_name(&["event_id", "timestamp"])? .create_log_scanner()?; ``` + +## Limit Scan + +For a bounded read of up to `n` rows from a single bucket, use a batch scanner +instead of subscribing. It issues one request; poll it with `next_batch` until +it returns `None`. + +```rust +let bucket = TableBucket::new(table.get_table_info().table_id, 0); +let mut scanner = table.new_scan().limit(10)?.create_bucket_batch_scanner(bucket)?; + +while let Some(batch) = scanner.next_batch().await? { + println!("rows: {}", batch.batch().num_rows()); +} +``` + +Limit applies per bucket; scan each bucket to cover a multi-bucket table. diff --git a/website/docs/user-guide/rust/example/primary-key-tables.md b/website/docs/user-guide/rust/example/primary-key-tables.md index 82a07c4c..01836e29 100644 --- a/website/docs/user-guide/rust/example/primary-key-tables.md +++ b/website/docs/user-guide/rust/example/primary-key-tables.md @@ -124,3 +124,18 @@ println!("Rows: {}", batch.num_rows()); ## Prefix Lookup To fetch all rows sharing a common primary-key prefix (by choosing a bucket key that's a strict prefix of the primary key), see [Prefix Lookup](./prefix-lookup.md). 
+ + ## Limit Scan + + To read up to `n` rows of a bucket's current state without supplying keys, use a batch scanner. The scanner decodes the server-deduplicated current rows into a single Arrow batch, which is convenient for previews or DataFusion sources. + + ```rust + let bucket = TableBucket::new(table.get_table_info().table_id, 0); + let mut scanner = table.new_scan().limit(10)?.create_bucket_batch_scanner(bucket)?; + + while let Some(batch) = scanner.next_batch().await? { + println!("rows: {}", batch.batch().num_rows()); + } + ``` + + Limit applies per bucket; scan each bucket to cover a multi-bucket table.