Skip to content

feat: BigQuery Data Warehouse Connector — Phase 1 (Schema, Types, Validation)#391

Open
Shrotriya-lalit wants to merge 8 commits into
Openpanel-dev:mainfrom
Shrotriya-lalit:feat/bigquery-connector-phase1
Open

feat: BigQuery Data Warehouse Connector — Phase 1 (Schema, Types, Validation)#391
Shrotriya-lalit wants to merge 8 commits into
Openpanel-dev:mainfrom
Shrotriya-lalit:feat/bigquery-connector-phase1

Conversation

@Shrotriya-lalit

@Shrotriya-lalit Shrotriya-lalit commented Jun 8, 2026

Copy link
Copy Markdown

BigQuery Data Warehouse Connector — Phase 1: Schema, Types, Validation

Adds the foundational schema for a multi-provider Data Warehouse Connector — BigQuery first, with Snowflake, Redshift, Databricks, and Postgres ready to be added without any new tables.


What's in this PR

Three shared Prisma tables

Table Purpose
warehouse_connections One row per named connection (any provider)
warehouse_syncs One sync job per connection — source table, column mapping, schedule
warehouse_sync_runs One run record per execution — row count, bytes, status, errors

Enums

  • WarehouseType: bigquery, snowflake, redshift, databricks, postgres
  • WarehouseSyncMode: append, full, onetime
  • WarehouseSyncSchedule: hourly, daily, weekly
  • WarehouseSyncRunStatus: pending, running, completed, failed
  • WarehouseSyncMappingType: events, profiles

Key design decisions

Security

  • configEncrypted (AES-256-GCM) stores provider credentials — same encrypt()/decrypt() as GSC connector
  • displayIdentifier + displayEmail extracted at creation time as plain text — connection list never requires decryption
  • Composite FK warehouse_syncs(projectId, connectionId) → warehouse_connections(projectId, id) — blocks cross-tenant exploit at DB level (project A cannot reference project B's connection)
  • name_nonempty_check constraint — DB-level guard against empty connection names

Sync modes

  • append — cursor-based incremental via insertTime column; cursorOverlapMinutes (default 10) rewinds cursor to catch late-arriving rows; dedup handles re-ingested events
  • full — full reload every run + stale event cleanup via __op_sync_id stamp (handles BQ row deletions and updates)
  • onetime — historical backfill, runs once then disables automatically

Operational fields

  • cursorOverlapMinutes Int @default(10) — late-arriving row safety window for append mode
  • syncDelayMinutes Int @default(0) — delay after cron fires before executing (allows upstream BQ pipelines to land)
  • errorRetryCount + isErrorPaused — auto-pause circuit breaker
  • failureCount BigInt — tracks individually failed rows (bad data/type mismatch), separate from rowCount
  • bytesProcessed BigInt — tracks GCP billing units per run
  • createdBy String? — userId of sync creator for avatar display in UI

Performance

  • Explicit indexes on all FK columns — PostgreSQL does not auto-create these
    • warehouse_syncs(projectId), warehouse_syncs(connectionId), warehouse_sync_runs(syncId)

Validation (packages/validation/src/index.ts)

  • zBigQueryWarehouseConfig — GCP project ID + region + SA JSON (validated for required fields)
  • zWarehouseConfig — discriminated union on type, ready for Snowflake arm
  • zBigQueryColumnMappingEvents — all event column fields: profileId, deviceId, eventId, eventName, eventNameStatic, timestamp, insertTime, revenue, jsonProperties
  • zBigQueryColumnMappingProfilesprofileIdColumn, firstName, lastName, email, avatar, createdAt, jsonProperties
  • zBigQuerySyncConfig with superRefine cross-field rules:
    • schedule required for append and full modes, not for onetime
    • insertTime required when syncMode = 'append' with events mapping
    • eventName and eventNameStatic are mutually exclusive
  • jsonProperties on both mappings — maps a BQ JSON column whose keys are flattened into event/profile properties at ingest time

Migrations applied

Migration What it does
20260610115042_warehouse_restructure 3 tables + 5 enums
20260610115043_warehouse_security_fks Composite FK + name_nonempty_check
20260610115044_warehouse_phase1_finalize 3 FK indexes + failureCount + createdBy
20260610115045_warehouse_onetime_mode onetime enum value + schedule made nullable
20260610115046_warehouse_sync_overlap_delay cursorOverlapMinutes + syncDelayMinutes

Verification

  • prisma migrate diff — no schema drift
  • ✅ 21/21 Zod validator probes pass
  • ✅ All FK constraints + composite FK confirmed in live DB
  • ✅ Cross-tenant attack blocked (composite FK probe)
  • name_nonempty_check blocks empty names at DB level
  • ✅ All 3 performance indexes present

What comes next

Phase Title
Phase 2 Connection Management — SA JSON paste, test connection, rotate key, settings tab UI
Phase 3 Sync Configuration — add-sync modal, column mapping UI, sync list
Phase 4 Sync Execution — BullMQ worker, ClickHouse write, manual trigger
Phase 5 Cron Scheduling — hourly cron, isSyncDue(), syncDelayMinutes
Phase 6 Run History UI — live polling, status badges, interrupted run detection

Summary by CodeRabbit

  • New Features

    • Provider-agnostic Warehouse integrations with named connections per project, BigQuery support, dataset/table syncs, mapping/modes (including one-time), schedules, partition filters, overlap and delay controls, region/health metadata.
    • Enhanced sync runs and metrics: statuses, retries, error pause, failure counts, row/byte metrics, richer diagnostics.
    • Event metadata now supports descriptions.
    • Validation added for GCP/BigQuery IDs, service-account JSON, and column-mapping configs.
  • Bug Fixes

    • Stronger referential, uniqueness, and non-empty constraints to improve tenant isolation and data integrity.

Phase 1 of the BigQuery warehouse connector. Adds Prisma models for
BigQueryConnection, BigQuerySync and BigQuerySyncRun with four supporting
enums, two Prisma migrations, typed JSON column via PrismaJson namespace,
Zod schemas (zBigQuerySyncConfig + column mapping variants) in
@openpanel/validation, and the @google-cloud/bigquery dependency.
- Add zBqColRef validator for column references (supports dot-notation
  for STRUCT nested fields like user.profile.email)
- Add mappingType discriminator to both mapping schemas so the union is
  discriminated and TypeScript can narrow the type cleanly
- Add superRefine cross-validation: append mode events syncs must declare
  an insertTime column (the TIMESTAMP cursor)
- Update plan with verified BigQuery Node.js client type mappings:
  INT64 needs wrapIntegers:true, TIMESTAMP/.value not .toISOString(),
  DATETIME has no timezone, BYTES→Buffer, Big for NUMERIC/BIGNUMERIC
Real-world orgs connect multiple data sources to one project (e.g.
jm-ebg and jm-ebg-cdp on the same ROAS project). The original
@unique on projectId wrongly enforced a single connection per project.

- Remove @unique from BigQueryConnection.projectId
- Add name String field (user label, e.g. "CDP Source")
- Replace with @@unique([projectId, name]) — names unique within project
- Change Project.bigQueryConnection? → bigQueryConnections[]
Schema additions:
- BigQueryConnection: gcpRegion (GDPR region compliance), lastTestedAt/lastTestStatus (connection health)
- BigQuerySync: lastSyncStatus typed as enum (was String?), errorRetryCount+isErrorPaused circuit breaker, partitionFilter for cost-safe full-refresh on partitioned tables
- BigQuerySyncRun: rowCount BigInt (INT max ~2.1B insufficient), bytesProcessed for cost tracking

Zod additions:
- zGcpProjectId: GCP project ID format regex (rejects project numbers and display names)
- zBqIdentifier: dataset/table name validator (no hyphens, per BQ naming rules)
- zServiceAccountJson: SA JSON structure check (rejects authorized_user creds before encryption)
- zBigQueryConnectionCreate: connection creation schema with name/region/SA JSON
- zBigQuerySyncConfig: dataset/tableName now use zBqIdentifier, partitionFilter field added
@coderabbitai

coderabbitai Bot commented Jun 8, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds BigQuery connector migrations, then generalizes them into a provider-agnostic Warehouse schema; updates Prisma models/types; adds Zod validation for warehouse/BigQuery configs and mappings; and adds the BigQuery runtime dependency.

Changes

Warehouse & BigQuery connector

Layer / File(s) Summary
Prisma schema updates
packages/db/prisma/schema.prisma
Adds Project relations, EventMeta.description, and new WarehouseConnection/WarehouseSync/WarehouseSyncRun models plus enums.
EventMeta migration
packages/db/prisma/migrations/20260607120000_add_event_meta_description/migration.sql
Adds nullable description column to public.event_meta.
BigQuery foundation schema
packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql
Introduces BigQuery enums and creates bigquery_connections, bigquery_syncs, bigquery_sync_runs tables with FKs and an index.
BigQuery schema evolution
packages/db/prisma/migrations/20260608091000_bigquery_multi_connection/migration.sql, packages/db/prisma/migrations/20260608120000_bigquery_harden/migration.sql
Switches to named connections, adds region/health/retry/partition fields, converts status to enum, and widens run counters + bytesProcessed.
BigQuery referential integrity
packages/db/prisma/migrations/20260608140000_bigquery_referential_integrity/migration.sql
Adds FK from bigquery_sync_runs.projectId to projects, composite FK/index to enforce tenant isolation, backfills empty names, and adds CHECK on names.
Warehouse restructure
packages/db/prisma/migrations/20260610115042_warehouse_restructure/migration.sql
Drops BigQuery-specific types/tables and creates provider-agnostic warehouse_* enums and tables with constraints and composite FKs.
Warehouse FK & finalization
packages/db/prisma/migrations/20260610115043_warehouse_security_fks/migration.sql, packages/db/prisma/migrations/20260610115044_warehouse_phase1_finalize/migration.sql
Removes single-column FK in favor of composite FK; adds indexes, failureCount, and createdBy.
Modes and scheduling tweaks
packages/db/prisma/migrations/20260610115045_warehouse_onetime_mode/migration.sql, packages/db/prisma/migrations/20260610115046_warehouse_sync_overlap_delay/migration.sql
Adds onetime mode (makes schedule nullable) and cursorOverlapMinutes/syncDelayMinutes.
Warehouse run composite FK
packages/db/prisma/migrations/20260611000000_warehouse_run_composite_fk/migration.sql
Enforces composite FK (syncId, projectId) on warehouse_sync_runs to prevent cross-tenant references.
Validation schemas and types
packages/validation/src/index.ts
Adds GCP/BigQuery identifier validators, service account JSON parsing, BigQuery warehouse connection/config schemas, column mapping schemas, zBigQuerySyncConfig with cross-field rules, and exports inferred TS types.
Dependencies and type wiring
packages/db/package.json, packages/db/src/types.ts
Adds @google-cloud/bigquery dependency and exposes warehouse column-mapping type in global PrismaJson namespace.
sequenceDiagram
  participant Client
  participant API
  participant Database
  participant WarehouseProvider
  Client->>API: request create/trigger sync (projectId, connectionName)
  API->>Database: read WarehouseConnection / WarehouseSync
  API->>WarehouseProvider: authenticate using serviceAccountJson / run extract
  WarehouseProvider->>API: return rows and bytesProcessed
  API->>Database: insert WarehouseSyncRun (status, rowCount, bytesProcessed)
Loading

🎯 4 (Complex) | ⏱️ ~45 minutes

🐰 I tugged at schema roots tonight,
New names and checks lined up in rows,
Validators munched the service-key light,
Migrations marched where the connector goes,
Sync runs counting bytes beneath moon-glows.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title clearly and specifically describes the main change: introducing a BigQuery Data Warehouse Connector with foundational schema, types, and validation for Phase 1.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/db/package.json`:
- Line 17: Update the `@google-cloud/bigquery` dependency in
packages/db/package.json from ^7.9.1 to ^8.3.1 (look for the dependency key
"`@google-cloud/bigquery`") and run your test suite and any integration checks
that exercise BigQuery calls to catch breaking API changes; after updating,
re-run npm audit / GitHub advisory checks and address any newly surfaced
advisories or required code adjustments in functions that call BigQuery client
methods (e.g., places creating BigQuery clients or invoking methods like
dataset/table/query) to align with the v8 API.

In
`@packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql`:
- Around line 50-53: The migration creates bigquery_sync_runs with a projectId
column but no FK, allowing runs to be assigned to the wrong project; add
referential integrity by adding a foreign key on bigquery_sync_runs.projectId
referencing the canonical projects table (projects.id) and also ensure
run-to-sync consistency by adding a composite foreign key (syncId, projectId)
referencing bigquery_syncs(id, projectId) (first add a unique constraint on
bigquery_syncs(id, projectId) if needed); update the CREATE TABLE for
bigquery_sync_runs to include these constraints (names like
fk_bigquery_sync_runs_project and fk_bigquery_sync_runs_sync_project) so every
run is tied to an existing project and the syncId matches that project.
- Around line 69-73: The two separate FKs allow mismatched project vs connection
tenants on bigquery_syncs; drop the existing bigquery_syncs_connectionId_fkey
and replace it with a composite foreign key (e.g.
bigquery_syncs_projectId_connectionId_fkey) on (projectId, connectionId) that
REFERENCES "public"."bigquery_connections"(projectId, id) WITH ON DELETE CASCADE
ON UPDATE CASCADE so the referenced connection must belong to the same project;
keep or leave the existing projectId -> projects FK
(bigquery_syncs_projectId_fkey) as-is.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 90092f45-30c5-4c2c-aaa2-84bd90a711d7

📥 Commits

Reviewing files that changed from the base of the PR and between b94d256 and 154e5e2.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (8)
  • packages/db/package.json
  • packages/db/prisma/migrations/20260607120000_add_event_meta_description/migration.sql
  • packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql
  • packages/db/prisma/migrations/20260608091000_bigquery_multi_connection/migration.sql
  • packages/db/prisma/migrations/20260608120000_bigquery_harden/migration.sql
  • packages/db/prisma/schema.prisma
  • packages/db/src/types.ts
  • packages/validation/src/index.ts

Comment thread packages/db/package.json Outdated
Comment on lines +50 to +53
CREATE TABLE "public"."bigquery_sync_runs" (
"id" UUID NOT NULL DEFAULT gen_random_uuid(),
"syncId" UUID NOT NULL,
"projectId" TEXT NOT NULL,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Constrain bigquery_sync_runs.projectId to prevent mismatched run attribution.

Line 53 stores projectId, but there is no FK for it, and Line 76 only validates syncId. A run row can carry a wrong project id, breaking run-history correctness and tenant-scoped queries.

Suggested migration direction
+-- Keep sync/project coupling enforceable
+ALTER TABLE "public"."bigquery_syncs"
+  ADD CONSTRAINT "bigquery_syncs_id_projectId_key" UNIQUE ("id", "projectId");
+
+-- Enforce run project consistency with parent sync
+ALTER TABLE "public"."bigquery_sync_runs"
+  ADD CONSTRAINT "bigquery_sync_runs_syncId_projectId_fkey"
+  FOREIGN KEY ("syncId", "projectId")
+  REFERENCES "public"."bigquery_syncs"("id", "projectId")
+  ON DELETE CASCADE ON UPDATE CASCADE;
+
+-- Optional explicit FK for direct project integrity
+ALTER TABLE "public"."bigquery_sync_runs"
+  ADD CONSTRAINT "bigquery_sync_runs_projectId_fkey"
+  FOREIGN KEY ("projectId")
+  REFERENCES "public"."projects"("id")
+  ON DELETE CASCADE ON UPDATE CASCADE;

Also applies to: 75-76

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql`
around lines 50 - 53, The migration creates bigquery_sync_runs with a projectId
column but no FK, allowing runs to be assigned to the wrong project; add
referential integrity by adding a foreign key on bigquery_sync_runs.projectId
referencing the canonical projects table (projects.id) and also ensure
run-to-sync consistency by adding a composite foreign key (syncId, projectId)
referencing bigquery_syncs(id, projectId) (first add a unique constraint on
bigquery_syncs(id, projectId) if needed); update the CREATE TABLE for
bigquery_sync_runs to include these constraints (names like
fk_bigquery_sync_runs_project and fk_bigquery_sync_runs_sync_project) so every
run is tied to an existing project and the syncId matches that project.

Comment on lines +69 to +73
-- AddForeignKey
ALTER TABLE "public"."bigquery_syncs" ADD CONSTRAINT "bigquery_syncs_connectionId_fkey" FOREIGN KEY ("connectionId") REFERENCES "public"."bigquery_connections"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "public"."bigquery_syncs" ADD CONSTRAINT "bigquery_syncs_projectId_fkey" FOREIGN KEY ("projectId") REFERENCES "public"."projects"("id") ON DELETE CASCADE ON UPDATE CASCADE;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Enforce project/connection tenant consistency on bigquery_syncs.

Line 70 and Line 73 create independent foreign keys, so a sync can reference a connectionId from project B while storing projectId from project A. That breaks tenant isolation and can misroute data.

Suggested migration direction
+-- Ensure referenced columns are jointly unique
+ALTER TABLE "public"."bigquery_connections"
+  ADD CONSTRAINT "bigquery_connections_id_projectId_key" UNIQUE ("id", "projectId");
+
+-- Enforce sync belongs to the same project as its connection
+ALTER TABLE "public"."bigquery_syncs"
+  ADD CONSTRAINT "bigquery_syncs_connectionId_projectId_fkey"
+  FOREIGN KEY ("connectionId", "projectId")
+  REFERENCES "public"."bigquery_connections"("id", "projectId")
+  ON DELETE CASCADE ON UPDATE CASCADE;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql`
around lines 69 - 73, The two separate FKs allow mismatched project vs
connection tenants on bigquery_syncs; drop the existing
bigquery_syncs_connectionId_fkey and replace it with a composite foreign key
(e.g. bigquery_syncs_projectId_connectionId_fkey) on (projectId, connectionId)
that REFERENCES "public"."bigquery_connections"(projectId, id) WITH ON DELETE
CASCADE ON UPDATE CASCADE so the referenced connection must belong to the same
project; keep or leave the existing projectId -> projects FK
(bigquery_syncs_projectId_fkey) as-is.

@Shrotriya-lalit Shrotriya-lalit force-pushed the feat/bigquery-connector-phase1 branch from 154e5e2 to 7920219 Compare June 8, 2026 10:24

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@packages/db/prisma/migrations/20260608091000_bigquery_multi_connection/migration.sql`:
- Around line 6-9: The migration creates bigquery_connections.name with an
empty-string backfill which leaves existing rows with '' and still allows future
empty-string inserts; update the migration to backfill a meaningful non-empty
value for existing rows (e.g., update "bigquery_connections" set "name" =
concat('connection_', id) or another deterministic non-empty token for rows
where name = '') and then add a DB-level constraint to prevent empty names
(e.g., ALTER TABLE "bigquery_connections" ADD CONSTRAINT ... CHECK
(char_length(name) > 0) or name <> '') so future inserts cannot use ''. Ensure
the ALTER that drops the default comes after the backfill and the CHECK
constraint is added as part of the migration so both existing and new rows are
validated.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ce3b42d9-fd05-4483-8001-e2ff1812b45c

📥 Commits

Reviewing files that changed from the base of the PR and between 154e5e2 and 7920219.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (8)
  • packages/db/package.json
  • packages/db/prisma/migrations/20260607120000_add_event_meta_description/migration.sql
  • packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql
  • packages/db/prisma/migrations/20260608091000_bigquery_multi_connection/migration.sql
  • packages/db/prisma/migrations/20260608120000_bigquery_harden/migration.sql
  • packages/db/prisma/schema.prisma
  • packages/db/src/types.ts
  • packages/validation/src/index.ts
✅ Files skipped from review due to trivial changes (1)
  • packages/db/src/types.ts
🚧 Files skipped from review as they are similar to previous changes (6)
  • packages/db/package.json
  • packages/db/prisma/migrations/20260607120000_add_event_meta_description/migration.sql
  • packages/db/prisma/schema.prisma
  • packages/validation/src/index.ts
  • packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql
  • packages/db/prisma/migrations/20260608120000_bigquery_harden/migration.sql

Comment on lines +6 to +9
ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT NOT NULL DEFAULT '';

-- Remove the default now that column exists
ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" DROP DEFAULT;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Backfill leaves invalid empty names and the DB still permits future empty-string inserts.

Line 6 seeds existing rows with '', and after Line 9 those rows remain empty. That conflicts with the non-empty connection-name contract in validation and can leak invalid records into future flows. Please backfill to a non-empty value and enforce non-empty at the database layer.

Suggested migration adjustment
-ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT NOT NULL DEFAULT '';
-
--- Remove the default now that column exists
-ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" DROP DEFAULT;
+ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT;
+UPDATE "public"."bigquery_connections"
+SET "name" = 'default'
+WHERE "name" IS NULL OR btrim("name") = '';
+ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" SET NOT NULL;
+ALTER TABLE "public"."bigquery_connections"
+ADD CONSTRAINT "bigquery_connections_name_nonempty_chk"
+CHECK (btrim("name") <> '');
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT NOT NULL DEFAULT '';
-- Remove the default now that column exists
ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" DROP DEFAULT;
ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT;
UPDATE "public"."bigquery_connections"
SET "name" = 'default'
WHERE "name" IS NULL OR btrim("name") = '';
ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" SET NOT NULL;
ALTER TABLE "public"."bigquery_connections"
ADD CONSTRAINT "bigquery_connections_name_nonempty_chk"
CHECK (btrim("name") <> '');
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@packages/db/prisma/migrations/20260608091000_bigquery_multi_connection/migration.sql`
around lines 6 - 9, The migration creates bigquery_connections.name with an
empty-string backfill which leaves existing rows with '' and still allows future
empty-string inserts; update the migration to backfill a meaningful non-empty
value for existing rows (e.g., update "bigquery_connections" set "name" =
concat('connection_', id) or another deterministic non-empty token for rows
where name = '') and then add a DB-level constraint to prevent empty names
(e.g., ALTER TABLE "bigquery_connections" ADD CONSTRAINT ... CHECK
(char_length(name) > 0) or name <> '') so future inserts cannot use ''. Ensure
the ALTER that drops the default comes after the backfill and the CHECK
constraint is added as part of the migration so both existing and new rows are
validated.

…d dependency upgrade

- Upgrade @google-cloud/bigquery from ^7.9.1 to ^8.3.1 (latest stable)
- Add FK bigquery_sync_runs.projectId -> projects(id) (was missing, allowing orphan runs)
- Add composite FK bigquery_syncs(projectId, connectionId) -> bigquery_connections(projectId, id)
  to prevent cross-tenant data: a sync can no longer reference a connection from a different project
- Add UNIQUE INDEX bigquery_connections(projectId, id) to back the composite FK
- Add CHECK(char_length(name) > 0) on bigquery_connections to enforce non-empty names at DB level
- Backfill any dev rows with empty name using concat('connection_', id)
…Zod validators

Adds the foundational schema for a multi-provider Data Warehouse Connector
(BigQuery first, extensible to Snowflake/Redshift/Databricks/Postgres).

Three shared Prisma tables:
- warehouse_connections: one row per named connection, any provider
- warehouse_syncs: one sync job per connection
- warehouse_sync_runs: one run record per execution

Key design decisions:
- configEncrypted (AES-256-GCM) + displayIdentifier/displayEmail for
  UI display without decryption
- Composite FK warehouse_syncs(projectId,connectionId) →
  warehouse_connections(projectId,id) blocks cross-tenant exploit at DB level
- Three sync modes: append (cursor), full (reload + stale cleanup), onetime (backfill)
- cursorOverlapMinutes (default 10) rewinds append cursor to catch late-arriving rows
- syncDelayMinutes (default 0) delays cron execution to allow BQ pipelines to land
- Performance indexes on all FK columns (PostgreSQL does not create these automatically)
- jsonProperties column mapping flattens a JSON column into event/profile properties

Validation (packages/validation/src/index.ts):
- zBigQueryWarehouseConfig + zWarehouseConfig discriminated union
- zBigQuerySyncConfig with superRefine cross-field rules:
  schedule required for append/full, insertTime required for append,
  eventName/eventNameStatic mutually exclusive

Migrations applied: 20260610115042–20260610115046
21/21 Zod validator probes pass. Zero schema drift (prisma migrate diff).

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
packages/db/prisma/schema.prisma (1)

794-810: ⚖️ Poor tradeoff

Consider adding composite FK for tenant isolation consistency.

WarehouseSync uses a composite FK (projectId, connectionId) → WarehouseConnection(projectId, id) to prevent cross-tenant references. WarehouseSyncRun has separate single-column FKs for syncId and projectId, so the database does not enforce that projectId matches the parent sync's projectId.

Since runs are created by backend workers (not user input), the risk is limited to application bugs, not direct exploitation. However, adding a composite FK would maintain consistency with the sync→connection pattern and prevent any future data integrity drift.

This would require:

  1. A unique index on WarehouseSync(projectId, id) (similar to what exists on WarehouseConnection)
  2. A composite FK on WarehouseSyncRun(projectId, syncId) → WarehouseSync(projectId, id)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/db/prisma/schema.prisma` around lines 794 - 810, Add a composite
foreign-key relation to enforce tenant isolation by making WarehouseSync expose
a unique (or indexed+unique) key on (projectId, id) and updating
WarehouseSyncRun to reference WarehouseSync via the pair (projectId, syncId);
specifically, add a @@unique or @@index+@@unique for WarehouseSync on
(projectId, id) (if not already present) and change the relation in
WarehouseSyncRun to use relation(fields: [projectId, syncId], references:
[projectId, id]) so the run row cannot reference a sync from a different project
(update the relation name/attributes on the WarehouseSyncRun model accordingly
and remove the single-column-only FK behavior).
packages/db/src/types.ts (1)

31-31: ⚡ Quick win

Naming inconsistency: IWarehouseColumnMapping lacks IPrisma prefix.

All other types in the PrismaJson namespace follow the pattern IPrisma<TypeName> (e.g., IPrismaImportConfig, IPrismaNotificationRuleConfig), but this new type is named IWarehouseColumnMapping without the prefix.

While this works (because it matches the Prisma schema annotation /// [IWarehouseColumnMapping]), the inconsistency may confuse maintainers. Consider renaming both the schema annotation and this type to IPrismaWarehouseColumnMapping to maintain consistency with the established pattern.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/db/src/types.ts` at line 31, Rename the inconsistent type and its
Prisma schema annotation to use the established IPrisma prefix: change the alias
type IWarehouseColumnMapping (currently pointing to IWarehouseColumnMappingType)
to IPrismaWarehouseColumnMapping and update the underlying type name if needed
(IWarehouseColumnMappingType → IPrismaWarehouseColumnMappingType or keep
original type but rename the alias). Also update the Prisma schema annotation
/// [IWarehouseColumnMapping] to /// [IPrismaWarehouseColumnMapping] and
search/replace any references to IWarehouseColumnMapping (in the PrismaJson
namespace and across the codebase) so all usages reference
IPrismaWarehouseColumnMapping to maintain naming consistency.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@packages/db/prisma/schema.prisma`:
- Around line 794-810: Add a composite foreign-key relation to enforce tenant
isolation by making WarehouseSync expose a unique (or indexed+unique) key on
(projectId, id) and updating WarehouseSyncRun to reference WarehouseSync via the
pair (projectId, syncId); specifically, add a @@unique or @@index+@@unique for
WarehouseSync on (projectId, id) (if not already present) and change the
relation in WarehouseSyncRun to use relation(fields: [projectId, syncId],
references: [projectId, id]) so the run row cannot reference a sync from a
different project (update the relation name/attributes on the WarehouseSyncRun
model accordingly and remove the single-column-only FK behavior).

In `@packages/db/src/types.ts`:
- Line 31: Rename the inconsistent type and its Prisma schema annotation to use
the established IPrisma prefix: change the alias type IWarehouseColumnMapping
(currently pointing to IWarehouseColumnMappingType) to
IPrismaWarehouseColumnMapping and update the underlying type name if needed
(IWarehouseColumnMappingType → IPrismaWarehouseColumnMappingType or keep
original type but rename the alias). Also update the Prisma schema annotation
/// [IWarehouseColumnMapping] to /// [IPrismaWarehouseColumnMapping] and
search/replace any references to IWarehouseColumnMapping (in the PrismaJson
namespace and across the codebase) so all usages reference
IPrismaWarehouseColumnMapping to maintain naming consistency.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: cdddf040-421e-4c69-afaf-2bcb03567596

📥 Commits

Reviewing files that changed from the base of the PR and between 632bde3 and ca03135.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (8)
  • packages/db/prisma/migrations/20260610115042_warehouse_restructure/migration.sql
  • packages/db/prisma/migrations/20260610115043_warehouse_security_fks/migration.sql
  • packages/db/prisma/migrations/20260610115044_warehouse_phase1_finalize/migration.sql
  • packages/db/prisma/migrations/20260610115045_warehouse_onetime_mode/migration.sql
  • packages/db/prisma/migrations/20260610115046_warehouse_sync_overlap_delay/migration.sql
  • packages/db/prisma/schema.prisma
  • packages/db/src/types.ts
  • packages/validation/src/index.ts
✅ Files skipped from review due to trivial changes (2)
  • packages/db/prisma/migrations/20260610115046_warehouse_sync_overlap_delay/migration.sql
  • packages/db/prisma/migrations/20260610115045_warehouse_onetime_mode/migration.sql

…v#391

- Rename PrismaJson type alias IWarehouseColumnMapping →
  IPrismaWarehouseColumnMapping to match the established IPrisma* prefix
  convention used by all other types in the namespace
- Update schema.prisma annotation to match (/// [IPrismaWarehouseColumnMapping])
- Add @@unique([id, projectId]) on WarehouseSync to back a composite FK
- Replace single-column syncId FK on WarehouseSyncRun with composite
  (syncId, projectId) → warehouse_syncs(id, projectId) to enforce that
  a run's projectId always matches its parent sync's projectId — closes
  the same cross-tenant gap that the existing composite FK on WarehouseSync
  already closes one level up

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@packages/db/prisma/migrations/20260611000000_warehouse_run_composite_fk/migration.sql`:
- Around line 14-18: Before adding the composite FK ("syncId","projectId") →
warehouse_syncs("id","projectId") in the migration, run a pre-check that counts
mismatched rows between warehouse_sync_runs and warehouse_syncs (use the
provided SELECT with IS DISTINCT FROM) and, if >0, perform a normalization
UPDATE to set warehouse_sync_runs.projectId = warehouse_syncs.projectId for rows
where they differ; place this normalization step after the DROP CONSTRAINT
"warehouse_sync_runs_syncId_fkey" and before ADD CONSTRAINT
"warehouse_sync_runs_syncId_projectId_fkey" so the new FK add will not fail
(keep the unique index creation as-is).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 3de3952b-f359-4897-8bff-2f7f2775e8af

📥 Commits

Reviewing files that changed from the base of the PR and between ca03135 and 3e7dde4.

📒 Files selected for processing (3)
  • packages/db/prisma/migrations/20260611000000_warehouse_run_composite_fk/migration.sql
  • packages/db/prisma/schema.prisma
  • packages/db/src/types.ts
🚧 Files skipped from review as they are similar to previous changes (2)
  • packages/db/src/types.ts
  • packages/db/prisma/schema.prisma

Before adding the (syncId, projectId) composite FK, normalize any runs
whose projectId doesn't match their parent sync's projectId. Without this
the ADD CONSTRAINT fails if mismatched rows exist. Tables are dev-only now
so this is a no-op, but makes the migration safe to apply to any environment.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant