diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 3508362a..077b06c6 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -18,6 +18,7 @@ //! Paimon table provider for DataFusion. use std::any::Any; +use std::fmt::Write as _; use std::sync::Arc; use async_trait::async_trait; @@ -29,6 +30,7 @@ use datafusion::error::Result as DFResult; use datafusion::logical_expr::dml::InsertOp; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_plan::ExecutionPlan; +use paimon::spec::DataType; use paimon::table::Table; use crate::physical_plan::PaimonDataSink; @@ -54,6 +56,7 @@ use crate::runtime::await_with_runtime; pub struct PaimonTableProvider { table: Table, schema: ArrowSchemaRef, + table_definition: String, } impl PaimonTableProvider { @@ -72,7 +75,12 @@ impl PaimonTableProvider { } let schema = paimon::arrow::build_target_arrow_schema(&fields).map_err(to_datafusion_error)?; - Ok(Self { table, schema }) + let table_definition = build_table_definition(&table); + Ok(Self { + table, + schema, + table_definition, + }) } pub fn try_new_with_blob_reader_registry( @@ -89,6 +97,131 @@ impl PaimonTableProvider { } } +/// Build a `CREATE TABLE` DDL string for a Paimon table. +/// +/// Mirrors the syntax accepted by `SQLContext::handle_create_table`: +/// `CREATE TABLE . (, ..., PRIMARY KEY (...)) [PARTITIONED BY (...)] [WITH ('k'='v', ...)]`. +fn build_table_definition(table: &Table) -> String { + let identifier = table.identifier(); + let schema = table.schema(); + let mut ddl = String::new(); + let _ = write!( + ddl, + "CREATE TABLE {}.{} (", + identifier.database(), + identifier.object() + ); + + for (i, field) in schema.fields().iter().enumerate() { + if i > 0 { + ddl.push_str(", "); + } + // `NOT NULL` is a column constraint; render it here at the column + // level rather than inside nested type arguments (see `data_type_to_sql`). + let ty = data_type_to_sql(field.data_type()); + if field.data_type().is_nullable() { + let _ = write!(ddl, "{} {}", field.name(), ty); + } else { + let _ = write!(ddl, "{} {} NOT NULL", field.name(), ty); + } + } + + let pks = schema.primary_keys(); + if !pks.is_empty() { + ddl.push_str(", PRIMARY KEY ("); + for (i, pk) in pks.iter().enumerate() { + if i > 0 { + ddl.push_str(", "); + } + let _ = write!(ddl, "{}", pk); + } + ddl.push(')'); + } + ddl.push(')'); + + let partition_keys = schema.partition_keys(); + if !partition_keys.is_empty() { + ddl.push_str(" PARTITIONED BY ("); + for (i, pk) in partition_keys.iter().enumerate() { + if i > 0 { + ddl.push_str(", "); + } + let _ = write!(ddl, "{}", pk); + } + ddl.push(')'); + } + + let options = schema.options(); + if !options.is_empty() { + ddl.push_str(" WITH ("); + for (i, (k, v)) in options.iter().enumerate() { + if i > 0 { + ddl.push_str(", "); + } + let _ = write!(ddl, "'{}' = '{}'", k, v); + } + ddl.push(')'); + } + + ddl +} + +/// Render a Paimon [`DataType`] as a SQL type string matching the syntax +/// accepted by paimon-rust's `CREATE TABLE` parser. +/// +/// `NOT NULL` is a column constraint, not a type modifier — it is only valid +/// at the top of a column definition, not nested inside `MAP`, `ARRAY`, or +/// `STRUCT` arguments. Callers that render a column should append `NOT NULL` +/// themselves when the field is non-nullable; recursive calls below must not. +fn data_type_to_sql(data_type: &DataType) -> String { + match data_type { + DataType::Boolean(_) => "BOOLEAN".to_string(), + DataType::TinyInt(_) => "TINYINT".to_string(), + DataType::SmallInt(_) => "SMALLINT".to_string(), + DataType::Int(_) => "INT".to_string(), + DataType::BigInt(_) => "BIGINT".to_string(), + DataType::Decimal(t) => format!("DECIMAL({}, {})", t.precision(), t.scale()), + DataType::Double(_) => "DOUBLE".to_string(), + DataType::Float(_) => "FLOAT".to_string(), + DataType::Binary(t) => format!("BINARY({})", t.length()), + DataType::VarBinary(t) => format!("VARBINARY({})", t.length()), + DataType::Blob(_) => "BLOB".to_string(), + DataType::Char(t) => format!("CHAR({})", t.length()), + DataType::VarChar(t) => format!("VARCHAR({})", t.length()), + DataType::Date(_) => "DATE".to_string(), + DataType::Time(t) => format!("TIME({})", t.precision()), + DataType::Timestamp(t) => format!("TIMESTAMP({})", t.precision()), + DataType::LocalZonedTimestamp(t) => format!("TIMESTAMP_LTZ({})", t.precision()), + DataType::Array(t) => format!("ARRAY<{}>", data_type_to_sql(t.element_type())), + DataType::Map(t) => format!( + "MAP({}, {})", + data_type_to_sql(t.key_type()), + data_type_to_sql(t.value_type()) + ), + DataType::Multiset(t) => format!("MULTISET<{}>", data_type_to_sql(t.element_type())), + DataType::Row(t) => { + let inner: Vec = t + .fields() + .iter() + .map(|f| { + let ty = data_type_to_sql(f.data_type()); + if f.name().is_empty() { + ty + } else { + format!("{} {}", f.name(), ty) + } + }) + .collect(); + format!("STRUCT<{}>", inner.join(", ")) + } + DataType::Vector(t) => format!( + "VECTOR<{}, {}>", + data_type_to_sql(t.element_type()), + t.length() + ), + } +} + /// Distribute `items` into `num_buckets` groups using round-robin assignment. pub(crate) fn bucket_round_robin(items: Vec, num_buckets: usize) -> Vec> { let mut buckets: Vec> = (0..num_buckets).map(|_| Vec::new()).collect(); @@ -167,6 +300,10 @@ impl TableProvider for PaimonTableProvider { TableType::Base } + fn get_table_definition(&self) -> Option<&str> { + Some(&self.table_definition) + } + async fn scan( &self, state: &dyn Session, diff --git a/crates/integrations/datafusion/tests/sql_context_tests.rs b/crates/integrations/datafusion/tests/sql_context_tests.rs index d4f0c1da..ae062bc4 100644 --- a/crates/integrations/datafusion/tests/sql_context_tests.rs +++ b/crates/integrations/datafusion/tests/sql_context_tests.rs @@ -1262,3 +1262,283 @@ async fn test_multiple_temporary_views_in_same_database() { .sum::(); assert_eq!(rows2, 3); } + +// ======================= SHOW CREATE TABLE ======================= + +/// Collect the `definition` column from `SHOW CREATE TABLE` output as a String. +async fn collect_definition(sql_context: &SQLContext, table_ref: &str) -> String { + let rows = sql_context + .sql(&format!("SHOW CREATE TABLE {}", table_ref)) + .await + .expect("SHOW CREATE TABLE should plan") + .collect() + .await + .expect("SHOW CREATE TABLE should execute"); + assert_eq!( + rows.len(), + 1, + "SHOW CREATE TABLE should return exactly one row" + ); + let row = &rows[0]; + assert_eq!( + row.num_rows(), + 1, + "SHOW CREATE TABLE should return exactly one row" + ); + let val = row.column(3); // definition is the 4th column + let def = val + .as_any() + .downcast_ref::() + .expect("definition column should be a StringArray") + .value(0); + def.to_string() +} + +#[tokio::test] +async fn test_show_create_table_simple() { + let (_tmp, catalog) = create_test_env(); + let sql_context = create_sql_context(catalog).await; + + sql_context + .sql("CREATE SCHEMA paimon.test_db") + .await + .expect("CREATE SCHEMA should succeed"); + sql_context + .sql("CREATE TABLE paimon.test_db.t (id INT, name VARCHAR(100))") + .await + .expect("CREATE TABLE should succeed"); + + let definition = collect_definition(&sql_context, "paimon.test_db.t").await; + assert!( + definition.contains("CREATE TABLE test_db.t"), + "definition should start with CREATE TABLE test_db.t, got: {definition}" + ); + assert!( + definition.contains("id INT"), + "definition should contain `id INT`, got: {definition}" + ); + assert!( + definition.contains("name VARCHAR("), + "definition should contain `name VARCHAR(...)`, got: {definition}" + ); +} + +#[tokio::test] +async fn test_show_create_table_with_primary_key() { + let (_tmp, catalog) = create_test_env(); + let sql_context = create_sql_context(catalog).await; + + sql_context + .sql("CREATE SCHEMA paimon.test_db") + .await + .expect("CREATE SCHEMA should succeed"); + sql_context + .sql("CREATE TABLE paimon.test_db.t (id INT NOT NULL, name VARCHAR, PRIMARY KEY (id))") + .await + .expect("CREATE TABLE should succeed"); + + let definition = collect_definition(&sql_context, "paimon.test_db.t").await; + assert!( + definition.contains("PRIMARY KEY (id)"), + "definition should contain PRIMARY KEY (id), got: {definition}" + ); +} + +#[tokio::test] +async fn test_show_create_table_with_partition_and_options() { + let (_tmp, catalog) = create_test_env(); + let sql_context = create_sql_context(catalog).await; + + sql_context + .sql("CREATE SCHEMA paimon.test_db") + .await + .expect("CREATE SCHEMA should succeed"); + sql_context + .sql( + "CREATE TABLE paimon.test_db.t (id INT, name VARCHAR, pt INT) \ + PARTITIONED BY (pt) WITH ('bucket' = '4', 'file.format' = 'parquet')", + ) + .await + .expect("CREATE TABLE should succeed"); + + let definition = collect_definition(&sql_context, "paimon.test_db.t").await; + assert!( + definition.contains("PARTITIONED BY (pt)"), + "definition should contain PARTITIONED BY (pt), got: {definition}" + ); + assert!( + definition.contains("'bucket' = '4'"), + "definition should contain bucket option, got: {definition}" + ); + assert!( + definition.contains("'file.format' = 'parquet'"), + "definition should contain file.format option, got: {definition}" + ); +} + +#[tokio::test] +async fn test_show_create_table_various_types() { + let (_tmp, catalog) = create_test_env(); + let sql_context = create_sql_context(catalog).await; + + sql_context + .sql("CREATE SCHEMA paimon.test_db") + .await + .expect("CREATE SCHEMA should succeed"); + sql_context + .sql( + "CREATE TABLE paimon.test_db.t (\ + a BOOLEAN, \ + b TINYINT, \ + c SMALLINT, \ + d BIGINT, \ + e DECIMAL(10, 2), \ + f DOUBLE, \ + g FLOAT, \ + h DATE, \ + i TIMESTAMP(3), \ + j BLOB) \ + WITH ('data-evolution.enabled' = 'true')", + ) + .await + .expect("CREATE TABLE should succeed"); + + let definition = collect_definition(&sql_context, "paimon.test_db.t").await; + for needle in [ + "a BOOLEAN", + "b TINYINT", + "c SMALLINT", + "d BIGINT", + "e DECIMAL(10, 2)", + "f DOUBLE", + "g FLOAT", + "h DATE", + "i TIMESTAMP(3)", + "j BLOB", + ] { + assert!( + definition.contains(needle), + "definition should contain `{needle}`, got: {definition}" + ); + } +} + +/// Assert that two `TableSchema`s are equivalent for round-trip purposes: +/// same fields (id, name, type), same primary keys, same partition keys. +/// +/// We do not compare `options` because the CREATE TABLE path may inject +/// catalog defaults (e.g. `bucket`) that the user did not specify; the +/// schema fields and key columns are what the DDL must preserve. +fn assert_schema_equivalent(left: &paimon::spec::TableSchema, right: &paimon::spec::TableSchema) { + assert_eq!( + left.fields().len(), + right.fields().len(), + "field count mismatch\nleft (original): {:?}\nright (recreated): {:?}", + left.fields(), + right.fields() + ); + for (lf, rf) in left.fields().iter().zip(right.fields().iter()) { + assert_eq!( + lf.id(), + rf.id(), + "field id mismatch for `{}`: {} vs {}", + lf.name(), + lf.id(), + rf.id() + ); + assert_eq!( + lf.name(), + rf.name(), + "field name mismatch: `{}` vs `{}`", + lf.name(), + rf.name() + ); + assert_eq!( + lf.data_type(), + rf.data_type(), + "field type mismatch for `{}`: {:?} vs {:?}", + lf.name(), + lf.data_type(), + rf.data_type() + ); + } + assert_eq!( + left.primary_keys(), + right.primary_keys(), + "primary keys mismatch: {:?} vs {:?}", + left.primary_keys(), + right.primary_keys() + ); + assert_eq!( + left.partition_keys(), + right.partition_keys(), + "partition keys mismatch: {:?} vs {:?}", + left.partition_keys(), + right.partition_keys() + ); +} + +/// Round-trip test: the DDL returned by `SHOW CREATE TABLE` must be executable +/// by paimon-rust's own `CREATE TABLE` parser and reproduce an equivalent +/// schema (fields, primary keys, partition keys). +/// +/// This guards against regressions where the rendered DDL drifts away from +/// what the parser accepts (e.g. `ROW` vs `STRUCT`, +/// `MAP` vs `MAP(k, v)`, or dropped `NOT NULL`). +#[tokio::test] +async fn test_show_create_table_round_trip() { + let (_tmp, catalog) = create_test_env(); + let sql_context = create_sql_context(catalog.clone()).await; + + sql_context + .sql("CREATE SCHEMA paimon.test_db") + .await + .expect("CREATE SCHEMA should succeed"); + sql_context + .sql( + "CREATE TABLE paimon.test_db.t1 (\ + id INT NOT NULL, \ + name VARCHAR NOT NULL, \ + tags ARRAY, \ + props MAP(INT, VARCHAR), \ + addr STRUCT, \ + meta STRUCT>, \ + PRIMARY KEY (id)) \ + PARTITIONED BY (name) \ + WITH ('bucket' = '2', 'file.format' = 'parquet')", + ) + .await + .expect("CREATE TABLE should succeed"); + + let identifier = Identifier::new("test_db", "t1"); + let original = catalog.get_table(&identifier).await.expect("table exists"); + let original_schema = original.schema().clone(); + + let definition = collect_definition(&sql_context, "paimon.test_db.t1").await; + // The DDL is rendered as `CREATE TABLE test_db.t1 (...)` without the + // catalog prefix; paimon is the default catalog so this resolves back + // to the same catalog/database. + assert!( + definition.starts_with("CREATE TABLE test_db.t1"), + "definition should start with `CREATE TABLE test_db.t1`, got: {definition}" + ); + + catalog + .drop_table(&identifier, false) + .await + .expect("drop should succeed"); + + sql_context + .sql(&definition) + .await + .expect("DDL should re-execute") + .collect() + .await + .expect("DDL should execute"); + + let recreated = catalog + .get_table(&identifier) + .await + .expect("recreated table exists"); + assert_schema_equivalent(&original_schema, recreated.schema()); +}