-
Notifications
You must be signed in to change notification settings - Fork 75
feat(datafusion): support SHOW CREATE TABLE via get_table_definition #444
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
7a1d8f2
4eb932d
4795977
8dfcb93
f3b3c0d
c0ffdf6
ff4f2e1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| //! Paimon table provider for DataFusion. | ||
|
|
||
| use std::any::Any; | ||
| use std::fmt::Write as _; | ||
| use std::sync::Arc; | ||
|
|
||
| use async_trait::async_trait; | ||
|
|
@@ -29,6 +30,7 @@ use datafusion::error::Result as DFResult; | |
| use datafusion::logical_expr::dml::InsertOp; | ||
| use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; | ||
| use datafusion::physical_plan::ExecutionPlan; | ||
| use paimon::spec::DataType; | ||
| use paimon::table::Table; | ||
|
|
||
| use crate::physical_plan::PaimonDataSink; | ||
|
|
@@ -54,6 +56,7 @@ use crate::runtime::await_with_runtime; | |
| pub struct PaimonTableProvider { | ||
| table: Table, | ||
| schema: ArrowSchemaRef, | ||
| table_definition: String, | ||
| } | ||
|
|
||
| impl PaimonTableProvider { | ||
|
|
@@ -72,7 +75,12 @@ impl PaimonTableProvider { | |
| } | ||
| let schema = | ||
| paimon::arrow::build_target_arrow_schema(&fields).map_err(to_datafusion_error)?; | ||
| Ok(Self { table, schema }) | ||
| let table_definition = build_table_definition(&table); | ||
| Ok(Self { | ||
| table, | ||
| schema, | ||
| table_definition, | ||
| }) | ||
| } | ||
|
|
||
| pub fn try_new_with_blob_reader_registry( | ||
|
|
@@ -89,6 +97,131 @@ impl PaimonTableProvider { | |
| } | ||
| } | ||
|
|
||
| /// Build a replayable `CREATE TABLE` DDL string for a Paimon table. | ||
| /// | ||
| /// Mirrors the syntax accepted by `SQLContext::handle_create_table`: | ||
| /// `CREATE TABLE <db>.<table> (<col> <type>, ..., PRIMARY KEY (...)) [PARTITIONED BY (...)] [WITH ('k'='v', ...)]`. | ||
| /// | ||
| /// Every identifier (database, table, column, primary-key and partition-key | ||
| /// names) is emitted double-quoted with embedded `"` doubled, so reserved | ||
| /// words, mixed-case names, and names with special characters keep their | ||
| /// exact spelling when the statement is re-executed. Option keys and values | ||
| /// are emitted as single-quoted literals with embedded `'` doubled. Options | ||
| /// are sorted by key so repeated calls render the same string. | ||
| fn build_table_definition(table: &Table) -> String { | ||
| // Render `name` as a delimited identifier: wrap it in double quotes and | ||
| // double any embedded `"` so it cannot terminate the identifier early. | ||
| fn quote_identifier<S: AsRef<str>>(name: S) -> String { | ||
| format!("\"{}\"", name.as_ref().replace('"', "\"\"")) | ||
| } | ||
|
|
||
| // Render `value` as a SQL string literal: wrap it in single quotes and | ||
| // double any embedded `'` so it cannot terminate the literal early. | ||
| fn quote_literal<S: AsRef<str>>(value: S) -> String { | ||
| format!("'{}'", value.as_ref().replace('\'', "''")) | ||
| } | ||
|
|
||
| let identifier = table.identifier(); | ||
| let schema = table.schema(); | ||
| let mut ddl = String::new(); | ||
| // Formatting into a `String` cannot fail, so the `fmt::Result` is discarded. | ||
| let _ = write!( | ||
| ddl, | ||
| "CREATE TABLE {}.{} (", | ||
| quote_identifier(identifier.database()), | ||
| quote_identifier(identifier.object()) | ||
| ); | ||
|
|
||
| for (i, field) in schema.fields().iter().enumerate() { | ||
| if i > 0 { | ||
| ddl.push_str(", "); | ||
| } | ||
| // `NOT NULL` is a column constraint; render it here at the column | ||
| // level rather than inside nested type arguments (see `data_type_to_sql`). | ||
| let ty = data_type_to_sql(field.data_type()); | ||
| let _ = write!(ddl, "{} {}", quote_identifier(field.name()), ty); | ||
| if !field.data_type().is_nullable() { | ||
| ddl.push_str(" NOT NULL"); | ||
| } | ||
| } | ||
|
|
||
| let pks = schema.primary_keys(); | ||
| if !pks.is_empty() { | ||
| let quoted: Vec<String> = pks.iter().map(quote_identifier).collect(); | ||
| let _ = write!(ddl, ", PRIMARY KEY ({})", quoted.join(", ")); | ||
| } | ||
| ddl.push(')'); | ||
|
|
||
| let partition_keys = schema.partition_keys(); | ||
| if !partition_keys.is_empty() { | ||
| let quoted: Vec<String> = partition_keys.iter().map(quote_identifier).collect(); | ||
| let _ = write!(ddl, " PARTITIONED BY ({})", quoted.join(", ")); | ||
| } | ||
|
|
||
| let options = schema.options(); | ||
| if !options.is_empty() { | ||
| // Sort by key so the rendered DDL does not depend on the iteration | ||
| // order of the options collection. | ||
| let mut entries: Vec<_> = options.iter().collect(); | ||
| entries.sort_unstable_by(|(a, _), (b, _)| a.cmp(b)); | ||
| ddl.push_str(" WITH ("); | ||
| for (i, (k, v)) in entries.into_iter().enumerate() { | ||
| if i > 0 { | ||
| ddl.push_str(", "); | ||
| } | ||
| // Keys and values are arbitrary table metadata; escape both. | ||
| let _ = write!(ddl, "{} = {}", quote_literal(k), quote_literal(v)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs SQL string escaping before interpolation. Option values are arbitrary table metadata, so a value containing a single quote, for example |
||
| } | ||
| ddl.push(')'); | ||
| } | ||
|
|
||
| ddl | ||
| } | ||
|
|
||
| /// Render a Paimon [`DataType`] as a SQL type string matching the syntax | ||
| /// accepted by paimon-rust's `CREATE TABLE` parser. | ||
| /// | ||
| /// `NOT NULL` is a column constraint, not a type modifier — it is only valid | ||
| /// at the top of a column definition, not nested inside `MAP`, `ARRAY`, or | ||
| /// `STRUCT` arguments. Callers that render a column should append `NOT NULL` | ||
| /// themselves when the field is non-nullable; recursive calls below must not. | ||
| /// | ||
| /// Named `STRUCT` fields are emitted as double-quoted identifiers with | ||
| /// embedded `"` doubled, so reserved words and mixed-case names keep their | ||
| /// exact spelling when the type is parsed again. Fields with an empty name | ||
| /// are rendered as the bare type. | ||
| fn data_type_to_sql(data_type: &DataType) -> String { | ||
| // Render `name` as a delimited identifier: wrap it in double quotes and | ||
| // double any embedded `"` so it cannot terminate the identifier early. | ||
| fn quote_identifier<S: AsRef<str>>(name: S) -> String { | ||
| format!("\"{}\"", name.as_ref().replace('"', "\"\"")) | ||
| } | ||
|
|
||
| match data_type { | ||
| DataType::Boolean(_) => "BOOLEAN".to_string(), | ||
| DataType::TinyInt(_) => "TINYINT".to_string(), | ||
| DataType::SmallInt(_) => "SMALLINT".to_string(), | ||
| DataType::Int(_) => "INT".to_string(), | ||
| DataType::BigInt(_) => "BIGINT".to_string(), | ||
| DataType::Decimal(t) => format!("DECIMAL({}, {})", t.precision(), t.scale()), | ||
| DataType::Double(_) => "DOUBLE".to_string(), | ||
| DataType::Float(_) => "FLOAT".to_string(), | ||
| DataType::Binary(t) => format!("BINARY({})", t.length()), | ||
| DataType::VarBinary(t) => format!("VARBINARY({})", t.length()), | ||
| DataType::Blob(_) => "BLOB".to_string(), | ||
| DataType::Char(t) => format!("CHAR({})", t.length()), | ||
| DataType::VarChar(t) => format!("VARCHAR({})", t.length()), | ||
| DataType::Date(_) => "DATE".to_string(), | ||
| DataType::Time(t) => format!("TIME({})", t.precision()), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This arm (and the |
||
| DataType::Timestamp(t) => format!("TIMESTAMP({})", t.precision()), | ||
| DataType::LocalZonedTimestamp(t) => format!("TIMESTAMP_LTZ({})", t.precision()), | ||
| DataType::Array(t) => format!("ARRAY<{}>", data_type_to_sql(t.element_type())), | ||
| DataType::Map(t) => format!( | ||
|
shyjsarah marked this conversation as resolved.
|
||
| "MAP({}, {})", | ||
| data_type_to_sql(t.key_type()), | ||
| data_type_to_sql(t.value_type()) | ||
| ), | ||
| DataType::Multiset(t) => format!("MULTISET<{}>", data_type_to_sql(t.element_type())), | ||
| DataType::Row(t) => { | ||
|
shyjsarah marked this conversation as resolved.
|
||
| let inner: Vec<String> = t | ||
| .fields() | ||
| .iter() | ||
| .map(|f| { | ||
| let ty = data_type_to_sql(f.data_type()); | ||
| if f.name().is_empty() { | ||
| // Unnamed field: only the type can be rendered. | ||
| ty | ||
| } else { | ||
| format!("{} {}", quote_identifier(f.name()), ty) | ||
| } | ||
| }) | ||
| .collect(); | ||
| format!("STRUCT<{}>", inner.join(", ")) | ||
| } | ||
| DataType::Vector(t) => format!( | ||
| "VECTOR<{}, {}>", | ||
| data_type_to_sql(t.element_type()), | ||
| t.length() | ||
| ), | ||
| } | ||
| } | ||
|
|
||
| /// Distribute `items` into `num_buckets` groups using round-robin assignment. | ||
| pub(crate) fn bucket_round_robin<T>(items: Vec<T>, num_buckets: usize) -> Vec<Vec<T>> { | ||
| let mut buckets: Vec<Vec<T>> = (0..num_buckets).map(|_| Vec::new()).collect(); | ||
|
|
@@ -167,6 +300,10 @@ impl TableProvider for PaimonTableProvider { | |
| TableType::Base | ||
| } | ||
|
|
||
| /// Returns the `CREATE TABLE` statement that was rendered when this provider was constructed. | ||
| fn get_table_definition(&self) -> Option<&str> { | ||
| let definition = self.table_definition.as_str(); | ||
| Some(definition) | ||
| } | ||
|
|
||
| async fn scan( | ||
| &self, | ||
| state: &dyn Session, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`SHOW CREATE TABLE` should emit replayable SQL, but the table name is written without identifier quoting. A table created with quoted identifiers, such as `CREATE TABLE paimon.test_db."select" ("order" INT)`, would be rendered as `CREATE TABLE test_db.select (order INT)`, which is invalid or changes the identifier semantics when re-executed. The same issue applies to column names, primary/partition keys, and nested struct field names below. Please quote identifiers when required and escape embedded quotes, then add a round-trip case with a reserved-word or otherwise quoted identifier.