From bfba4099f33ad98c61f7f8991f0cf518b5e69436 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Fri, 12 Jun 2026 08:29:18 +0000 Subject: [PATCH] canary environment: Add iceberg sinks to GCS --- bin/ci-builder | 3 + misc/python/materialize/biglake.py | 203 ++++++++++++++++++ test/canary-environment/README.md | 6 + .../loadgen/customer_gcs_iceberg_sink.sql | 15 ++ .../models/loadgen/sales_gcs_iceberg_sink.sql | 15 ++ .../mysql_cdc/mysql_wmr_gcs_iceberg_sink.sql | 15 ++ .../pg_relationships_gcs_iceberg_sink.sql | 15 ++ .../table/table_mv_gcs_iceberg_sink.sql | 15 ++ .../models/tpch/tpch_q18_gcs_iceberg_sink.sql | 15 ++ test/canary-environment/mzcompose.py | 36 ++++ test/gcp/mzcompose.py | 181 +++------------- 11 files changed, 362 insertions(+), 157 deletions(-) create mode 100644 misc/python/materialize/biglake.py create mode 100644 test/canary-environment/models/loadgen/customer_gcs_iceberg_sink.sql create mode 100644 test/canary-environment/models/loadgen/sales_gcs_iceberg_sink.sql create mode 100644 test/canary-environment/models/mysql_cdc/mysql_wmr_gcs_iceberg_sink.sql create mode 100644 test/canary-environment/models/pg_cdc/pg_relationships_gcs_iceberg_sink.sql create mode 100644 test/canary-environment/models/table/table_mv_gcs_iceberg_sink.sql create mode 100644 test/canary-environment/models/tpch/tpch_q18_gcs_iceberg_sink.sql diff --git a/bin/ci-builder b/bin/ci-builder index 19dd9aa69bc9a..ebe9497c5cf01 100755 --- a/bin/ci-builder +++ b/bin/ci-builder @@ -285,6 +285,9 @@ case "$cmd" in --env ICEBERG_GCS_BUCKET --env ICEBERG_GCP_PROJECT --env ICEBERG_GCP_SA_JSON_B64 + # For test/canary-environment Iceberg GCS sinks + --env QA_CANARY_ICEBERG_GCS_BUCKET + --env QA_CANARY_ICEBERG_GCP_SA_JSON_B64 --env GITHUB_TOKEN --env GITHUB_GHCR_TOKEN --env GPG_KEY diff --git a/misc/python/materialize/biglake.py b/misc/python/materialize/biglake.py new file mode 100644 index 0000000000000..1db32fdbbd617 --- /dev/null +++ b/misc/python/materialize/biglake.py @@ -0,0 +1,203 
@@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+"""Helpers for bootstrapping Google Cloud BigLake Iceberg REST catalogs.
+
+BigLake (the API is still called BigLake, though it now lives under the
+"Lakehouse" umbrella) exposes an Iceberg REST catalog at
+``https://biglake.googleapis.com/iceberg/v1/restcatalog``.
+
+Unlike some Iceberg catalogs, BigLake does **not** auto-create catalogs or
+namespaces on first write. A Materialize Iceberg sink only creates the *table*
+(see ``load_or_create_table`` in ``src/storage/src/sink/iceberg.rs``), not the
+namespace, so a sink targeting a missing namespace fails at runtime with::
+
+    Failed to create Iceberg table '<table>' in namespace '<namespace>':
+    Tried to create a table under a namespace that does not exist
+
+These helpers create the catalog and namespace out of band so the sink can then
+create its tables. Shared by ``test/gcp`` (per-run, throwaway ``e2e_*``
+namespaces) and ``test/canary-environment`` (a fixed, long-lived namespace).
+"""
+
+import json
+import time
+import urllib.error
+import urllib.parse
+import urllib.request
+
+import jwt
+
+# Blanket scope used to mint tokens for both GCS and BigLake. Matches GCP_SCOPE
+# in src/storage-types/src/connections/gcp.rs.
+GCP_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
+
+BIGLAKE_REST_BASE = "https://biglake.googleapis.com/iceberg/v1/restcatalog"
+
+
+def mint_gcp_access_token(service_account: dict) -> str:
+    """Mint an OAuth2 access token from a GCP service-account key (JWT bearer flow)."""
+    now = int(time.time())
+    assertion = jwt.encode(
+        {
+            "iss": service_account["client_email"],
+            "scope": GCP_SCOPE,
+            "aud": "https://oauth2.googleapis.com/token",
+            "iat": now,
+            "exp": now + 3600,
+        },
+        service_account["private_key"],
+        algorithm="RS256",
+    )
+    body = urllib.parse.urlencode(
+        {
+            "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
+            "assertion": assertion,
+        }
+    ).encode()
+    req = urllib.request.Request(
+        "https://oauth2.googleapis.com/token",
+        data=body,
+        headers={"Content-Type": "application/x-www-form-urlencoded"},
+    )
+    with urllib.request.urlopen(req) as resp:
+        return json.loads(resp.read())["access_token"]
+
+
+def biglake_request(
+    method: str, url: str, token: str, project: str
+) -> urllib.request.Request:
+    return urllib.request.Request(
+        url,
+        method=method,
+        headers={
+            "Authorization": f"Bearer {token}",
+            "x-goog-user-project": project,
+        },
+    )
+
+
+def ensure_catalog(token: str, project: str, bucket: str) -> None:
+    """Create the BigLake Iceberg REST catalog for this bucket if it's missing.
+
+    The Iceberg REST `/v1/config?warehouse=gs://<bucket>` lookup resolves the catalog
+    whose id equals the bucket name; it does not provision one on demand. Without it,
+    every REST call returns a sanitized 403. Creating it here lets callers bootstrap
+    their own catalog instead of depending on out-of-band (Pulumi/console) setup.
+
+    Credential mode is END_USER: the Materialize sink writes to GCS with the service
+    account's own credentials (see `connect_rest` for the GCP branch in
+    mz_storage_types::connections), not credentials vended by the catalog.
+    """
+    base = f"{BIGLAKE_REST_BASE}/extensions/projects/{project}/catalogs"
+
+    # GET returns the catalog if it exists, 404 if not.
+    try:
+        urllib.request.urlopen(
+            biglake_request("GET", f"{base}/{bucket}", token, project)
+        ).close()
+        return
+    except urllib.error.HTTPError as e:
+        if e.code != 404:
+            raise
+
+    create_url = f"{base}?iceberg-catalog-id={urllib.parse.quote(bucket, safe='')}"
+    req = biglake_request("POST", create_url, token, project)
+    req.add_header("Content-Type", "application/json")
+    req.data = json.dumps(
+        {
+            "catalog-type": "CATALOG_TYPE_GCS_BUCKET",
+            "credential-mode": "CREDENTIAL_MODE_END_USER",
+        }
+    ).encode()
+    try:
+        urllib.request.urlopen(req).close()
+        print(f"created BigLake catalog for gs://{bucket}")
+    except urllib.error.HTTPError as e:
+        # A concurrent run can create the catalog between our GET and POST.
+        if e.code == 409:
+            return
+        body = e.read().decode("utf-8", errors="replace")
+        print(f"BigLake catalog create failed: HTTP {e.code}\n{body}")
+        raise
+
+
+def resolve_warehouse_prefix(token: str, project: str, bucket: str) -> str:
+    """Return the catalog prefix BigLake assigns to this warehouse.
+
+    Iceberg REST clients call GET /v1/config?warehouse=... before any other
+    operation. The catalog's response includes an `overrides.prefix` that the
+    client splices in between `/v1/` and resource paths for every later call:
+
+        {uri}/v1/{prefix}/namespaces/{ns}/tables/{tbl}
+
+    See `RestCatalogConfig::url_prefixed` in iceberg-catalog-rest.
+    """
+    warehouse = f"gs://{bucket}"
+    url = (
+        f"{BIGLAKE_REST_BASE}/v1/config"
+        f"?warehouse={urllib.parse.quote(warehouse, safe='')}"
+    )
+    with urllib.request.urlopen(biglake_request("GET", url, token, project)) as resp:
+        config = json.loads(resp.read())
+    print(f"BigLake /v1/config response: {json.dumps(config)}")
+    return config.get("overrides", {}).get("prefix", "")
+
+
+def catalog_url(prefix: str, suffix: str) -> str:
+    middle = f"{prefix}/" if prefix else ""
+    return f"{BIGLAKE_REST_BASE}/v1/{middle}{suffix}"
+
+
+def table_url(prefix: str, namespace: str, table: str) -> str:
+    ns = urllib.parse.quote(namespace, safe="")
+    tbl = urllib.parse.quote(table, safe="")
+    return catalog_url(prefix, f"namespaces/{ns}/tables/{tbl}")
+
+
+def namespace_url(prefix: str, namespace: str) -> str:
+    ns = urllib.parse.quote(namespace, safe="")
+    return catalog_url(prefix, f"namespaces/{ns}")
+
+
+def create_namespace(
+    token: str, project: str, prefix: str, namespace: str, *, exist_ok: bool = True
+) -> None:
+    """Create the namespace. BigLake doesn't auto-create on first commit.
+
+    With ``exist_ok`` (the default), a 409 from a namespace that already exists
+    is treated as success, so this is safe to call repeatedly against a fixed
+    long-lived namespace.
+    """
+    req = biglake_request("POST", catalog_url(prefix, "namespaces"), token, project)
+    req.add_header("Content-Type", "application/json")
+    req.data = json.dumps({"namespace": [namespace]}).encode()
+    try:
+        urllib.request.urlopen(req).close()
+    except urllib.error.HTTPError as e:
+        if exist_ok and e.code == 409:
+            return
+        raise
+
+
+def bootstrap_namespace(service_account: dict, bucket: str, namespace: str) -> str:
+    """Ensure the catalog and namespace for ``gs://<bucket>`` exist.
+
+    Mints a token from ``service_account``, ensures the BigLake catalog for the
+    bucket exists, resolves the warehouse prefix, and creates ``namespace`` if it
+    is missing. Idempotent: a no-op once the catalog and namespace exist. Returns
+    the resolved catalog prefix.
+    """
+    project = service_account["project_id"]
+    token = mint_gcp_access_token(service_account)
+    ensure_catalog(token, project, bucket)
+    prefix = resolve_warehouse_prefix(token, project, bucket)
+    create_namespace(token, project, prefix, namespace)
+    print(f"BigLake namespace ready: {namespace} (gs://{bucket})")
+    return prefix
diff --git a/test/canary-environment/README.md b/test/canary-environment/README.md
index cd1b5a7248d34..5049b344f138d 100644
--- a/test/canary-environment/README.md
+++ b/test/canary-environment/README.md
@@ -58,6 +58,12 @@ export CONFLUENT_CLOUD_QA_CANARY_CSR_USERNAME=...
 export CONFLUENT_CLOUD_QA_CANARY_CSR_PASSWORD=...
 export CONFLUENT_CLOUD_QA_CANARY_KAFKA_USERNAME=...
 export CONFLUENT_CLOUD_QA_CANARY_KAFKA_PASSWORD=...
+
+# Used by the GCS Iceberg sinks, defined in the i2 repository (i2/buildkite.py):
+# the base64-encoded GCP service account key JSON and the canary-dedicated GCS
+# bucket (no age-based object expiry).
+export QA_CANARY_ICEBERG_GCP_SA_JSON_B64=...
+export QA_CANARY_ICEBERG_GCS_BUCKET=...
 ```
 
 ## Clusters
diff --git a/test/canary-environment/models/loadgen/customer_gcs_iceberg_sink.sql b/test/canary-environment/models/loadgen/customer_gcs_iceberg_sink.sql
new file mode 100644
index 0000000000000..a57ab57417b94
--- /dev/null
+++ b/test/canary-environment/models/loadgen/customer_gcs_iceberg_sink.sql
@@ -0,0 +1,15 @@
+-- Copyright Materialize, Inc. and contributors. All rights reserved.
+--
+-- Use of this software is governed by the Business Source License
+-- included in the LICENSE file at the root of this repository.
+--
+-- As of the Change Date specified in that file, in accordance with
+-- the Business Source License, use of this software will be governed
+-- by the Apache License, Version 2.0.
+ +{{ config(materialized='sink', cluster='qa_canary_environment_storage') }} +FROM {{ ref('customer_tbl') }} +INTO ICEBERG CATALOG CONNECTION qa_canary_gcs_iceberg_catalog (NAMESPACE = 'qa_canary_environment', TABLE = 'customer') +KEY (key) +MODE UPSERT +WITH (COMMIT INTERVAL = '60s'); diff --git a/test/canary-environment/models/loadgen/sales_gcs_iceberg_sink.sql b/test/canary-environment/models/loadgen/sales_gcs_iceberg_sink.sql new file mode 100644 index 0000000000000..2a024d6470df4 --- /dev/null +++ b/test/canary-environment/models/loadgen/sales_gcs_iceberg_sink.sql @@ -0,0 +1,15 @@ +-- Copyright Materialize, Inc. and contributors. All rights reserved. +-- +-- Use of this software is governed by the Business Source License +-- included in the LICENSE file at the root of this repository. +-- +-- As of the Change Date specified in that file, in accordance with +-- the Business Source License, use of this software will be governed +-- by the Apache License, Version 2.0. + +{{ config(materialized='sink', cluster='qa_canary_environment_storage') }} +FROM {{ ref('sales_tbl') }} +INTO ICEBERG CATALOG CONNECTION qa_canary_gcs_iceberg_catalog (NAMESPACE = 'qa_canary_environment', TABLE = 'sales') +KEY (key) +MODE UPSERT +WITH (COMMIT INTERVAL = '60s'); diff --git a/test/canary-environment/models/mysql_cdc/mysql_wmr_gcs_iceberg_sink.sql b/test/canary-environment/models/mysql_cdc/mysql_wmr_gcs_iceberg_sink.sql new file mode 100644 index 0000000000000..7083da256b169 --- /dev/null +++ b/test/canary-environment/models/mysql_cdc/mysql_wmr_gcs_iceberg_sink.sql @@ -0,0 +1,15 @@ +-- Copyright Materialize, Inc. and contributors. All rights reserved. +-- +-- Use of this software is governed by the Business Source License +-- included in the LICENSE file at the root of this repository. +-- +-- As of the Change Date specified in that file, in accordance with +-- the Business Source License, use of this software will be governed +-- by the Apache License, Version 2.0. 
+ +{{ config(materialized='sink', cluster='qa_canary_environment_storage') }} +FROM {{ ref('mysql_wmr') }} +INTO ICEBERG CATALOG CONNECTION qa_canary_gcs_iceberg_catalog (NAMESPACE = 'qa_canary_environment', TABLE = 'mysql_wmr') +KEY (a_name, b_name) NOT ENFORCED +MODE UPSERT +WITH (COMMIT INTERVAL = '60s'); diff --git a/test/canary-environment/models/pg_cdc/pg_relationships_gcs_iceberg_sink.sql b/test/canary-environment/models/pg_cdc/pg_relationships_gcs_iceberg_sink.sql new file mode 100644 index 0000000000000..5abc60ee8ffe1 --- /dev/null +++ b/test/canary-environment/models/pg_cdc/pg_relationships_gcs_iceberg_sink.sql @@ -0,0 +1,15 @@ +-- Copyright Materialize, Inc. and contributors. All rights reserved. +-- +-- Use of this software is governed by the Business Source License +-- included in the LICENSE file at the root of this repository. +-- +-- As of the Change Date specified in that file, in accordance with +-- the Business Source License, use of this software will be governed +-- by the Apache License, Version 2.0. + +{{ config(materialized='sink', cluster='qa_canary_environment_storage') }} +FROM {{ ref('pg_relationships') }} +INTO ICEBERG CATALOG CONNECTION qa_canary_gcs_iceberg_catalog (NAMESPACE = 'qa_canary_environment', TABLE = 'pg_relationships') +KEY (a, b) NOT ENFORCED +MODE UPSERT +WITH (COMMIT INTERVAL = '60s'); diff --git a/test/canary-environment/models/table/table_mv_gcs_iceberg_sink.sql b/test/canary-environment/models/table/table_mv_gcs_iceberg_sink.sql new file mode 100644 index 0000000000000..cbff59c2a4c16 --- /dev/null +++ b/test/canary-environment/models/table/table_mv_gcs_iceberg_sink.sql @@ -0,0 +1,15 @@ +-- Copyright Materialize, Inc. and contributors. All rights reserved. +-- +-- Use of this software is governed by the Business Source License +-- included in the LICENSE file at the root of this repository. 
+-- +-- As of the Change Date specified in that file, in accordance with +-- the Business Source License, use of this software will be governed +-- by the Apache License, Version 2.0. + +{{ config(materialized='sink', cluster='qa_canary_environment_storage') }} +FROM {{ ref('table_mv') }} +INTO ICEBERG CATALOG CONNECTION qa_canary_gcs_iceberg_catalog (NAMESPACE = 'qa_canary_environment', TABLE = 'table_mv') +KEY (max) +MODE UPSERT +WITH (COMMIT INTERVAL = '60s'); diff --git a/test/canary-environment/models/tpch/tpch_q18_gcs_iceberg_sink.sql b/test/canary-environment/models/tpch/tpch_q18_gcs_iceberg_sink.sql new file mode 100644 index 0000000000000..32a4046b4a3cb --- /dev/null +++ b/test/canary-environment/models/tpch/tpch_q18_gcs_iceberg_sink.sql @@ -0,0 +1,15 @@ +-- Copyright Materialize, Inc. and contributors. All rights reserved. +-- +-- Use of this software is governed by the Business Source License +-- included in the LICENSE file at the root of this repository. +-- +-- As of the Change Date specified in that file, in accordance with +-- the Business Source License, use of this software will be governed +-- by the Apache License, Version 2.0. + +{{ config(materialized='sink', cluster='qa_canary_environment_storage') }} +FROM {{ ref('tpch_q18') }} +INTO ICEBERG CATALOG CONNECTION qa_canary_gcs_iceberg_catalog (NAMESPACE = 'qa_canary_environment', TABLE = 'tpch_18') +KEY (c_custkey) NOT ENFORCED +MODE UPSERT +WITH (COMMIT INTERVAL = '60s'); diff --git a/test/canary-environment/mzcompose.py b/test/canary-environment/mzcompose.py index 76de878a0eff9..d582114b8ec58 100644 --- a/test/canary-environment/mzcompose.py +++ b/test/canary-environment/mzcompose.py @@ -7,12 +7,15 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. 
+import base64 +import json import os from textwrap import dedent from urllib.parse import quote import psycopg +from materialize.biglake import bootstrap_namespace from materialize.mzcompose.composition import ( Composition, Service, @@ -60,6 +63,12 @@ "CONFLUENT_CLOUD_QA_CANARY_KAFKA_PASSWORD" ) +# Base64-encoded GCP service account key JSON with access to the GCS bucket, +# defined in the i2 repository (i2/buildkite.py). The bucket is dedicated to +# the canary (no age-based object expiry, unlike ICEBERG_GCS_BUCKET). +QA_CANARY_ICEBERG_GCP_SA_JSON_B64 = os.getenv("QA_CANARY_ICEBERG_GCP_SA_JSON_B64") +QA_CANARY_ICEBERG_GCS_BUCKET = os.getenv("QA_CANARY_ICEBERG_GCS_BUCKET") + SERVICES = [ Dbt( @@ -131,6 +140,19 @@ def workflow_create(c: Composition, parser: WorkflowArgumentParser) -> None: WAREHOUSE = 'arn:aws:s3tables:us-east-1:400121260767:bucket/qa-canary-environment', AWS CONNECTION = qa_canary_aws_connection ) + + > CREATE SECRET IF NOT EXISTS gcp_service_account_key AS decode('{QA_CANARY_ICEBERG_GCP_SA_JSON_B64}', 'base64') + + > CREATE CONNECTION IF NOT EXISTS qa_canary_gcp_connection TO GCP ( + SERVICE ACCOUNT KEY = SECRET gcp_service_account_key + ) + + > CREATE CONNECTION IF NOT EXISTS qa_canary_gcs_iceberg_catalog TO ICEBERG CATALOG ( + CATALOG TYPE = 'rest', + URL = 'https://biglake.googleapis.com/iceberg/v1/restcatalog', + WAREHOUSE = 'gs://{QA_CANARY_ICEBERG_GCS_BUCKET}', + GCP CONNECTION = qa_canary_gcp_connection + ) """)) c.testdrive(input=dedent(f""" @@ -355,6 +377,20 @@ def workflow_create(c: Composition, parser: WorkflowArgumentParser) -> None: (100, 10, 'Team Shelter', 'Shelter for the team during games', 199.99); """)) + # The Materialize Iceberg sink creates tables but not namespaces, and + # BigLake does not auto-create them on first commit (unlike the S3 Tables + # bucket, whose `qa_canary_environment` namespace is provisioned alongside + # the bucket). 
Create the BigLake catalog + namespace out of band before + # `dbt run` creates the GCS Iceberg sinks, or they fail at runtime with + # "Tried to create a table under a namespace that does not exist". + assert QA_CANARY_ICEBERG_GCP_SA_JSON_B64 is not None + assert QA_CANARY_ICEBERG_GCS_BUCKET is not None + bootstrap_namespace( + json.loads(base64.b64decode(QA_CANARY_ICEBERG_GCP_SA_JSON_B64)), + QA_CANARY_ICEBERG_GCS_BUCKET, + "qa_canary_environment", + ) + c.exec( "dbt", "dbt", diff --git a/test/gcp/mzcompose.py b/test/gcp/mzcompose.py index 4a197d2cae27b..a1e7079cd6099 100644 --- a/test/gcp/mzcompose.py +++ b/test/gcp/mzcompose.py @@ -29,18 +29,20 @@ import urllib.request from datetime import datetime, timedelta, timezone -import jwt - +from materialize.biglake import ( + biglake_request, + catalog_url, + create_namespace, + ensure_catalog, + mint_gcp_access_token, + namespace_url, + resolve_warehouse_prefix, + table_url, +) from materialize.mzcompose.composition import Composition from materialize.mzcompose.services.materialized import Materialized from materialize.mzcompose.services.testdrive import Testdrive -# Blanket scope used to mint tokens for both GCS and BigLake. Matches GCP_SCOPE -# in src/storage-types/src/connections/gcp.rs. -GCP_SCOPE = "https://www.googleapis.com/auth/cloud-platform" - -BIGLAKE_REST_BASE = "https://biglake.googleapis.com/iceberg/v1/restcatalog" - # Per-run namespace name format: e2e__. The date lets the # pre-test sweep age out namespaces left behind by killed runs without extra API # calls. 
@@ -67,131 +69,6 @@ def _require_env(name: str) -> str: return value -def _mint_gcp_access_token(service_account: dict) -> str: - """Mint an OAuth2 access token from a GCP service-account key (JWT bearer flow).""" - now = int(time.time()) - assertion = jwt.encode( - { - "iss": service_account["client_email"], - "scope": GCP_SCOPE, - "aud": "https://oauth2.googleapis.com/token", - "iat": now, - "exp": now + 3600, - }, - service_account["private_key"], - algorithm="RS256", - ) - body = urllib.parse.urlencode( - { - "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer", - "assertion": assertion, - } - ).encode() - req = urllib.request.Request( - "https://oauth2.googleapis.com/token", - data=body, - headers={"Content-Type": "application/x-www-form-urlencoded"}, - ) - with urllib.request.urlopen(req) as resp: - return json.loads(resp.read())["access_token"] - - -def _biglake_request( - method: str, url: str, token: str, project: str -) -> urllib.request.Request: - return urllib.request.Request( - url, - method=method, - headers={ - "Authorization": f"Bearer {token}", - "x-goog-user-project": project, - }, - ) - - -def _ensure_biglake_catalog(token: str, project: str, bucket: str) -> None: - """Create the BigLake Iceberg REST catalog for this bucket if it's missing. - - The Iceberg REST `/v1/config?warehouse=gs://` lookup resolves the catalog - whose id equals the bucket name; it does not provision one on demand. Without it, - every REST call returns a sanitized 403. Creating it here lets the test bootstrap - its own catalog instead of depending on out-of-band (Pulumi/console) setup. - - Credential mode is END_USER: the Materialize sink writes to GCS with the service - account's own credentials (see `connect_rest` for the GCP branch in - mz_storage_types::connections), not credentials vended by the catalog. - """ - base = f"{BIGLAKE_REST_BASE}/extensions/projects/{project}/catalogs" - - # GET returns the catalog if it exists, 404 if not. 
- try: - urllib.request.urlopen( - _biglake_request("GET", f"{base}/{bucket}", token, project) - ) - return - except urllib.error.HTTPError as e: - if e.code != 404: - raise - - create_url = f"{base}?iceberg-catalog-id={urllib.parse.quote(bucket, safe='')}" - req = _biglake_request("POST", create_url, token, project) - req.add_header("Content-Type", "application/json") - req.data = json.dumps( - { - "catalog-type": "CATALOG_TYPE_GCS_BUCKET", - "credential-mode": "CREDENTIAL_MODE_END_USER", - } - ).encode() - try: - urllib.request.urlopen(req) - print(f"created BigLake catalog for gs://{bucket}") - except urllib.error.HTTPError as e: - # A concurrent run can create the catalog between our GET and POST. - if e.code == 409: - return - body = e.read().decode("utf-8", errors="replace") - print(f"BigLake catalog create failed: HTTP {e.code}\n{body}") - raise - - -def _resolve_warehouse_prefix(token: str, project: str, bucket: str) -> str: - """Return the catalog prefix BigLake assigns to this warehouse. - - Iceberg REST clients call GET /v1/config?warehouse=... before any other - operation. The catalog's response includes an `overrides.prefix` that the - client splices in between `/v1/` and resource paths for every later call: - - {uri}/v1/{prefix}/namespaces/{ns}/tables/{tbl} - - See `RestCatalogConfig::url_prefixed` in iceberg-catalog-rest. 
- """ - warehouse = f"gs://{bucket}" - url = ( - f"{BIGLAKE_REST_BASE}/v1/config" - f"?warehouse={urllib.parse.quote(warehouse, safe='')}" - ) - with urllib.request.urlopen(_biglake_request("GET", url, token, project)) as resp: - config = json.loads(resp.read()) - print(f"BigLake /v1/config response: {json.dumps(config)}") - return config.get("overrides", {}).get("prefix", "") - - -def _catalog_url(prefix: str, suffix: str) -> str: - middle = f"{prefix}/" if prefix else "" - return f"{BIGLAKE_REST_BASE}/v1/{middle}{suffix}" - - -def _table_url(prefix: str, namespace: str, table: str) -> str: - ns = urllib.parse.quote(namespace, safe="") - tbl = urllib.parse.quote(table, safe="") - return _catalog_url(prefix, f"namespaces/{ns}/tables/{tbl}") - - -def _namespace_url(prefix: str, namespace: str) -> str: - ns = urllib.parse.quote(namespace, safe="") - return _catalog_url(prefix, f"namespaces/{ns}") - - def _verify_sink_committed( token: str, project: str, @@ -212,7 +89,7 @@ def _verify_sink_committed( `main` / `v1.5-variegata`. Our workspace duckdb is pinned to 1.4.x, whose iceberg extension branch (`v1.4-andium`) does not have the backport. """ - url = _table_url(prefix, namespace, table) + url = table_url(prefix, namespace, table) print(f"BigLake GET table URL: {url}") deadline = time.time() + 60 @@ -220,7 +97,7 @@ def _verify_sink_committed( while time.time() < deadline: try: with urllib.request.urlopen( - _biglake_request("GET", url, token, project) + biglake_request("GET", url, token, project) ) as resp: body = json.loads(resp.read()) except urllib.error.HTTPError as e: @@ -265,20 +142,10 @@ def _verify_sink_committed( ) -def _create_biglake_namespace( - token: str, project: str, prefix: str, namespace: str -) -> None: - """Create the namespace. 
BigLake doesn't auto-create on first commit.""" - req = _biglake_request("POST", _catalog_url(prefix, "namespaces"), token, project) - req.add_header("Content-Type", "application/json") - req.data = json.dumps({"namespace": [namespace]}).encode() - urllib.request.urlopen(req) - - def _delete_biglake(method_url: str, token: str, project: str) -> None: """DELETE a BigLake resource, tolerating 404 so cleanup is idempotent.""" try: - urllib.request.urlopen(_biglake_request("DELETE", method_url, token, project)) + urllib.request.urlopen(biglake_request("DELETE", method_url, token, project)) except urllib.error.HTTPError as e: if e.code != 404: raise @@ -293,7 +160,7 @@ def _paginated_get(url: str, token: str, project: str, items_key: str): sep = "&" if "?" in page_url else "?" page_url = f"{page_url}{sep}pageToken={urllib.parse.quote(page_token)}" with urllib.request.urlopen( - _biglake_request("GET", page_url, token, project) + biglake_request("GET", page_url, token, project) ) as resp: body = json.loads(resp.read()) yield from body.get(items_key, []) @@ -307,7 +174,7 @@ def _list_biglake_namespaces(token: str, project: str, prefix: str) -> list[str] return [ ns[0] for ns in _paginated_get( - _catalog_url(prefix, "namespaces"), token, project, "namespaces" + catalog_url(prefix, "namespaces"), token, project, "namespaces" ) if len(ns) == 1 ] @@ -319,7 +186,7 @@ def _list_biglake_tables( return [ ident["name"] for ident in _paginated_get( - _catalog_url( + catalog_url( prefix, f"namespaces/{urllib.parse.quote(namespace, safe='')}/tables" ), token, @@ -349,8 +216,8 @@ def _sweep_stale_biglake_namespaces(token: str, project: str, prefix: str) -> No print(f"sweeping stale BigLake namespace: {ns} (age {age})") try: for tbl in _list_biglake_tables(token, project, prefix, ns): - _delete_biglake(_table_url(prefix, ns, tbl), token, project) - _delete_biglake(_namespace_url(prefix, ns), token, project) + _delete_biglake(table_url(prefix, ns, tbl), token, project) + 
_delete_biglake(namespace_url(prefix, ns), token, project) except Exception as e: # Keep sweeping; a single bad namespace shouldn't block the test. print(f"warning: failed to sweep namespace {ns}: {e}") @@ -388,18 +255,18 @@ def workflow_default(c: Composition) -> None: # Mint once and reuse for verification + cleanup. Tokens last an hour; # minting up front also fails fast if the service-account key is broken. - token = _mint_gcp_access_token(service_account) + token = mint_gcp_access_token(service_account) # Bootstrap the catalog if absent; /v1/config 403s otherwise. - _ensure_biglake_catalog(token, project, bucket) + ensure_catalog(token, project, bucket) # Discover the per-warehouse catalog prefix once; it's identical for # the verify and cleanup paths. - prefix = _resolve_warehouse_prefix(token, project, bucket) + prefix = resolve_warehouse_prefix(token, project, bucket) # Garbage-collect namespaces from previous runs that were killed before # their `finally` could run. Best-effort; logged failures don't block. _sweep_stale_biglake_namespaces(token, project, prefix) # BigLake doesn't auto-create namespaces on first commit, so we have to # pre-create (matching how the AWS test pre-creates the S3 Tables namespace). - _create_biglake_namespace(token, project, prefix, namespace) + create_namespace(token, project, prefix, namespace) try: c.run_testdrive_files( @@ -426,8 +293,8 @@ def workflow_default(c: Composition) -> None: c.sql("DROP SINK demo;") finally: try: - _delete_biglake(_table_url(prefix, namespace, table), token, project) - _delete_biglake(_namespace_url(prefix, namespace), token, project) + _delete_biglake(table_url(prefix, namespace, table), token, project) + _delete_biglake(namespace_url(prefix, namespace), token, project) except Exception as cleanup_error: # Don't mask the real failure if there was one. print(f"warning: BigLake cleanup failed: {cleanup_error}")