From 6766e041641569dbf226d92b3139b5e6ddfac52f Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Wed, 20 May 2026 15:10:26 +0000 Subject: [PATCH 1/3] [SPARK-56618][SQL][TESTS] Add DSv2 join refresh tests for incrementally constructed queries Rebase onto latest master. Co-authored-by: Isaac --- .../ComposedColumnIdTableCatalog.scala | 2 + .../connector/catalog/InMemoryBaseTable.scala | 11 + ...nMemoryRowLevelOperationTableCatalog.scala | 2 + .../sql/connector/catalog/InMemoryTable.scala | 5 +- .../catalog/MixedColumnIdTableCatalog.scala | 2 + .../NullColumnIdInMemoryTableCatalog.scala | 2 + ...dAndNullColumnIdInMemoryTableCatalog.scala | 100 +++++ .../TypeChangeResetsColIdTableCatalog.scala | 1 + .../DataSourceV2DataFrameSuite.scala | 380 +++++++++++++++++- 9 files changed, 500 insertions(+), 5 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdAndNullColumnIdInMemoryTableCatalog.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala index 0ef1a1970dea7..64488a76db7f3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala @@ -96,6 +96,7 @@ class ComposedColumnIdTableCatalog extends InMemoryTableCatalog { table.constraints, id = table.id) composedTable.alterTableWithData(table.data, table.schema) + composedTable.setVersionAndValidatedVersionFrom(table) tables.put(ident, composedTable) composedTable } @@ -157,6 +158,7 @@ class ComposedColumnIdTableCatalog extends InMemoryTableCatalog { alteredTable.constraints, id = alteredTable.id) composedTable.alterTableWithData(alteredTable.data, alteredTable.schema) + composedTable.setVersionAndValidatedVersionFrom(alteredTable) tables.put(ident, composedTable) composedTable } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index f582f3e408cb6..fd907e1726dc1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -97,6 +97,17 @@ abstract class InMemoryBaseTable( tableVersion = version.toInt } + /** + * Copies version and validated version from another table so that + * [[V2TableRefreshUtil]] can detect changes and refresh stale references. + */ + def setVersionAndValidatedVersionFrom(sourceTable: InMemoryBaseTable): Unit = { + setVersion(sourceTable.version()) + if (sourceTable.validatedVersion() != null) { + setValidatedVersion(sourceTable.validatedVersion()) + } + } + def increaseVersion(): Unit = { tableVersion += 1 } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala index 81f976dce510f..285a2891bc938 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala @@ -82,6 +82,7 @@ class InMemoryRowLevelOperationTableCatalog constraints = constraints, tableId = table.id) newTable.alterTableWithData(table.data, schema) + newTable.setVersionAndValidatedVersionFrom(table) tables.put(ident, newTable) @@ -123,6 +124,7 @@ class PartialSchemaEvolutionCatalog extends InMemoryRowLevelOperationTableCatalo properties = properties, constraints = table.constraints) newTable.alterTableWithData(table.data, table.schema) + newTable.setVersionAndValidatedVersionFrom(table) tables.put(ident, newTable) newTable } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala index 66db9c18fa981..c783bfbece149 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala @@ -158,10 +158,7 @@ class InMemoryTable( copiedTable.commits ++= commits.map(_.copy()) - copiedTable.setVersion(version()) - if (validatedVersion() != null) { - copiedTable.setValidatedVersion(validatedVersion()) - } + copiedTable.setVersionAndValidatedVersionFrom(this) copiedTable } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/MixedColumnIdTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/MixedColumnIdTableCatalog.scala index 70898c98afb88..f77cad3c077dc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/MixedColumnIdTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/MixedColumnIdTableCatalog.scala @@ -50,6 +50,7 @@ class MixedColumnIdTableCatalog extends InMemoryTableCatalog { id = table.id, nullIdNames = snapshot) mixedTable.alterTableWithData(table.data, table.schema) + mixedTable.setVersionAndValidatedVersionFrom(table) mixedTable } @@ -120,6 +121,7 @@ class MixedColumnIdInMemoryTable( dataMap.synchronized { copiedTable.alterTableWithData(data, schema) } + copiedTable.setVersionAndValidatedVersionFrom(this) copiedTable } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala index ee544f4da0e75..c26ce263c1f8b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala @@ -45,6 +45,7 @@ class NullColumnIdInMemoryTableCatalog extends InMemoryTableCatalog { constraints = table.constraints, id = table.id) nullColIdTable.alterTableWithData(table.data, table.schema) + nullColIdTable.setVersionAndValidatedVersionFrom(table) nullColIdTable } @@ -100,6 +101,7 @@ class NullColumnIdInMemoryTable( dataMap.synchronized { copiedTable.alterTableWithData(data, schema) } + copiedTable.setVersionAndValidatedVersionFrom(this) copiedTable } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdAndNullColumnIdInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdAndNullColumnIdInMemoryTableCatalog.scala new file mode 100644 index 0000000000000..88f2f294a5a84 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdAndNullColumnIdInMemoryTableCatalog.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog + +import org.apache.spark.sql.connector.catalog.constraints.Constraint +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.internal.connector.ColumnImpl + +/** + * An [[InMemoryTableCatalog]] that strips both table IDs ([[Table.id]] + * returns null) and column IDs ([[Column.id]] returns null). This simulates + * connectors that support neither table nor column identity tracking. + * + * When both IDs are null, neither [[V2TableRefreshUtil.validateTableIdentity]] + * nor [[V2TableUtil.validateColumnIds]] fires, so drop/recreate of a table + * or drop/re-add of a column goes undetected. + */ +class NullTableIdAndNullColumnIdInMemoryTableCatalog extends InMemoryTableCatalog { + + private def toNullIdsTable( + table: InMemoryTable): NullTableIdAndNullColumnIdInMemoryTable = { + val nullTable = new NullTableIdAndNullColumnIdInMemoryTable( + name = table.name, + columns = table.columns(), + partitioning = table.partitioning, + properties = table.properties, + constraints = table.constraints) + nullTable.alterTableWithData(table.data, table.schema) + nullTable.setVersionAndValidatedVersionFrom(table) + nullTable + } + + override def createTable( + ident: Identifier, + info: TableInfo): Table = { + val table = super.createTable(ident, info).asInstanceOf[InMemoryTable] + val nullTable = toNullIdsTable(table) + tables.put(ident, nullTable) + nullTable + } + + override def alterTable(ident: Identifier, changes: TableChange*): Table = { + val table = super.alterTable(ident, changes: _*).asInstanceOf[InMemoryTable] + val nullTable = toNullIdsTable(table) + tables.put(ident, nullTable) + nullTable + } +} + +/** + * An [[InMemoryTable]] with both null table ID and null column IDs, + * simulating a connector that supports neither identity tracking mechanism. + */ +class NullTableIdAndNullColumnIdInMemoryTable( + name: String, + columns: Array[Column], + partitioning: Array[Transform], + properties: java.util.Map[String, String], + constraints: Array[Constraint] = Array.empty) + extends InMemoryTable( + name = name, + columns = columns, + partitioning = partitioning, + properties = properties, + constraints = constraints, + id = null) { + + override def columns(): Array[Column] = { + super.columns().map(_.asInstanceOf[ColumnImpl].copy(id = null)) + } + + override def copy(): Table = { + val copiedTable = new NullTableIdAndNullColumnIdInMemoryTable( + name, + columns(), + partitioning, + properties, + constraints) + dataMap.synchronized { + copiedTable.alterTableWithData(data, schema) + } + copiedTable.setVersionAndValidatedVersionFrom(this) + copiedTable + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala index e1c54994c5116..d68f2e62b1365 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala @@ -63,6 +63,7 @@ class TypeChangeResetsColIdTableCatalog extends InMemoryTableCatalog { alteredTable.constraints, id = alteredTable.id) tableWithResetIds.alterTableWithData(alteredTable.data, alteredTable.schema) + tableWithResetIds.setVersionAndValidatedVersionFrom(alteredTable) tables.put(ident, tableWithResetIds) tableWithResetIds } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 4fc93609fb41e..23fdfa703ad9a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect} -import org.apache.spark.sql.connector.catalog.{BufferedRows, CachingInMemoryTableCatalog, CatalogV2Util, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog} +import org.apache.spark.sql.connector.catalog.{BufferedRows, CachingInMemoryTableCatalog, CatalogV2Util, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdAndNullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog} import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue} import org.apache.spark.sql.connector.catalog.TableChange @@ -64,6 +64,9 @@ class DataSourceV2DataFrameSuite .set("spark.sql.catalog.nullcolidcat", classOf[NullColumnIdInMemoryTableCatalog].getName) .set("spark.sql.catalog.nullcolidcat.copyOnLoad", "true") + .set("spark.sql.catalog.nullbothidscat", + classOf[NullTableIdAndNullColumnIdInMemoryTableCatalog].getName) + .set("spark.sql.catalog.nullbothidscat.copyOnLoad", "true") .set("spark.sql.catalog.resetidcat", classOf[TypeChangeResetsColIdTableCatalog].getName) .set("spark.sql.catalog.resetidcat.copyOnLoad", "true") @@ -3948,6 +3951,381 @@ class DataSourceV2DataFrameSuite } } + /** Append a single row to the table via the catalog API, bypassing SQL. */ + private def externalAppend( + catalogName: String, + ident: Identifier, + schema: StructType, + row: InternalRow): Unit = { + val extTable = catalog(catalogName).loadTable(ident, + util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema).withRow(row))) + } + + // Incrementally constructed queries: join scenarios. + // df1 and df2 are analyzed at different times, then joined. The refresh + // phase in QueryExecution must align table versions across all references. + + // Scenario 1: join after insert refreshes both sides to latest version + test("SPARK-54157: join refreshes both sides after external insert" + + " (table with both table and column ID support)") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + val df1 = spark.table(t) + + // external writer adds (2, 200) via direct catalog API + externalAppend(catalogName = "testcat", ident = ident, + schema = StructType.fromDDL("id INT, salary INT"), row = InternalRow(2, 200)) + + val df2 = spark.table(t) + + // both sides refresh to latest version + checkAnswer( + df1.join(df2, df1("id") === df2("id")), + Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200))) + } + } + + test("SPARK-54157: join refreshes both sides after same-session insert" + + " (table with both table and column ID support)") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + val df1 = spark.table(t) + + // session insert via SQL + sql(s"INSERT INTO $t VALUES (2, 200)") + + val df2 = spark.table(t) + + checkAnswer( + df1.join(df2, df1("id") === df2("id")), + Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200))) + } + } + + // Scenario 2: join after ADD COLUMN refreshes versions but df1 keeps original schema + test("SPARK-54157: join after external ADD COLUMN preserves df1 schema" + + " (table with both table and column ID support)") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + val df1 = spark.table(t) + + // external schema change via catalog API + val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) + catalog("testcat").alterTable(ident, addCol) + + // external writer adds (2, 200, -1) with new schema + externalAppend(catalogName = "testcat", ident = ident, + schema = StructType.fromDDL("id INT, salary INT, new_column INT"), + row = InternalRow(2, 200, -1)) + + val df2 = spark.table(t) + + // df1 keeps 2-col schema, df2 has 3-col schema, both see latest data + checkAnswer( + df1.join(df2, df1("id") === df2("id")), + Seq(Row(1, 100, 1, 100, null), Row(2, 200, 2, 200, -1))) + } + } + + test("SPARK-54157: join after same-session ADD COLUMN preserves df1 schema" + + " (table with both table and column ID support)") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + val df1 = spark.table(t) + + // session schema change + data via SQL + sql(s"ALTER TABLE $t ADD COLUMN new_column INT") + sql(s"INSERT INTO $t VALUES (2, 200, -1)") + + val df2 = spark.table(t) + + // df1 keeps 2-col schema, df2 has 3-col schema, both see latest data + checkAnswer( + df1.join(df2, df1("id") === df2("id")), + Seq(Row(1, 100, 1, 100, null), Row(2, 200, 2, 200, -1))) + } + } + + // Scenario 3: join after DROP COLUMN fails with analysis exception + test("SPARK-54157: join after external DROP COLUMN fails" + + " (table with both table and column ID support)") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + val df1 = spark.table(t) + + // external column removal via catalog API + val dropCol = TableChange.deleteColumn(Array("salary"), false) + catalog("testcat").alterTable(ident, dropCol) + + // external writer adds (2) with 1-col schema + externalAppend(catalogName = "testcat", ident = ident, + schema = StructType.fromDDL("id INT"), row = InternalRow(2)) + + val df2 = spark.table(t) + + checkError( + exception = intercept[AnalysisException] { + df1.join(df2, df1("id") === df2("id")).collect() + }, + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", + matchPVals = true, + parameters = Map("tableName" -> ".*", "errors" -> ".*")) + } + } + + test("SPARK-54157: join after same-session DROP COLUMN fails" + + " (table with both table and column ID support)") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + val df1 = spark.table(t) + + // session column removal + insert via SQL + sql(s"ALTER TABLE $t DROP COLUMN salary") + sql(s"INSERT INTO $t VALUES (2)") + + val df2 = spark.table(t) + + checkError( + exception = intercept[AnalysisException] { + df1.join(df2, df1("id") === df2("id")).collect() + }, + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", + matchPVals = true, + parameters = Map("tableName" -> ".*", "errors" -> ".*")) + } + } + + // Scenario 4a: join after external drop and recreate table fails with TABLE_ID_MISMATCH + test("SPARK-54157: join detects external table drop and recreate via table ID" + + " (table with both table and column ID support)") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + val df1 = spark.table(t) + val originTableId = catalog("testcat").loadTable(ident).id + + // external drop and recreate via catalog API + catalog("testcat").dropTable(ident) + catalog("testcat").createTable( + ident, + new TableInfo.Builder() + .withColumns(Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType))) + .build()) + + // external writer adds (2, 200) to the new table + externalAppend(catalogName = "testcat", ident = ident, + schema = StructType.fromDDL("id INT, salary INT"), row = InternalRow(2, 200)) + + val df2 = spark.table(t) + val newTableId = catalog("testcat").loadTable(ident).id + assert(originTableId != newTableId) + + checkError( + exception = intercept[AnalysisException] { + df1.join(df2, df1("id") === df2("id")).collect() + }, + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.TABLE_ID_MISMATCH", + matchPVals = true, + parameters = Map( + "tableName" -> ".*", + "capturedTableId" -> ".*", + "currentTableId" -> ".*")) + } + } + + // Scenario 4b: when table ID is null, column IDs still detect external drop+recreate. + test("SPARK-54157: join detects external drop/recreate via column IDs" + + " (table without table ID support, but with column ID support)") { + val t = "nullidcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + val df1 = spark.table(t) + assert(catalog("nullidcat").loadTable(ident).id == null, + "NullTableIdInMemoryTableCatalog should produce null table IDs") + + // external drop and recreate via catalog API + catalog("nullidcat").dropTable(ident) + catalog("nullidcat").createTable( + ident, + new TableInfo.Builder() + .withColumns(Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType))) + .build()) + + // external writer adds (2, 200) to the new table + externalAppend(catalogName = "nullidcat", ident = ident, + schema = StructType.fromDDL("id INT, salary INT"), row = InternalRow(2, 200)) + + val df2 = spark.table(t) + + // Table ID is null so TABLE_ID_MISMATCH does not fire. + // Column IDs differ (new table gets fresh IDs) so COLUMN_ID_MISMATCH fires. + checkError( + exception = intercept[AnalysisException] { + df1.join(df2, df1("id") === df2("id")).collect() + }, + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + matchPVals = true, + parameters = Map("tableName" -> ".*", "errors" -> "(?s).*")) + } + } + + // Scenario 4c: neither table ID nor column ID detects external drop/recreate. + test("SPARK-54157: join does not detect external table drop and recreate" + + " (table without table ID support and without column ID support)") { + val t = "nullbothidscat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + val df1 = spark.table(t) + val cat = catalog("nullbothidscat") + assert(cat.loadTable(ident).id == null, + "NullTableIdAndNullColumnIdInMemoryTableCatalog should produce null table IDs") + assert(cat.loadTable(ident).columns().forall(_.id() == null), + "NullTableIdAndNullColumnIdInMemoryTableCatalog should produce null column IDs") + + // external drop and recreate via catalog API + cat.dropTable(ident) + cat.createTable( + ident, + new TableInfo.Builder() + .withColumns(Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType))) + .build()) + + val df2 = spark.table(t) + + // Neither TABLE_ID_MISMATCH nor COLUMN_ID_MISMATCH fires. + // Both sides refresh to the recreated (empty) table. The join succeeds + // but returns no rows because the new table has no data. + checkAnswer( + df1.join(df2, df1("id") === df2("id")), + Seq.empty) + } + } + + // Scenario 5a: two separate external alterTable calls assign a fresh column ID. + test("SPARK-54157: join detects external drop+re-add column via column IDs" + + " (table without table ID support, but with column ID support)") { + val t = "nullidcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + val df1 = spark.table(t) + + catalog("nullidcat").alterTable( + ident, TableChange.deleteColumn(Array("salary"), false)) + catalog("nullidcat").alterTable( + ident, TableChange.addColumn(Array("salary"), IntegerType, true)) + + val df2 = spark.table(t) + + checkError( + exception = intercept[AnalysisException] { + df1.join(df2, df1("id") === df2("id")).collect() + }, + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + matchPVals = true, + parameters = Map("tableName" -> ".*", "errors" -> ".*")) + } + } + + // Scenario 5b: neither table ID nor column ID detects external drop+re-add column. + test("SPARK-54157: join does not detect external drop+re-add column" + + " (table without table ID support and without column ID support)") { + val t = "nullbothidscat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + val df1 = spark.table(t) + + catalog("nullbothidscat").alterTable( + ident, TableChange.deleteColumn(Array("salary"), false)) + catalog("nullbothidscat").alterTable( + ident, TableChange.addColumn(Array("salary"), IntegerType, true)) + + val df2 = spark.table(t) + + // Neither TABLE_ID_MISMATCH nor COLUMN_ID_MISMATCH fires. + // The change goes undetected and the join succeeds. + checkAnswer( + df1.join(df2, df1("id") === df2("id")), + Seq(Row(1, null, 1, null))) + } + } + + // Scenario 6: external type change caught by schema validation (not column ID) + test("SPARK-54157: join detects external drop+re-add different-type column" + + " (table with both table and column ID support)") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + val df1 = spark.table(t) + + catalog("testcat").alterTable( + ident, TableChange.deleteColumn(Array("salary"), false)) + catalog("testcat").alterTable( + ident, TableChange.addColumn(Array("salary"), StringType, true)) + + // external writer adds (2, "high") with new schema + externalAppend(catalogName = "testcat", ident = ident, + schema = StructType.fromDDL("id INT, salary STRING"), + row = InternalRow(2, UTF8String.fromString("high"))) + + val df2 = spark.table(t) + + checkError( + exception = intercept[AnalysisException] { + df1.join(df2, df1("id") === df2("id")).collect() + }, + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + matchPVals = true, + parameters = Map("tableName" -> ".*", "errors" -> ".*")) + } + } + test("CTAS/RTAS should trigger two query executions") { // CTAS/RTAS triggers 2 query executions: // 1. The outer CTAS/RTAS command execution From df2bc6b9ed28424c65d673107553f6fb5c55ad55 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Wed, 20 May 2026 15:14:38 +0000 Subject: [PATCH 2/3] Fix misleading scenario 6 comment: column ID check fires, not schema validation The delete+re-add assigns a fresh column ID, so COLUMN_ID_MISMATCH fires before schema validation compares data types. Co-authored-by: Isaac --- .../spark/sql/connector/DataSourceV2DataFrameSuite.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 23fdfa703ad9a..75d41496b142c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -4293,7 +4293,10 @@ class DataSourceV2DataFrameSuite } } - // Scenario 6: external type change caught by schema validation (not column ID) + // Scenario 6: external drop+re-add column with a different type. + // The delete removes the old column ID and the add assigns a fresh one, + // so the column ID check fires (COLUMN_ID_MISMATCH) before schema + // validation gets a chance to compare data types. test("SPARK-54157: join detects external drop+re-add different-type column" + " (table with both table and column ID support)") { val t = "testcat.ns1.ns2.tbl" From 5cce0d3d783337e27c1dd0e3492410f2d157f573 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Wed, 20 May 2026 22:27:28 +0000 Subject: [PATCH 3/3] Fix null/mixed column ID test assertions after version tracking propagation With version tracking now propagated to NullColumnId and MixedColumnId catalogs, the table is refreshed on re-read after drop+re-add column. The re-added salary column has null values, so assertions should expect Row(1, null) instead of Row(1, 100). Co-authored-by: Isaac --- .../connector/DataSourceV2DataFrameSuite.scala | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 75d41496b142c..e15e7e0c0ad10 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -2601,7 +2601,8 @@ class DataSourceV2DataFrameSuite // Column ID tests: Null column ID connector - // When a connector does not support column IDs, validation is skipped. + // When a connector does not support column IDs, validation is skipped, but version + // tracking still detects the schema change and refreshes the table reference. test("connector with null column IDs: drop+re-add column not detected") { val t = "nullcolidcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") @@ -2617,8 +2618,9 @@ class DataSourceV2DataFrameSuite sql(s"ALTER TABLE $t DROP COLUMN salary") sql(s"ALTER TABLE $t ADD COLUMN salary INT") - // succeeds because column ID validation is skipped when IDs are null - checkAnswer(df, Seq(Row(1, 100))) + // No column ID error because IDs are null. The table is refreshed via version + // tracking, so the re-added salary column has null values. + checkAnswer(df, Seq(Row(1, null))) } } @@ -2666,8 +2668,9 @@ class DataSourceV2DataFrameSuite .find(_.name() == "salary").get assert(newSalaryCol.id() == null, "salary should have a null ID after re-add") - // succeeds because current column ID is null, so validation is skipped - checkAnswer(df, Seq(Row(1, 100))) + // No column ID error because current ID is null. The table is refreshed via + // version tracking, so the re-added salary column has null values. + checkAnswer(df, Seq(Row(1, null))) } } @@ -2696,8 +2699,9 @@ class DataSourceV2DataFrameSuite .find(_.name() == "salary").get assert(newSalaryCol.id() != null, "salary should have a non-null ID after re-add") - // succeeds because original column ID is null, so validation is skipped - checkAnswer(df, Seq(Row(1, 100))) + // No column ID error because original ID is null. The table is refreshed via + // version tracking, so the re-added salary column has null values. + checkAnswer(df, Seq(Row(1, null))) } }