Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class ComposedColumnIdTableCatalog extends InMemoryTableCatalog {
table.constraints,
id = table.id)
composedTable.alterTableWithData(table.data, table.schema)
composedTable.setVersionAndValidatedVersionFrom(table)
tables.put(ident, composedTable)
composedTable
}
Expand Down Expand Up @@ -157,6 +158,7 @@ class ComposedColumnIdTableCatalog extends InMemoryTableCatalog {
alteredTable.constraints,
id = alteredTable.id)
composedTable.alterTableWithData(alteredTable.data, alteredTable.schema)
composedTable.setVersionAndValidatedVersionFrom(alteredTable)
tables.put(ident, composedTable)
composedTable
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ abstract class InMemoryBaseTable(
tableVersion = version.toInt
}

/**
* Copies version and validated version from another table so that
* [[V2TableRefreshUtil]] can detect changes and refresh stale references.
*/
def setVersionAndValidatedVersionFrom(sourceTable: InMemoryBaseTable): Unit = {
setVersion(sourceTable.version())
if (sourceTable.validatedVersion() != null) {
setValidatedVersion(sourceTable.validatedVersion())
}
}

def increaseVersion(): Unit = {
tableVersion += 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class InMemoryRowLevelOperationTableCatalog
constraints = constraints,
tableId = table.id)
newTable.alterTableWithData(table.data, schema)
newTable.setVersionAndValidatedVersionFrom(table)

tables.put(ident, newTable)

Expand Down Expand Up @@ -123,6 +124,7 @@ class PartialSchemaEvolutionCatalog extends InMemoryRowLevelOperationTableCatalo
properties = properties,
constraints = table.constraints)
newTable.alterTableWithData(table.data, table.schema)
newTable.setVersionAndValidatedVersionFrom(table)
tables.put(ident, newTable)
newTable
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,7 @@ class InMemoryTable(

copiedTable.commits ++= commits.map(_.copy())

copiedTable.setVersion(version())
if (validatedVersion() != null) {
copiedTable.setValidatedVersion(validatedVersion())
}
copiedTable.setVersionAndValidatedVersionFrom(this)

copiedTable
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class MixedColumnIdTableCatalog extends InMemoryTableCatalog {
id = table.id,
nullIdNames = snapshot)
mixedTable.alterTableWithData(table.data, table.schema)
mixedTable.setVersionAndValidatedVersionFrom(table)
mixedTable
}

Expand Down Expand Up @@ -120,6 +121,7 @@ class MixedColumnIdInMemoryTable(
dataMap.synchronized {
copiedTable.alterTableWithData(data, schema)
}
copiedTable.setVersionAndValidatedVersionFrom(this)
copiedTable
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class NullColumnIdInMemoryTableCatalog extends InMemoryTableCatalog {
constraints = table.constraints,
id = table.id)
nullColIdTable.alterTableWithData(table.data, table.schema)
nullColIdTable.setVersionAndValidatedVersionFrom(table)
nullColIdTable
}

Expand Down Expand Up @@ -100,6 +101,7 @@ class NullColumnIdInMemoryTable(
dataMap.synchronized {
copiedTable.alterTableWithData(data, schema)
}
copiedTable.setVersionAndValidatedVersionFrom(this)
copiedTable
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog

import org.apache.spark.sql.connector.catalog.constraints.Constraint
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.internal.connector.ColumnImpl

/**
* An [[InMemoryTableCatalog]] that strips both table IDs ([[Table.id]]
* returns null) and column IDs ([[Column.id]] returns null). This simulates
* connectors that support neither table nor column identity tracking.
*
* When both IDs are null, neither [[V2TableRefreshUtil.validateTableIdentity]]
* nor [[V2TableUtil.validateColumnIds]] fires, so drop/recreate of a table
* or drop/re-add of a column goes undetected.
*/
class NullTableIdAndNullColumnIdInMemoryTableCatalog extends InMemoryTableCatalog {

private def toNullIdsTable(
table: InMemoryTable): NullTableIdAndNullColumnIdInMemoryTable = {
val nullTable = new NullTableIdAndNullColumnIdInMemoryTable(
name = table.name,
columns = table.columns(),
partitioning = table.partitioning,
properties = table.properties,
constraints = table.constraints)
nullTable.alterTableWithData(table.data, table.schema)
nullTable.setVersionAndValidatedVersionFrom(table)
nullTable
}

override def createTable(
ident: Identifier,
info: TableInfo): Table = {
val table = super.createTable(ident, info).asInstanceOf[InMemoryTable]
val nullTable = toNullIdsTable(table)
tables.put(ident, nullTable)
nullTable
}

override def alterTable(ident: Identifier, changes: TableChange*): Table = {
val table = super.alterTable(ident, changes: _*).asInstanceOf[InMemoryTable]
val nullTable = toNullIdsTable(table)
tables.put(ident, nullTable)
nullTable
}
}

/**
* An [[InMemoryTable]] with both null table ID and null column IDs,
* simulating a connector that supports neither identity tracking mechanism.
*/
class NullTableIdAndNullColumnIdInMemoryTable(
name: String,
columns: Array[Column],
partitioning: Array[Transform],
properties: java.util.Map[String, String],
constraints: Array[Constraint] = Array.empty)
extends InMemoryTable(
name = name,
columns = columns,
partitioning = partitioning,
properties = properties,
constraints = constraints,
id = null) {

override def columns(): Array[Column] = {
super.columns().map(_.asInstanceOf[ColumnImpl].copy(id = null))
}

override def copy(): Table = {
val copiedTable = new NullTableIdAndNullColumnIdInMemoryTable(
name,
columns(),
partitioning,
properties,
constraints)
dataMap.synchronized {
copiedTable.alterTableWithData(data, schema)
}
copiedTable.setVersionAndValidatedVersionFrom(this)
copiedTable
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class TypeChangeResetsColIdTableCatalog extends InMemoryTableCatalog {
alteredTable.constraints,
id = alteredTable.id)
tableWithResetIds.alterTableWithData(alteredTable.data, alteredTable.schema)
tableWithResetIds.setVersionAndValidatedVersionFrom(alteredTable)
tables.put(ident, tableWithResetIds)
tableWithResetIds
}
Expand Down
Loading