Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions crates/integrations/datafusion/src/procedures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
//! - `CALL sys.rollback_to(table => '...', snapshot_id => ... | tag => '...')`
//! - `CALL sys.rollback_to_timestamp(table => '...', timestamp => ...)`
//! - `CALL sys.create_tag_from_timestamp(table => '...', tag => '...', timestamp => ...)`
//! - `CALL sys.create_global_index(table => '...', index_column => '...', index_type => 'btree')`
//! - `CALL sys.create_lumina_index(table => '...', index_column => '...')`

use std::collections::HashMap;
Expand Down Expand Up @@ -148,6 +149,7 @@ pub async fn execute_call(
"create_tag_from_timestamp" => {
proc_create_tag_from_timestamp(ctx, catalog, catalog_name, &args).await
}
"create_global_index" => proc_create_global_index(ctx, catalog, catalog_name, &args).await,
"create_lumina_index" => proc_create_lumina_index(ctx, catalog, catalog_name, &args).await,
_ => Err(DataFusionError::Plan(format!(
"Unknown procedure: {proc_name}"
Expand Down Expand Up @@ -527,6 +529,35 @@ async fn proc_create_lumina_index(
ok_result(ctx)
}

/// Implements `CALL sys.create_global_index(table => ..., index_column => ..., index_type => ...)`.
///
/// The table is resolved through `get_table` and `index_column` is mandatory.
/// `index_type` is optional and is treated as `btree` when absent; the
/// comparison against `btree` ignores ASCII case.
///
/// # Errors
///
/// - Propagates failures from `get_table` and `require_arg`.
/// - `DataFusionError::NotImplemented` when `index_type` is not `btree`, or
///   when an `options` argument is supplied at all.
/// - Build failures from the index builder, converted with `to_datafusion_error`.
async fn proc_create_global_index(
    ctx: &SessionContext,
    catalog: &Arc<dyn Catalog>,
    catalog_name: &str,
    args: &HashMap<String, String>,
) -> DFResult<DataFrame> {
    let table = get_table(catalog, catalog_name, args).await?;
    let index_column = require_arg(args, "index_column")?;

    // A missing `index_type` argument means the caller wants the default btree index.
    let index_type = match args.get("index_type") {
        Some(requested) => requested.as_str(),
        None => "btree",
    };
    if !index_type.eq_ignore_ascii_case("btree") {
        let message = format!(
            "create_global_index only supports index_type => 'btree', got '{index_type}'"
        );
        return Err(DataFusionError::NotImplemented(message));
    }

    // The mere presence of `options` is rejected; its value is never inspected.
    if args.contains_key("options") {
        let message = "create_global_index options are not supported for btree yet".to_string();
        return Err(DataFusionError::NotImplemented(message));
    }

    let mut builder = table.new_btree_global_index_build_builder();
    builder.with_index_column(index_column);
    let build_outcome = builder.execute().await;
    build_outcome.map_err(to_datafusion_error)?;

    ok_result(ctx)
}

fn parse_key_value_options(options: &str) -> DFResult<HashMap<String, String>> {
let mut parsed = HashMap::new();
for entry in options.split(',').map(str::trim).filter(|s| !s.is_empty()) {
Expand Down
146 changes: 145 additions & 1 deletion crates/integrations/datafusion/tests/procedures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

mod common;

use common::{assert_sql_error, exec, row_count, setup_sql_context};
use common::{assert_sql_error, collect_id_name, exec, row_count, setup_sql_context};

async fn setup_table_with_snapshots() -> (tempfile::TempDir, paimon_datafusion::SQLContext) {
let (tmp, sql_context) = setup_sql_context().await;
Expand Down Expand Up @@ -121,6 +121,150 @@ async fn test_create_lumina_index_rejects_invalid_options() {
.await;
}

/// Creates `paimon.test_db.<table_name>` with columns `(id INT, name VARCHAR(100))`
/// and the table options used by the btree global-index tests below.
///
/// Returns the `TempDir` and SQL context obtained from `setup_sql_context`.
/// NOTE(review): callers presumably need to keep the `TempDir` alive for the
/// duration of the test — confirm against `common::setup_sql_context`.
async fn setup_btree_global_index_table(
    table_name: &str,
) -> (tempfile::TempDir, paimon_datafusion::SQLContext) {
    let (tmp, sql_context) = setup_sql_context().await;

    // The trailing backslashes join the option list into a single line of SQL.
    let create_table_sql = format!(
        "CREATE TABLE paimon.test_db.{table_name} (id INT, name VARCHAR(100)) WITH (\
        'row-tracking.enabled' = 'true',\
        'data-evolution.enabled' = 'true',\
        'global-index.enabled' = 'true',\
        'sorted-index.records-per-range' = '10'\
        )"
    );
    exec(&sql_context, &create_table_sql).await;

    (tmp, sql_context)
}

/// Calling the procedure without `index_column` must surface the
/// missing-argument error.
#[tokio::test]
async fn test_create_global_index_requires_index_column() {
    let (_tmp, sql_context) = setup_btree_global_index_table("btree_missing_col").await;

    let call_sql = "CALL sys.create_global_index(table => 'test_db.btree_missing_col')";
    let expected_error = "Missing required argument: 'index_column'";

    assert_sql_error(&sql_context, call_sql, expected_error).await;
}

/// An `index_type` other than `btree` (here `bitmap`) must be rejected.
#[tokio::test]
async fn test_create_global_index_rejects_non_btree() {
    let (_tmp, sql_context) = setup_btree_global_index_table("btree_bad_type").await;

    let call_sql = "CALL sys.create_global_index(table => 'test_db.btree_bad_type', index_column => 'id', index_type => 'bitmap')";
    let expected_error = "only supports index_type => 'btree'";

    assert_sql_error(&sql_context, call_sql, expected_error).await;
}

/// Supplying an `options` argument must be rejected for btree indexes.
#[tokio::test]
async fn test_create_global_index_rejects_options() {
    let (_tmp, sql_context) = setup_btree_global_index_table("btree_options").await;

    let call_sql = "CALL sys.create_global_index(table => 'test_db.btree_options', index_column => 'id', options => 'x=y')";
    let expected_error = "options are not supported";

    assert_sql_error(&sql_context, call_sql, expected_error).await;
}

/// Builds a btree global index on `id` over three inserted rows, then checks
/// that exactly one matching entry is listed in `btree_build$table_indexes`
/// and that an equality filter on `id` still returns the expected row.
#[tokio::test]
async fn test_create_global_index_builds_btree_and_filter_reads() {
    let (_tmp, sql_context) = setup_btree_global_index_table("btree_build").await;

    let insert_sql = "INSERT INTO paimon.test_db.btree_build (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')";
    exec(&sql_context, insert_sql).await;

    let build_index_sql = "CALL sys.create_global_index(table => 'test_db.btree_build', index_column => 'id', index_type => 'btree')";
    exec(&sql_context, build_index_sql).await;

    // The test expects a single index entry spanning row ids 0 through 2.
    let index_lookup_sql = "SELECT * FROM paimon.test_db.`btree_build$table_indexes` \
        WHERE index_type = 'btree' AND row_range_start = 0 AND row_range_end = 2 \
        AND index_field_name = 'id'";
    let index_count = row_count(&sql_context, index_lookup_sql).await;
    assert_eq!(index_count, 1);

    let filter_sql = "SELECT id, name FROM paimon.test_db.btree_build WHERE id = 2";
    let rows = collect_id_name(&sql_context, filter_sql).await;
    let expected_rows = vec![(2, "bob".to_string())];
    assert_eq!(rows, expected_rows);
}

/// Exercises the same `id >= 2` query under different
/// `paimon.global-index.search-mode` settings after a row is appended past
/// the point where the index was built.
///
/// The assertions expect only the row present at index-build time before any
/// mode is set, and both matching rows once the mode is `full` or `detail`.
#[tokio::test]
async fn test_create_global_index_fast_full_detail_after_append() {
    let (_tmp, sql_context) = setup_btree_global_index_table("btree_coverage").await;

    // Two rows exist when the index is built; the third is appended afterwards.
    let setup_statements = [
        "INSERT INTO paimon.test_db.btree_coverage (id, name) VALUES (1, 'alice'), (2, 'bob')",
        "CALL sys.create_global_index(table => 'test_db.btree_coverage', index_column => 'id')",
        "INSERT INTO paimon.test_db.btree_coverage (id, name) VALUES (3, 'carol')",
    ];
    for statement in setup_statements {
        exec(&sql_context, statement).await;
    }

    let range_query = "SELECT id, name FROM paimon.test_db.btree_coverage WHERE id >= 2";
    let indexed_only = vec![(2, "bob".to_string())];
    let all_matching = vec![(2, "bob".to_string()), (3, "carol".to_string())];

    // No search mode has been set in this session yet.
    let fast_rows = collect_id_name(&sql_context, range_query).await;
    assert_eq!(fast_rows, indexed_only);

    exec(
        &sql_context,
        "SET 'paimon.global-index.search-mode' = 'full'",
    )
    .await;
    let full_rows = collect_id_name(&sql_context, range_query).await;
    assert_eq!(full_rows, all_matching);

    exec(
        &sql_context,
        "SET 'paimon.global-index.search-mode' = 'detail'",
    )
    .await;
    let detail_rows = collect_id_name(&sql_context, range_query).await;
    assert_eq!(detail_rows, all_matching);
}

#[tokio::test]
async fn test_create_tag_already_exists() {
let (_tmp, sql_context) = setup_table_with_snapshots().await;
Expand Down
1 change: 1 addition & 0 deletions crates/paimon/src/btree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ mod sst_file;
mod var_len;
mod writer;

pub use block::BlockCompressionType;
pub use footer::BTreeFileFooter;
pub use key_serde::{make_key_comparator, serialize_datum};
pub use meta::BTreeIndexMeta;
Expand Down
43 changes: 43 additions & 0 deletions crates/paimon/src/spec/core_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const GLOBAL_INDEX_ENABLED_OPTION: &str = "global-index.enabled";
const GLOBAL_INDEX_SEARCH_MODE_OPTION: &str = "global-index.search-mode";
const GLOBAL_INDEX_ROW_COUNT_PER_SHARD_OPTION: &str = "global-index.row-count-per-shard";
const GLOBAL_INDEX_COLUMN_UPDATE_ACTION_OPTION: &str = "global-index.column-update-action";
const SORTED_INDEX_RECORDS_PER_RANGE_OPTION: &str = "sorted-index.records-per-range";
const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str = "source.split.target-size";
const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost";
const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name";
Expand Down Expand Up @@ -417,6 +418,22 @@ impl<'a> CoreOptions<'a> {
Ok(value)
}

pub fn sorted_index_records_per_range(&self) -> crate::Result<i64> {
let value = self
.parse_i64_option(SORTED_INDEX_RECORDS_PER_RANGE_OPTION)?
.unwrap_or(DEFAULT_GLOBAL_INDEX_ROW_COUNT_PER_SHARD);
if value <= 0 {
return Err(crate::Error::DataInvalid {
message: format!(
"Option '{}' must be greater than 0, got: {}",
SORTED_INDEX_RECORDS_PER_RANGE_OPTION, value
),
source: None,
});
}
Ok(value)
}

pub fn global_index_column_update_action(
&self,
) -> crate::Result<GlobalIndexColumnUpdateAction> {
Expand Down Expand Up @@ -815,6 +832,10 @@ mod tests {
core_options.global_index_row_count_per_shard().unwrap(),
100_000
);
assert_eq!(
core_options.sorted_index_records_per_range().unwrap(),
100_000
);
assert_eq!(
core_options.global_index_column_update_action().unwrap(),
GlobalIndexColumnUpdateAction::ThrowError
Expand All @@ -840,6 +861,10 @@ mod tests {
GLOBAL_INDEX_ROW_COUNT_PER_SHARD_OPTION.to_string(),
"2048".to_string(),
),
(
SORTED_INDEX_RECORDS_PER_RANGE_OPTION.to_string(),
"4096".to_string(),
),
(
GLOBAL_INDEX_COLUMN_UPDATE_ACTION_OPTION.to_string(),
"DROP_PARTITION_INDEX".to_string(),
Expand All @@ -857,6 +882,7 @@ mod tests {
core_options.global_index_row_count_per_shard().unwrap(),
2048
);
assert_eq!(core_options.sorted_index_records_per_range().unwrap(), 4096);
assert_eq!(
core_options.global_index_column_update_action().unwrap(),
GlobalIndexColumnUpdateAction::DropPartitionIndex
Expand Down Expand Up @@ -912,6 +938,23 @@ mod tests {
}
}

/// Zero, negative, and non-numeric settings must all be rejected with a
/// `DataInvalid` error whose message names the offending option.
#[test]
fn test_sorted_index_records_per_range_rejects_invalid_values() {
    let invalid_values = ["0", "-1", "abc"];

    for value in invalid_values {
        let mut options = HashMap::new();
        options.insert(
            SORTED_INDEX_RECORDS_PER_RANGE_OPTION.to_string(),
            value.to_string(),
        );
        let core = CoreOptions::new(&options);

        let outcome = core.sorted_index_records_per_range();
        let err = outcome.expect_err("invalid records-per-range should fail");

        assert!(matches!(err, crate::Error::DataInvalid { message, .. }
            if message.contains(SORTED_INDEX_RECORDS_PER_RANGE_OPTION)));
    }
}

#[test]
fn test_parse_memory_size() {
assert_eq!(parse_memory_size("1024"), Some(1024));
Expand Down
Loading
Loading