Support incremental_strategy parameter and new insert_overwrite strategy #2195
base: main
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -154,24 +154,8 @@ from (${query}) as insertions`; | |
| this.buildIncrementalSchemaChangeTasks(tasks, table); | ||
| // Fall through to run the static DML after the procedure alters the schema | ||
| case dataform.OnSchemaChange.IGNORE: | ||
| default: | ||
| tasks.add( | ||
| Task.statement( | ||
| table.uniqueKey && table.uniqueKey.length > 0 | ||
| ? this.mergeInto( | ||
| table.target, | ||
| tableMetadata?.fields.map(f => f.name), | ||
| this.getIncrementalQuery(table), | ||
| table.uniqueKey, | ||
| table.bigquery && table.bigquery.updatePartitionFilter | ||
| ) | ||
| : this.insertInto( | ||
| table.target, | ||
| tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``), | ||
| this.getIncrementalQuery(table) | ||
| ) | ||
| ) | ||
| ); | ||
| const columns = tableMetadata?.fields.map(f => f.name) || []; | ||
| tasks.add(Task.statement(this.getIncrementalDmlStatement(table, columns))); | ||
| break; | ||
| } | ||
| } | ||
|
|
@@ -451,7 +435,7 @@ DROP TABLE IF EXISTS ${emptyTempTableName}; | |
| create or replace view ${this.resolveTarget(target)} as ${query}`; | ||
| } | ||
|
|
||
| private mergeInto( | ||
| private mergeInto( | ||
| target: dataform.ITarget, | ||
| columns: string[], | ||
| query: string, | ||
|
|
@@ -470,6 +454,77 @@ when matched then | |
| when not matched then | ||
| insert (${backtickedColumns.join(",")}) values (${backtickedColumns.join(",")})`; | ||
| } | ||
|
|
||
| private insertOverwrite( | ||
| target: dataform.ITarget, | ||
| columns: string[], | ||
| query: string, | ||
| partitionBy: string, | ||
| updatePartitionFilter: string | ||
| ): string { | ||
| const uniqueId = this.uniqueIdGenerator(); | ||
| const stagingTableUnqualified = `staging_table_temp_${uniqueId}`; | ||
| const backtickedColumns = columns.map(column => `\`${column}\``); | ||
| const resolveTargetTable = this.resolveTarget(target); | ||
|
|
||
| return `CREATE OR REPLACE TEMP TABLE \`${stagingTableUnqualified}\` AS ( | ||
|
Contributor
Can you split separate SQL statements into granular
Collaborator, Author
I believe we can't split this SQL because we are using a TEMP table. We could if we used a persistent table.
Member
I think what Nick means here is to split the SQL creation into separate statements, not the SQL itself.
||
| ${query} | ||
| ); | ||
|
|
||
| BEGIN | ||
| DECLARE partitions_for_replacement DEFAULT ( | ||
| ARRAY( | ||
| SELECT DISTINCT ${partitionBy} | ||
| FROM \`${stagingTableUnqualified}\` | ||
| WHERE ${partitionBy} IS NOT NULL | ||
|
Member
What will this statement look like if partitionBy is empty? Also, will the column name end up wrapped in `` somehow? Do we have an example of the final SQL that will be generated?
||
| ) | ||
| ); | ||
|
|
||
| MERGE ${resolveTargetTable} T | ||
| USING \`${stagingTableUnqualified}\` S | ||
| ON FALSE | ||
| WHEN NOT MATCHED BY SOURCE AND ${partitionBy} IN UNNEST(partitions_for_replacement) ${updatePartitionFilter ? `and T.${updatePartitionFilter}` : ""} THEN | ||
|
Contributor
Such code
Collaborator, Author
Yes, you are correct. It is a known limitation that T.${updatePartitionFilter} only works for simple expressions (and fails on multi-expression SQL). The current implementation is designed to match the existing behavior of the standard MERGE strategy, to keep the two strategies consistent for now. A fix for using updatePartitionFilter with several expressions will be introduced in a separate PR.
||
| DELETE | ||
| WHEN NOT MATCHED BY TARGET THEN | ||
| INSERT (${backtickedColumns.join(",")}) VALUES (${backtickedColumns.join(",")}); | ||
| END; | ||
|
|
||
| DROP TABLE IF EXISTS \`${stagingTableUnqualified}\`;`; | ||
| } | ||
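One way to read the request in the review thread is to keep emitting a single script but build it from small helpers, one per statement. The sketch below is only an illustration under that reading: every helper name is hypothetical, targets are passed as already-resolved strings, and updatePartitionFilter is left out for brevity. It is not the PR's implementation.

```typescript
// Hypothetical helpers (not from the PR): each builds one piece of the script.
function createStagingTable(stagingTable: string, query: string): string {
  return `CREATE OR REPLACE TEMP TABLE \`${stagingTable}\` AS (\n${query}\n);`;
}

function declarePartitions(stagingTable: string, partitionBy: string): string {
  return [
    "DECLARE partitions_for_replacement DEFAULT (",
    "  ARRAY(",
    `    SELECT DISTINCT ${partitionBy}`,
    `    FROM \`${stagingTable}\``,
    `    WHERE ${partitionBy} IS NOT NULL`,
    "  )",
    ");"
  ].join("\n");
}

function mergeReplacingPartitions(
  target: string,
  stagingTable: string,
  columns: string[],
  partitionBy: string
): string {
  const cols = columns.map(column => `\`${column}\``).join(",");
  return [
    `MERGE ${target} T`,
    `USING \`${stagingTable}\` S`,
    "ON FALSE",
    `WHEN NOT MATCHED BY SOURCE AND ${partitionBy} IN UNNEST(partitions_for_replacement) THEN`,
    "  DELETE",
    "WHEN NOT MATCHED BY TARGET THEN",
    `  INSERT (${cols}) VALUES (${cols});`
  ].join("\n");
}

function dropStagingTable(stagingTable: string): string {
  return `DROP TABLE IF EXISTS \`${stagingTable}\`;`;
}

// The pieces are still joined into one script string, so nothing about how
// the TEMP table is used changes; only the TypeScript side becomes granular.
function buildInsertOverwriteScript(
  target: string,
  stagingTable: string,
  columns: string[],
  query: string,
  partitionBy: string
): string {
  const block = [
    "BEGIN",
    declarePartitions(stagingTable, partitionBy),
    mergeReplacingPartitions(target, stagingTable, columns, partitionBy),
    "END;"
  ].join("\n");
  return [
    createStagingTable(stagingTable, query),
    block,
    dropStagingTable(stagingTable)
  ].join("\n\n");
}

const script = buildInsertOverwriteScript(
  "`project-id.dataset-id.example`",
  "staging_table_temp_test_uuid",
  ["id", "field1"],
  "select 1 as id, 'a' as field1",
  "DATE(ts)"
);
console.log(script);
```

Splitting this way would also let each fragment be unit-tested on its own, at the cost of a few more private methods.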
|
|
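To make the limitation discussed in the review thread concrete, the sketch below reproduces only the string interpolation that prefixes the filter with `T.`. The function name `qualifyFilter` and the `region` column are hypothetical; how BigQuery treats the resulting SQL is not shown here.

```typescript
// Mirrors the interpolation in the diff: only a single "T." prefix is added.
function qualifyFilter(updatePartitionFilter?: string): string {
  return updatePartitionFilter ? `and T.${updatePartitionFilter}` : "";
}

// A single comparison on a bare column is qualified as intended.
const simple = qualifyFilter("ts >= '2024-01-01'");
// "and T.ts >= '2024-01-01'"

// With two predicates, only the first column receives the prefix.
const compound = qualifyFilter("ts >= '2024-01-01' and region = 'EU'");
// "and T.ts >= '2024-01-01' and region = 'EU'"

// A filter that starts with a function call gets the prefix on the function name.
const wrapped = qualifyFilter("DATE(ts) >= '2024-01-01'");
// "and T.DATE(ts) >= '2024-01-01'"

console.log([simple, compound, wrapped].join("\n"));
```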
||
| private getIncrementalDmlStatement( | ||
| table: dataform.ITable, | ||
| columns: string[] | ||
| ): string { | ||
| const incrementalQuery = this.getIncrementalQuery(table); | ||
|
|
||
| switch (table.incrementalStrategy) { | ||
| case dataform.IncrementalStrategy.INSERT_OVERWRITE: | ||
| return this.insertOverwrite( | ||
| table.target, | ||
| columns, | ||
| incrementalQuery, | ||
| table.bigquery && table.bigquery.partitionBy, | ||
| table.bigquery && table.bigquery.updatePartitionFilter | ||
| ); | ||
| case dataform.IncrementalStrategy.MERGE: | ||
| default: | ||
| if (table.uniqueKey && table.uniqueKey.length > 0) { | ||
| return this.mergeInto( | ||
| table.target, | ||
| columns, | ||
| incrementalQuery, | ||
| table.uniqueKey, | ||
| table.bigquery && table.bigquery.updatePartitionFilter | ||
| ); | ||
| } | ||
| return this.insertInto( | ||
| table.target, | ||
| columns.map(column => `\`${column}\``), | ||
| incrementalQuery | ||
| ); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| export function collectEvaluationQueries( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -86,4 +86,34 @@ suite("ExecutionSql with 'onSchemaChange'", () => { | |
| const expectedSql = fs.readFileSync("cli/api/goldens/on_schema_change_ignore.sql", "utf8"); | ||
| expect(procedureSql).to.equal(expectedSql.trim()); | ||
| }); | ||
|
|
||
| test("generates INSERT_OVERWRITE script for IGNORE strategy", () => { | ||
| const table = { | ||
| ...baseTable, | ||
| incrementalStrategy: dataform.IncrementalStrategy.INSERT_OVERWRITE, | ||
| bigquery: { | ||
| partitionBy: "DATE(ts)", | ||
|
Member
Do we have an example column that is not a function like DATE? My concern is that we are biased toward handling this case, so if it's just a normal column name we might not handle it correctly.
||
| updatePartitionFilter: "ts >= '2024-01-01'" | ||
| } | ||
| }; | ||
| const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata); | ||
| const sql = tasks.build().map(t => t.statement).join("\n;\n"); | ||
| const expectedSql = fs.readFileSync("cli/api/goldens/insert_overwrite_ignore.sql", "utf8"); | ||
| expect(sql).to.equal(expectedSql.trim()); | ||
| }); | ||
|
|
||
| test("generates INSERT_OVERWRITE script for EXTEND strategy", () => { | ||
| const table = { | ||
| ...baseTable, | ||
| incrementalStrategy: dataform.IncrementalStrategy.INSERT_OVERWRITE, | ||
| onSchemaChange: dataform.OnSchemaChange.EXTEND, | ||
| bigquery: { | ||
| partitionBy: "DATE(ts)" | ||
| } | ||
| }; | ||
| const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata); | ||
| const sql = tasks.build().map(t => t.statement).join("\n;\n"); | ||
| const expectedSql = fs.readFileSync("cli/api/goldens/insert_overwrite_extend.sql", "utf8"); | ||
| expect(sql).to.equal(expectedSql.trim()); | ||
| }); | ||
| }); | ||
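On the question of a plain column name: the template in the diff interpolates partitionBy verbatim, so an expression and a bare column are rendered the same way and neither is wrapped in backticks. A minimal sketch of just that fragment, where `deleteCondition` and the column `event_date` are hypothetical names used for illustration:

```typescript
// Reproduces only the WHEN NOT MATCHED BY SOURCE line from the diff's template.
function deleteCondition(partitionBy: string): string {
  return `WHEN NOT MATCHED BY SOURCE AND ${partitionBy} IN UNNEST(partitions_for_replacement) THEN`;
}

// Expression-based partitioning, as in the existing tests.
const withExpression = deleteCondition("DATE(ts)");
// Plain column partitioning: the name is inserted as-is, without backticks.
const withColumn = deleteCondition("event_date");

console.log(withExpression);
console.log(withColumn);
```

A golden file for the plain-column case would presumably differ from the existing ones only in these interpolated positions.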
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,97 @@ | ||
| CREATE OR REPLACE PROCEDURE `project-id.dataset-id.df_osc_test_uuid`() | ||
| OPTIONS(strict_mode=false) | ||
| BEGIN | ||
|
|
||
| -- Create empty table to extract schema of new query. | ||
| CREATE OR REPLACE TABLE `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty` AS ( | ||
| SELECT * FROM (select 1 as id, 'a' as field1, 'new' as field2) AS insertions LIMIT 0 | ||
| ); | ||
|
|
||
|
|
||
| -- Compare schemas | ||
| DECLARE dataform_columns ARRAY<STRING>; | ||
| DECLARE temp_table_columns ARRAY<STRUCT<column_name STRING, data_type STRING>>; | ||
| DECLARE columns_added ARRAY<STRUCT<column_name STRING, data_type STRING>>; | ||
| DECLARE columns_removed ARRAY<STRING>; | ||
|
|
||
| SET dataform_columns = ( | ||
| SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), []) | ||
| FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS` | ||
| WHERE table_name = 'incremental_on_schema_change' | ||
| ); | ||
|
|
||
| SET temp_table_columns = ( | ||
| SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), []) | ||
| FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS` | ||
| WHERE table_name = 'incremental_on_schema_change_df_temp_test_uuid_empty' | ||
| ); | ||
|
|
||
| SET columns_added = ( | ||
| SELECT IFNULL(ARRAY_AGG(column_info), []) | ||
| FROM UNNEST(temp_table_columns) AS column_info | ||
| WHERE column_info.column_name NOT IN UNNEST(dataform_columns) | ||
| ); | ||
| SET columns_removed = ( | ||
| SELECT IFNULL(ARRAY_AGG(column_name), []) | ||
| FROM UNNEST(dataform_columns) AS column_name | ||
| WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col) | ||
| ); | ||
|
|
||
|
|
||
| -- Apply schema change strategy (EXTEND). | ||
| IF ARRAY_LENGTH(columns_removed) > 0 THEN | ||
| RAISE USING MESSAGE = FORMAT( | ||
| "Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %T", | ||
| columns_removed | ||
| ); | ||
| END IF; | ||
|
|
||
| IF ARRAY_LENGTH(columns_added) > 0 THEN | ||
| EXECUTE IMMEDIATE ( | ||
| "ALTER TABLE `project-id.dataset-id.incremental_on_schema_change` " || | ||
| ( | ||
| SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ") | ||
| FROM UNNEST(columns_added) AS column_info | ||
| ) | ||
| ); | ||
| END IF; | ||
|
|
||
|
|
||
|
|
||
| -- Cleanup temporary tables. | ||
| DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`; | ||
|
|
||
| END | ||
| ; | ||
| BEGIN | ||
| CALL `project-id.dataset-id.df_osc_test_uuid`(); | ||
| EXCEPTION WHEN ERROR THEN | ||
| DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`; | ||
| DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`; | ||
| RAISE; | ||
| END; | ||
| DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid` | ||
| ; | ||
| CREATE OR REPLACE TEMP TABLE `staging_table_temp_test_uuid` AS ( | ||
| select 1 as id, 'a' as field1, 'new' as field2 | ||
| ); | ||
|
|
||
| BEGIN | ||
| DECLARE partitions_for_replacement DEFAULT ( | ||
| ARRAY( | ||
| SELECT DISTINCT DATE(ts) | ||
| FROM `staging_table_temp_test_uuid` | ||
| WHERE DATE(ts) IS NOT NULL | ||
| ) | ||
| ); | ||
|
|
||
| MERGE `project-id.dataset-id.incremental_on_schema_change` T | ||
| USING `staging_table_temp_test_uuid` S | ||
| ON FALSE | ||
| WHEN NOT MATCHED BY SOURCE AND DATE(ts) IN UNNEST(partitions_for_replacement) THEN | ||
| DELETE | ||
| WHEN NOT MATCHED BY TARGET THEN | ||
| INSERT (`id`,`field1`) VALUES (`id`,`field1`); | ||
| END; | ||
|
|
||
| DROP TABLE IF EXISTS `staging_table_temp_test_uuid` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| CREATE OR REPLACE TEMP TABLE `staging_table_temp_test_uuid` AS ( | ||
| select 1 as id, 'a' as field1, 'new' as field2 | ||
| ); | ||
|
|
||
| BEGIN | ||
| DECLARE partitions_for_replacement DEFAULT ( | ||
| ARRAY( | ||
| SELECT DISTINCT DATE(ts) | ||
| FROM `staging_table_temp_test_uuid` | ||
| WHERE DATE(ts) IS NOT NULL | ||
| ) | ||
| ); | ||
|
|
||
| MERGE `project-id.dataset-id.incremental_on_schema_change` T | ||
| USING `staging_table_temp_test_uuid` S | ||
| ON FALSE | ||
| WHEN NOT MATCHED BY SOURCE AND DATE(ts) IN UNNEST(partitions_for_replacement) and T.ts >= '2024-01-01' THEN | ||
| DELETE | ||
| WHEN NOT MATCHED BY TARGET THEN | ||
| INSERT (`id`,`field1`) VALUES (`id`,`field1`); | ||
| END; | ||
|
|
||
| DROP TABLE IF EXISTS `staging_table_temp_test_uuid` | ||
|
Member
If I read the execution_sql.ts file correctly, there should be a ";" at the end, no? Same in
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -217,6 +217,9 @@ export class IncrementalTable extends ActionBuilder<dataform.Table> { | |
| } : {}), | ||
| }); | ||
| this.proto.onSchemaChange = this.mapOnSchemaChange(config.onSchemaChange); | ||
| this.proto.incrementalStrategy = this.mapIncrementalStrategy(config.incrementalStrategy); | ||
|
|
||
| this.checkIncrementalStrategyRequirements(config); | ||
|
|
||
| if (config.reservation) { | ||
| if (!this.proto.actionDescriptor) { | ||
|
|
@@ -736,6 +739,63 @@ export class IncrementalTable extends ActionBuilder<dataform.Table> { | |
| throw new Error(`OnSchemaChange value "${onSchemaChange}" is not supported`); | ||
| } | ||
| } | ||
|
|
||
| private mapIncrementalStrategy( | ||
| incrementalStrategy?: string | number | ||
| ): dataform.IncrementalStrategy { | ||
| if (!incrementalStrategy) { | ||
| return dataform.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED; | ||
| } | ||
|
|
||
| if (typeof incrementalStrategy === "number") { | ||
| switch (incrementalStrategy) { | ||
| case dataform.ActionConfig.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED: | ||
| return dataform.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED; | ||
| case dataform.ActionConfig.IncrementalStrategy.MERGE: | ||
| return dataform.IncrementalStrategy.MERGE; | ||
| case dataform.ActionConfig.IncrementalStrategy.INSERT_OVERWRITE: | ||
| return dataform.IncrementalStrategy.INSERT_OVERWRITE; | ||
| default: | ||
| throw new Error(`IncrementalStrategy value "${incrementalStrategy}" is not supported`); | ||
| } | ||
| } | ||
|
|
||
| switch (incrementalStrategy.toString().toUpperCase()) { | ||
| case "INCREMENTAL_STRATEGY_UNSPECIFIED": | ||
| return dataform.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED; | ||
| case "MERGE": | ||
| return dataform.IncrementalStrategy.MERGE; | ||
| case "INSERT_OVERWRITE": | ||
| return dataform.IncrementalStrategy.INSERT_OVERWRITE; | ||
| default: | ||
| throw new Error(`IncrementalStrategy value "${incrementalStrategy}" is not supported`); | ||
| } | ||
| } | ||
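The mapping above accepts either a number or a case-insensitive string. The condensed sketch below shows the same normalization in a table-driven form. The local enum and its numeric values are assumptions made so the example is self-contained; they are not taken from the generated protobuf code.

```typescript
// Local stand-in for the proto enum; the numeric values are assumed.
enum IncrementalStrategy {
  INCREMENTAL_STRATEGY_UNSPECIFIED = 0,
  MERGE = 1,
  INSERT_OVERWRITE = 2
}

const BY_NAME = new Map<string, IncrementalStrategy>([
  ["INCREMENTAL_STRATEGY_UNSPECIFIED", IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED],
  ["MERGE", IncrementalStrategy.MERGE],
  ["INSERT_OVERWRITE", IncrementalStrategy.INSERT_OVERWRITE]
]);

const VALID_NUMBERS = new Set<number>([
  IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED,
  IncrementalStrategy.MERGE,
  IncrementalStrategy.INSERT_OVERWRITE
]);

function mapIncrementalStrategy(value?: string | number): IncrementalStrategy {
  // Falsy inputs (undefined, "", 0) all map to UNSPECIFIED, as in the diff.
  if (!value) {
    return IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED;
  }
  if (typeof value === "number") {
    if (!VALID_NUMBERS.has(value)) {
      throw new Error(`IncrementalStrategy value "${value}" is not supported`);
    }
    return value as IncrementalStrategy;
  }
  const mapped = BY_NAME.get(value.toUpperCase());
  if (mapped === undefined) {
    throw new Error(`IncrementalStrategy value "${value}" is not supported`);
  }
  return mapped;
}

console.log(mapIncrementalStrategy("insert_overwrite") === IncrementalStrategy.INSERT_OVERWRITE);
console.log(mapIncrementalStrategy(1) === IncrementalStrategy.MERGE);
```

If the unspecified value is indeed 0, the early falsy check already covers it, so the explicit numeric case for it would never be reached.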
|
|
||
| private checkIncrementalStrategyRequirements(config: dataform.ActionConfig.IIncrementalTableConfig) { | ||
| switch (this.proto.incrementalStrategy) { | ||
| case dataform.IncrementalStrategy.INSERT_OVERWRITE: | ||
| if (!this.proto.bigquery || !this.proto.bigquery.partitionBy) { | ||
| this.session.compileError( | ||
| new Error("incrementalStrategy 'insert_overwrite' requires 'partitionBy' to be set."), | ||
|
Member
Nit: inconsistent style. In some places the message starts with a capital letter and in others it doesn't. We also end with "." in some places and omit it in others, for example on line 759.
||
| config.filename, | ||
| this.proto.target | ||
| ); | ||
| } | ||
| break; | ||
| case dataform.IncrementalStrategy.MERGE: | ||
| if (!this.proto.uniqueKey || this.proto.uniqueKey.length === 0) { | ||
| this.session.compileError( | ||
| new Error("incrementalStrategy 'merge' requires 'uniqueKey' to be set."), | ||
| config.filename, | ||
| this.proto.target | ||
| ); | ||
| } | ||
| break; | ||
| default: | ||
| break; | ||
| } | ||
| } | ||
| } | ||
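The requirement checks above can be summarized as a pure function that returns the messages instead of reporting them through the session. This is a sketch only: `strategyRequirementErrors`, the `StrategyInputs` shape, and the enum's numeric values are hypothetical, while the two messages are copied from the diff.

```typescript
// Local stand-in for the proto enum; the numeric values are assumed.
enum IncrementalStrategy {
  INCREMENTAL_STRATEGY_UNSPECIFIED = 0,
  MERGE = 1,
  INSERT_OVERWRITE = 2
}

// Hypothetical flattened view of the fields the checks look at.
interface StrategyInputs {
  incrementalStrategy: IncrementalStrategy;
  partitionBy?: string;
  uniqueKey?: string[];
}

function strategyRequirementErrors(inputs: StrategyInputs): string[] {
  const errors: string[] = [];
  switch (inputs.incrementalStrategy) {
    case IncrementalStrategy.INSERT_OVERWRITE:
      // Missing or empty partitionBy is rejected at compile time.
      if (!inputs.partitionBy) {
        errors.push("incrementalStrategy 'insert_overwrite' requires 'partitionBy' to be set.");
      }
      break;
    case IncrementalStrategy.MERGE:
      if (!inputs.uniqueKey || inputs.uniqueKey.length === 0) {
        errors.push("incrementalStrategy 'merge' requires 'uniqueKey' to be set.");
      }
      break;
    default:
      // An unspecified strategy has no extra requirements.
      break;
  }
  return errors;
}

console.log(strategyRequirementErrors({ incrementalStrategy: IncrementalStrategy.INSERT_OVERWRITE }));
console.log(strategyRequirementErrors({ incrementalStrategy: IncrementalStrategy.MERGE, uniqueKey: ["id"] }));
```

Note the asymmetry this makes visible: an explicit MERGE requires uniqueKey, while an unspecified strategy falls back to merge-or-insert at execution time without that requirement.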
|
|
||
| /** | ||
|
|
||
What happens if table.bigquery or table.bigquery.partitionBy is not provided? Will it be an empty string or 'null'?
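As a partial answer in plain JavaScript terms: the expression `table.bigquery && table.bigquery.partitionBy` yields whatever falsy value it hits first, and a template literal then renders that value as text. The sketch below uses a hypothetical `TableLike` shape rather than the real proto type, so which of these cases can actually occur in Dataform is an assumption; per the diff, the compile-time check is meant to reject a missing partitionBy before this point.

```typescript
// Hypothetical minimal shapes; the real proto types may differ.
interface BigQueryOptions {
  partitionBy?: string;
}
interface TableLike {
  bigquery?: BigQueryOptions | null;
}

// Same expression as in getIncrementalDmlStatement, rendered as the template would.
function renderSelect(table: TableLike): string {
  const partitionBy = table.bigquery && table.bigquery.partitionBy;
  return `SELECT DISTINCT ${partitionBy}`;
}

const noBigquery = renderSelect({});
// "SELECT DISTINCT undefined"
const nullBigquery = renderSelect({ bigquery: null });
// "SELECT DISTINCT null"
const emptyPartition = renderSelect({ bigquery: { partitionBy: "" } });
// "SELECT DISTINCT " (empty string, nothing after the keyword)
const setPartition = renderSelect({ bigquery: { partitionBy: "DATE(ts)" } });
// "SELECT DISTINCT DATE(ts)"

console.log([noBigquery, nullBigquery, emptyPartition, setPartition].join("\n"));
```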