Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 74 additions & 19 deletions cli/api/dbadapters/execution_sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,24 +154,8 @@ from (${query}) as insertions`;
this.buildIncrementalSchemaChangeTasks(tasks, table);
// Fall through to run the static DML after the procedure alters the schema
case dataform.OnSchemaChange.IGNORE:
default:
tasks.add(
Task.statement(
table.uniqueKey && table.uniqueKey.length > 0
? this.mergeInto(
table.target,
tableMetadata?.fields.map(f => f.name),
this.getIncrementalQuery(table),
table.uniqueKey,
table.bigquery && table.bigquery.updatePartitionFilter
)
: this.insertInto(
table.target,
tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
this.getIncrementalQuery(table)
)
)
);
const columns = tableMetadata?.fields.map(f => f.name) || [];
tasks.add(Task.statement(this.getIncrementalDmlStatement(table, columns)));
break;
}
}
Expand Down Expand Up @@ -451,7 +435,7 @@ DROP TABLE IF EXISTS ${emptyTempTableName};
create or replace view ${this.resolveTarget(target)} as ${query}`;
}

private mergeInto(
private mergeInto(
target: dataform.ITarget,
columns: string[],
query: string,
Expand All @@ -470,6 +454,77 @@ when matched then
when not matched then
insert (${backtickedColumns.join(",")}) values (${backtickedColumns.join(",")})`;
}

private insertOverwrite(
target: dataform.ITarget,
columns: string[],
query: string,
partitionBy: string,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the table.bigquery or table.bigquery.partitionBy is not provided? will it be an empty string or 'null'?

updatePartitionFilter: string
): string {
const uniqueId = this.uniqueIdGenerator();
const stagingTableUnqualified = `staging_table_temp_${uniqueId}`;
const backtickedColumns = columns.map(column => `\`${column}\``);
const resolveTargetTable = this.resolveTarget(target);

return `CREATE OR REPLACE TEMP TABLE \`${stagingTableUnqualified}\` AS (

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you split separate SQL statements into granular Task.statement calls?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we can't split this SQL because we are using TEMP table. We could in case we would use persistent table.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what Nick means here is to split the sql creation into statements not the sql itself.

${query}
);

BEGIN
DECLARE partitions_for_replacement DEFAULT (
ARRAY(
SELECT DISTINCT ${partitionBy}
FROM \`${stagingTableUnqualified}\`
WHERE ${partitionBy} IS NOT NULL

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will this statement look like if we have empty partitionBy? also will it have `` around the column name somehow? Do we have an example for final sql that will be generated.

)
);

MERGE ${resolveTargetTable} T
USING \`${stagingTableUnqualified}\` S
ON FALSE
WHEN NOT MATCHED BY SOURCE AND ${partitionBy} IN UNNEST(partitions_for_replacement) ${updatePartitionFilter ? `and T.${updatePartitionFilter}` : ""} THEN

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Such code T.${updatePartitionFilter} will only work when updatePartitionFilter has exactly one expression?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are correct. It is a known limitation that T.${updatePartitionFilter} only works for simple expressions (and fails on multi-expression SQL). Current implementation is designed to match the existing behavior of the standard MERGE strategy to maintain consistency between the two strategies for now. The fix of using updatePartitionFilter with several expression will be introduced in a separate PR.

DELETE
WHEN NOT MATCHED BY TARGET THEN
INSERT (${backtickedColumns.join(",")}) VALUES (${backtickedColumns.join(",")});
END;

DROP TABLE IF EXISTS \`${stagingTableUnqualified}\`;`;
}

private getIncrementalDmlStatement(
table: dataform.ITable,
columns: string[]
): string {
const incrementalQuery = this.getIncrementalQuery(table);

switch (table.incrementalStrategy) {
case dataform.IncrementalStrategy.INSERT_OVERWRITE:
return this.insertOverwrite(
table.target,
columns,
incrementalQuery,
table.bigquery && table.bigquery.partitionBy,
table.bigquery && table.bigquery.updatePartitionFilter
);
case dataform.IncrementalStrategy.MERGE:
default:
if (table.uniqueKey && table.uniqueKey.length > 0) {
return this.mergeInto(
table.target,
columns,
incrementalQuery,
table.uniqueKey,
table.bigquery && table.bigquery.updatePartitionFilter
);
}
return this.insertInto(
table.target,
columns.map(column => `\`${column}\``),
incrementalQuery
);
}
}
}

export function collectEvaluationQueries(
Expand Down
30 changes: 30 additions & 0 deletions cli/api/execution_sql_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,34 @@ suite("ExecutionSql with 'onSchemaChange'", () => {
const expectedSql = fs.readFileSync("cli/api/goldens/on_schema_change_ignore.sql", "utf8");
expect(procedureSql).to.equal(expectedSql.trim());
});

test("generates INSERT_OVERWRITE script for IGNORE strategy", () => {
const table = {
...baseTable,
incrementalStrategy: dataform.IncrementalStrategy.INSERT_OVERWRITE,
bigquery: {
partitionBy: "DATE(ts)",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have an example column that is not a function like Date. My concern is that we a re biased toward handling this method. so it its just a normal column name we might not handle it correctly.

updatePartitionFilter: "ts >= '2024-01-01'"
}
};
const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata);
const sql = tasks.build().map(t => t.statement).join("\n;\n");
const expectedSql = fs.readFileSync("cli/api/goldens/insert_overwrite_ignore.sql", "utf8");
expect(sql).to.equal(expectedSql.trim());
});

test("generates INSERT_OVERWRITE script for EXTEND strategy", () => {
const table = {
...baseTable,
incrementalStrategy: dataform.IncrementalStrategy.INSERT_OVERWRITE,
onSchemaChange: dataform.OnSchemaChange.EXTEND,
bigquery: {
partitionBy: "DATE(ts)"
}
};
const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata);
const sql = tasks.build().map(t => t.statement).join("\n;\n");
const expectedSql = fs.readFileSync("cli/api/goldens/insert_overwrite_extend.sql", "utf8");
expect(sql).to.equal(expectedSql.trim());
});
});
97 changes: 97 additions & 0 deletions cli/api/goldens/insert_overwrite_extend.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
CREATE OR REPLACE PROCEDURE `project-id.dataset-id.df_osc_test_uuid`()
OPTIONS(strict_mode=false)
BEGIN

-- Create empty table to extract schema of new query.
CREATE OR REPLACE TABLE `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty` AS (
SELECT * FROM (select 1 as id, 'a' as field1, 'new' as field2) AS insertions LIMIT 0
);


-- Compare schemas
DECLARE dataform_columns ARRAY<STRING>;
DECLARE temp_table_columns ARRAY<STRUCT<column_name STRING, data_type STRING>>;
DECLARE columns_added ARRAY<STRUCT<column_name STRING, data_type STRING>>;
DECLARE columns_removed ARRAY<STRING>;

SET dataform_columns = (
SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), [])
FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = 'incremental_on_schema_change'
);

SET temp_table_columns = (
SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), [])
FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = 'incremental_on_schema_change_df_temp_test_uuid_empty'
);

SET columns_added = (
SELECT IFNULL(ARRAY_AGG(column_info), [])
FROM UNNEST(temp_table_columns) AS column_info
WHERE column_info.column_name NOT IN UNNEST(dataform_columns)
);
SET columns_removed = (
SELECT IFNULL(ARRAY_AGG(column_name), [])
FROM UNNEST(dataform_columns) AS column_name
WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col)
);


-- Apply schema change strategy (EXTEND).
IF ARRAY_LENGTH(columns_removed) > 0 THEN
RAISE USING MESSAGE = FORMAT(
"Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %T",
columns_removed
);
END IF;

IF ARRAY_LENGTH(columns_added) > 0 THEN
EXECUTE IMMEDIATE (
"ALTER TABLE `project-id.dataset-id.incremental_on_schema_change` " ||
(
SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ")
FROM UNNEST(columns_added) AS column_info
)
);
END IF;



-- Cleanup temporary tables.
DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;

END
;
BEGIN
CALL `project-id.dataset-id.df_osc_test_uuid`();
EXCEPTION WHEN ERROR THEN
DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;
DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`;
RAISE;
END;
DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`
;
CREATE OR REPLACE TEMP TABLE `staging_table_temp_test_uuid` AS (
select 1 as id, 'a' as field1, 'new' as field2
);

BEGIN
DECLARE partitions_for_replacement DEFAULT (
ARRAY(
SELECT DISTINCT DATE(ts)
FROM `staging_table_temp_test_uuid`
WHERE DATE(ts) IS NOT NULL
)
);

MERGE `project-id.dataset-id.incremental_on_schema_change` T
USING `staging_table_temp_test_uuid` S
ON FALSE
WHEN NOT MATCHED BY SOURCE AND DATE(ts) IN UNNEST(partitions_for_replacement) THEN
DELETE
WHEN NOT MATCHED BY TARGET THEN
INSERT (`id`,`field1`) VALUES (`id`,`field1`);
END;

DROP TABLE IF EXISTS `staging_table_temp_test_uuid`
23 changes: 23 additions & 0 deletions cli/api/goldens/insert_overwrite_ignore.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
CREATE OR REPLACE TEMP TABLE `staging_table_temp_test_uuid` AS (
select 1 as id, 'a' as field1, 'new' as field2
);

BEGIN
DECLARE partitions_for_replacement DEFAULT (
ARRAY(
SELECT DISTINCT DATE(ts)
FROM `staging_table_temp_test_uuid`
WHERE DATE(ts) IS NOT NULL
)
);

MERGE `project-id.dataset-id.incremental_on_schema_change` T
USING `staging_table_temp_test_uuid` S
ON FALSE
WHEN NOT MATCHED BY SOURCE AND DATE(ts) IN UNNEST(partitions_for_replacement) and T.ts >= '2024-01-01' THEN
DELETE
WHEN NOT MATCHED BY TARGET THEN
INSERT (`id`,`field1`) VALUES (`id`,`field1`);
END;

DROP TABLE IF EXISTS `staging_table_temp_test_uuid`

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I read the execution_sql.ts file correctly, there should be a ";" at the end no? same in insert_overwrite_extend.sql.

60 changes: 60 additions & 0 deletions core/actions/incremental_table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ export class IncrementalTable extends ActionBuilder<dataform.Table> {
} : {}),
});
this.proto.onSchemaChange = this.mapOnSchemaChange(config.onSchemaChange);
this.proto.incrementalStrategy = this.mapIncrementalStrategy(config.incrementalStrategy);

this.checkIncrementalStrategyRequirements(config);

if (config.reservation) {
if (!this.proto.actionDescriptor) {
Expand Down Expand Up @@ -736,6 +739,63 @@ export class IncrementalTable extends ActionBuilder<dataform.Table> {
throw new Error(`OnSchemaChange value "${onSchemaChange}" is not supported`);
}
}

private mapIncrementalStrategy(
incrementalStrategy?: string | number
): dataform.IncrementalStrategy {
if (!incrementalStrategy) {
return dataform.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED;
}

if (typeof incrementalStrategy === "number") {
switch (incrementalStrategy) {
case dataform.ActionConfig.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED:
return dataform.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED;
case dataform.ActionConfig.IncrementalStrategy.MERGE:
return dataform.IncrementalStrategy.MERGE;
case dataform.ActionConfig.IncrementalStrategy.INSERT_OVERWRITE:
return dataform.IncrementalStrategy.INSERT_OVERWRITE;
default:
throw new Error(`IncrementalStrategy value "${incrementalStrategy}" is not supported`);
}
}

switch (incrementalStrategy.toString().toUpperCase()) {
case "INCREMENTAL_STRATEGY_UNSPECIFIED":
return dataform.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED;
case "MERGE":
return dataform.IncrementalStrategy.MERGE;
case "INSERT_OVERWRITE":
return dataform.IncrementalStrategy.INSERT_OVERWRITE;
default:
throw new Error(`IncrementalStrategy value "${incrementalStrategy}" is not supported`);
}
}

private checkIncrementalStrategyRequirements(config: dataform.ActionConfig.IIncrementalTableConfig) {
switch (this.proto.incrementalStrategy) {
case dataform.IncrementalStrategy.INSERT_OVERWRITE:
if (!this.proto.bigquery || !this.proto.bigquery.partitionBy) {
this.session.compileError(
new Error("incrementalStrategy 'insert_overwrite' requires 'partitionBy' to be set."),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Inconsistent style. in some places we have capital first letter and in others we don't. We also use "." in some places and miss it in others. for example on line 759.

config.filename,
this.proto.target
);
}
break;
case dataform.IncrementalStrategy.MERGE:
if (!this.proto.uniqueKey || this.proto.uniqueKey.length === 0) {
this.session.compileError(
new Error("incrementalStrategy 'merge' requires 'uniqueKey' to be set."),
config.filename,
this.proto.target
);
}
break;
default:
break;
}
}
}

/**
Expand Down
Loading
Loading