Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 138 additions & 1 deletion crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Paimon table provider for DataFusion.

use std::any::Any;
use std::fmt::Write as _;
use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -29,6 +30,7 @@ use datafusion::error::Result as DFResult;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use paimon::spec::DataType;
use paimon::table::Table;

use crate::physical_plan::PaimonDataSink;
Expand All @@ -54,6 +56,7 @@ use crate::runtime::await_with_runtime;
pub struct PaimonTableProvider {
table: Table,
schema: ArrowSchemaRef,
table_definition: String,
}

impl PaimonTableProvider {
Expand All @@ -72,7 +75,12 @@ impl PaimonTableProvider {
}
let schema =
paimon::arrow::build_target_arrow_schema(&fields).map_err(to_datafusion_error)?;
Ok(Self { table, schema })
let table_definition = build_table_definition(&table);
Ok(Self {
table,
schema,
table_definition,
})
}

pub fn try_new_with_blob_reader_registry(
Expand All @@ -89,6 +97,131 @@ impl PaimonTableProvider {
}
}

/// Build a `CREATE TABLE` DDL string for a Paimon table.
///
/// Mirrors the syntax accepted by `SQLContext::handle_create_table`:
/// `CREATE TABLE <db>.<table> (<col> <type>, ..., PRIMARY KEY (...)) [PARTITIONED BY (...)] [WITH ('k'='v', ...)]`.
fn build_table_definition(table: &Table) -> String {
let identifier = table.identifier();
let schema = table.schema();
let mut ddl = String::new();
let _ = write!(
ddl,
"CREATE TABLE {}.{} (",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SHOW CREATE TABLE should emit replayable SQL, but the table name is written without identifier quoting. A table created with quoted identifiers such as CREATE TABLE paimon.test_db."select" ("order" INT) would be rendered as CREATE TABLE test_db.select (order INT), which is invalid or changes the identifier semantics when re-executed. The same issue applies to column names, primary/partition keys, and nested struct field names below. Please quote identifiers when required and escape embedded quotes, then add a round-trip case with a reserved-word or otherwise quoted identifier.

identifier.database(),
identifier.object()
);

for (i, field) in schema.fields().iter().enumerate() {
if i > 0 {
ddl.push_str(", ");
}
// `NOT NULL` is a column constraint; render it here at the column
// level rather than inside nested type arguments (see `data_type_to_sql`).
let ty = data_type_to_sql(field.data_type());
if field.data_type().is_nullable() {
let _ = write!(ddl, "{} {}", field.name(), ty);
} else {
let _ = write!(ddl, "{} {} NOT NULL", field.name(), ty);
}
}

let pks = schema.primary_keys();
if !pks.is_empty() {
ddl.push_str(", PRIMARY KEY (");
for (i, pk) in pks.iter().enumerate() {
if i > 0 {
ddl.push_str(", ");
}
let _ = write!(ddl, "{}", pk);
}
ddl.push(')');
}
ddl.push(')');

let partition_keys = schema.partition_keys();
if !partition_keys.is_empty() {
ddl.push_str(" PARTITIONED BY (");
for (i, pk) in partition_keys.iter().enumerate() {
if i > 0 {
ddl.push_str(", ");
}
let _ = write!(ddl, "{}", pk);
}
ddl.push(')');
}

let options = schema.options();
if !options.is_empty() {
ddl.push_str(" WITH (");
for (i, (k, v)) in options.iter().enumerate() {
if i > 0 {
ddl.push_str(", ");
}
let _ = write!(ddl, "'{}' = '{}'", k, v);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs SQL string escaping before interpolation. Option values are arbitrary table metadata, so a value containing a single quote, for example WITH ('comment' = 'Bob's table'), makes the returned definition invalid when users copy or re-execute SHOW CREATE TABLE. Please render string literals with SQL escaping (doubling ' to '') for both keys and values, and add a round-trip test with an option value containing a quote.

}
ddl.push(')');
}

ddl
}

/// Render a Paimon [`DataType`] as a SQL type string matching the syntax
/// accepted by paimon-rust's `CREATE TABLE` parser.
///
/// `NOT NULL` is a column constraint, not a type modifier — it is only valid
/// at the top of a column definition, not nested inside `MAP`, `ARRAY`, or
/// `STRUCT` arguments. Callers that render a column should append `NOT NULL`
/// themselves when the field is non-nullable; recursive calls below must not.
fn data_type_to_sql(data_type: &DataType) -> String {
match data_type {
DataType::Boolean(_) => "BOOLEAN".to_string(),
DataType::TinyInt(_) => "TINYINT".to_string(),
DataType::SmallInt(_) => "SMALLINT".to_string(),
DataType::Int(_) => "INT".to_string(),
DataType::BigInt(_) => "BIGINT".to_string(),
DataType::Decimal(t) => format!("DECIMAL({}, {})", t.precision(), t.scale()),
DataType::Double(_) => "DOUBLE".to_string(),
DataType::Float(_) => "FLOAT".to_string(),
DataType::Binary(t) => format!("BINARY({})", t.length()),
DataType::VarBinary(t) => format!("VARBINARY({})", t.length()),
DataType::Blob(_) => "BLOB".to_string(),
DataType::Char(t) => format!("CHAR({})", t.length()),
DataType::VarChar(t) => format!("VARCHAR({})", t.length()),
DataType::Date(_) => "DATE".to_string(),
DataType::Time(t) => format!("TIME({})", t.precision()),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This arm (and the TIMESTAMP_LTZ, MULTISET, and VECTOR arms below) emits syntax that the current SQLContext cannot round-trip. sql_data_type_to_paimon_type has no SqlType::Time branch and falls through to Unsupported SQL data type; similarly TIMESTAMP_LTZ, MULTISET, and VECTOR are not accepted by that converter. A table loaded from existing Paimon metadata with any of these types will therefore return a SHOW CREATE TABLE definition that cannot be executed by paimon-rust. Please either add parser/converter support for the emitted syntax with round-trip tests, or avoid advertising these variants as replayable DDL.

DataType::Timestamp(t) => format!("TIMESTAMP({})", t.precision()),
DataType::LocalZonedTimestamp(t) => format!("TIMESTAMP_LTZ({})", t.precision()),
DataType::Array(t) => format!("ARRAY<{}>", data_type_to_sql(t.element_type())),
DataType::Map(t) => format!(
Comment thread
shyjsarah marked this conversation as resolved.
"MAP({}, {})",
data_type_to_sql(t.key_type()),
data_type_to_sql(t.value_type())
),
DataType::Multiset(t) => format!("MULTISET<{}>", data_type_to_sql(t.element_type())),
DataType::Row(t) => {
Comment thread
shyjsarah marked this conversation as resolved.
let inner: Vec<String> = t
.fields()
.iter()
.map(|f| {
let ty = data_type_to_sql(f.data_type());
if f.name().is_empty() {
ty
} else {
format!("{} {}", f.name(), ty)
}
})
.collect();
format!("STRUCT<{}>", inner.join(", "))
}
DataType::Vector(t) => format!(
"VECTOR<{}, {}>",
data_type_to_sql(t.element_type()),
t.length()
),
}
}

/// Distribute `items` into `num_buckets` groups using round-robin assignment.
pub(crate) fn bucket_round_robin<T>(items: Vec<T>, num_buckets: usize) -> Vec<Vec<T>> {
let mut buckets: Vec<Vec<T>> = (0..num_buckets).map(|_| Vec::new()).collect();
Expand Down Expand Up @@ -167,6 +300,10 @@ impl TableProvider for PaimonTableProvider {
TableType::Base
}

fn get_table_definition(&self) -> Option<&str> {
Some(&self.table_definition)
}

async fn scan(
&self,
state: &dyn Session,
Expand Down
Loading
Loading