diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index 8bad7398..e2377e1d 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -22,7 +22,7 @@ mod table_test { ColumnPlan, array_dt_basics_columns, as_row_type, create_partitions, create_table, dt_array_int, dt_map_string_int, dt_row_seq_label, extract_ids_from_batches, get_shared_cluster, make_int_array, make_string_array, map_dt_basics_columns, - row_dt_basics_columns, scalar_dt_columns, + row_dt_basics_columns, scalar_dt_columns, wait_for_partitions_ready, wait_for_table_ready, }; use arrow::array::record_batch; use fluss::client::{EARLIEST_OFFSET, FlussTable, TableScan}; @@ -169,8 +169,7 @@ mod table_test { create_table(&admin, &table_path, &table_descriptor).await; - // Wait for table to be fully initialized - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + wait_for_table_ready(&admin, &table_path).await; // Test earliest offset (should be 0 for empty table) let earliest_offsets = admin @@ -475,7 +474,7 @@ mod table_test { &TableDescriptor::builder().schema(schema).build().unwrap(), ) .await; - tokio::time::sleep(Duration::from_secs(1)).await; + wait_for_table_ready(&admin, &table_path).await; let table = connection.get_table(&table_path).await.unwrap(); let scanner = table.new_scan().create_record_batch_log_scanner().unwrap(); @@ -595,8 +594,8 @@ mod table_test { // Create partitions create_partitions(&admin, &table_path, "region", &["US", "EU"]).await; - // Wait for partitions to be available - tokio::time::sleep(Duration::from_secs(2)).await; + // Wait for partition bucket leaders to be available. + wait_for_partitions_ready(&admin, &table_path, &["US", "EU"]).await; let table = connection .get_table(&table_path) diff --git a/crates/fluss/tests/integration/record_batch_log_reader.rs b/crates/fluss/tests/integration/record_batch_log_reader.rs index 13836453..cf89c065 100644 --- a/crates/fluss/tests/integration/record_batch_log_reader.rs +++ b/crates/fluss/tests/integration/record_batch_log_reader.rs @@ -21,6 +21,7 @@ mod reader_test { use crate::integration::utils::{ create_partitions, create_table, extract_ids_from_batches, get_shared_cluster, + wait_for_partitions_ready, wait_for_table_buckets_ready, wait_for_table_ready, }; use arrow::array::record_batch; use fluss::client::{EARLIEST_OFFSET, FlussConnection, RecordBatchLogReader}; @@ -48,7 +49,7 @@ mod reader_test { .build() .expect("Failed to build table"); create_table(&admin, &table_path, &table_descriptor).await; - tokio::time::sleep(Duration::from_secs(1)).await; + wait_for_table_ready(&admin, &table_path).await; let table = connection .get_table(&table_path) @@ -121,7 +122,7 @@ mod reader_test { .build() .expect("Failed to build table"); create_table(&admin, &table_path, &table_descriptor).await; - tokio::time::sleep(Duration::from_secs(1)).await; + wait_for_table_ready(&admin, &table_path).await; let table = connection .get_table(&table_path) @@ -189,7 +190,7 @@ mod reader_test { .build() .expect("Failed to build table"); create_table(&admin, &table_path, &table_descriptor).await; - tokio::time::sleep(Duration::from_secs(1)).await; + wait_for_table_ready(&admin, &table_path).await; let table = connection .get_table(&table_path) @@ -284,7 +285,7 @@ mod reader_test { .build() .expect("Failed to build table"); create_table(&admin, &table_path, &table_descriptor).await; - tokio::time::sleep(Duration::from_secs(1)).await; + wait_for_table_buckets_ready(&admin, &table_path, &[0, 1]).await; let table = connection .get_table(&table_path) @@ -380,7 +381,7 @@ mod reader_test { .expect("Failed to build table"); create_table(&admin, &table_path, &table_descriptor).await; - tokio::time::sleep(Duration::from_secs(1)).await; + wait_for_table_ready(&admin, &table_path).await; let table = connection .get_table(&table_path) @@ -453,7 +454,7 @@ mod reader_test { create_table(&admin, &table_path, &table_descriptor).await; create_partitions(&admin, &table_path, "region", &["US", "EU"]).await; - tokio::time::sleep(Duration::from_secs(2)).await; + wait_for_partitions_ready(&admin, &table_path, &["US", "EU"]).await; let table = connection .get_table(&table_path) diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs index 934b6626..2ebe31d9 100644 --- a/crates/fluss/tests/integration/utils.rs +++ b/crates/fluss/tests/integration/utils.rs @@ -24,7 +24,9 @@ use fluss::metadata::{ use fluss::record::ScanBatch; use fluss::row::FlussArray; use fluss::row::binary_array::FlussArrayWriter; +use fluss::rpc::message::OffsetSpec; use std::collections::HashMap; +use std::future::Future; use std::sync::Arc; use std::sync::LazyLock; use std::time::Duration; @@ -100,6 +102,110 @@ pub async fn create_table( .expect("Failed to create table"); } +const READINESS_TIMEOUT: Duration = Duration::from_secs(30); +const READINESS_POLL_INTERVAL: Duration = Duration::from_millis(200); + +async fn poll_until( + timeout: Duration, + interval: Duration, + timeout_message: String, + mut probe: F, +) where + F: FnMut() -> Fut, + Fut: Future>, +{ + let start = std::time::Instant::now(); + + loop { + match probe().await { + Ok(()) => return, + Err(err) => { + if start.elapsed() >= timeout { + panic!( + "{timeout_message} after {} seconds. Last error: {err}", + timeout.as_secs() + ); + } + } + } + + tokio::time::sleep(interval).await; + } +} + +/// Waits until the default bucket of a non-partitioned table can serve offset requests. +/// +/// Newly-created tables may not have bucket leaders immediately. Polling list offsets avoids +/// fixed sleeps that are either flaky on slow CI or waste time when the cluster is ready sooner. +pub async fn wait_for_table_ready(admin: &FlussAdmin, table_path: &TablePath) { + wait_for_table_buckets_ready(admin, table_path, &[0]).await; +} + +/// Waits until the specified buckets of a non-partitioned table can serve offset requests. +pub async fn wait_for_table_buckets_ready( + admin: &FlussAdmin, + table_path: &TablePath, + buckets: &[i32], +) { + poll_until( + READINESS_TIMEOUT, + READINESS_POLL_INTERVAL, + format!("Timed out waiting for table '{table_path}' buckets {buckets:?} to become ready"), + || async { + admin + .list_offsets(table_path, buckets, OffsetSpec::Latest) + .await + .map(|_| ()) + .map_err(|err| format!("{err:?}")) + }, + ) + .await; +} + +/// Waits until all listed partition values can serve offset requests for the default bucket. +pub async fn wait_for_partitions_ready( + admin: &FlussAdmin, + table_path: &TablePath, + partition_values: &[&str], +) { + for partition_value in partition_values { + wait_for_partition_ready(admin, table_path, partition_value).await; + } +} + +/// Waits until one partition value can serve offset requests for the default bucket. +pub async fn wait_for_partition_ready( + admin: &FlussAdmin, + table_path: &TablePath, + partition_value: &str, +) { + wait_for_partition_buckets_ready(admin, table_path, partition_value, &[0]).await; +} + +/// Waits until the specified buckets of a partition can serve offset requests. +pub async fn wait_for_partition_buckets_ready( + admin: &FlussAdmin, + table_path: &TablePath, + partition_value: &str, + buckets: &[i32], +) { + poll_until( + READINESS_TIMEOUT, + READINESS_POLL_INTERVAL, + format!( + "Timed out waiting for table '{table_path}' partition '{partition_value}' buckets {buckets:?} to become ready" + ), + || async { + admin + .list_partition_offsets(table_path, partition_value, buckets, OffsetSpec::Latest) + .await + .map(|_| ()) + .map_err(|err| format!("{err:?}")) + }, + ) + .await; +} + pub fn make_string_array(values: &[Option<&str>]) -> FlussArray { let mut writer = FlussArrayWriter::new(values.len(), &DataTypes::string()); for (idx, value) in values.iter().enumerate() { @@ -141,38 +247,35 @@ pub fn extract_ids_from_batches(batches: &[ScanBatch]) -> Vec { /// Similar to wait_for_cluster_ready but connects with SASL credentials. pub async fn wait_for_cluster_ready_with_sasl(cluster: &FlussTestingCluster) { - let timeout = Duration::from_secs(30); - let poll_interval = Duration::from_millis(500); - let start = std::time::Instant::now(); - let (username, password) = cluster .sasl_users() .first() .expect("SASL cluster must have at least one user"); - loop { - let connection = cluster - .get_fluss_connection_with_sasl(username, password) - .await; - if connection - .get_metadata() - .get_cluster() - .get_one_available_server() - .is_some() - { - return; - } - - if start.elapsed() >= timeout { - panic!( - "SASL server readiness check timed out after {} seconds. \ - CoordinatorEventProcessor may not be initialized or TabletServer may not be available.", - timeout.as_secs() - ); - } - - tokio::time::sleep(poll_interval).await; - } + poll_until( + Duration::from_secs(30), + Duration::from_millis(500), + "SASL server readiness check timed out".to_string(), + || async { + let connection = cluster + .get_fluss_connection_with_sasl(username, password) + .await; + if connection + .get_metadata() + .get_cluster() + .get_one_available_server() + .is_some() + { + Ok(()) + } else { + Err( + "CoordinatorEventProcessor may not be initialized or TabletServer may not be available" + .to_string(), + ) + } + }, + ) + .await; } /// Creates partitions for a partitioned table.