Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/db/package.json
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
},
"dependencies": {
"@clickhouse/client": "^1.18.5",
"@google-cloud/bigquery": "^8.3.1",
"@openpanel/common": "workspace:*",
"@openpanel/constants": "workspace:*",
"@openpanel/json": "workspace:*",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "public"."event_meta" ADD COLUMN "description" TEXT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
-- CreateEnum
CREATE TYPE "public"."BigQuerySyncMappingType" AS ENUM ('events', 'profiles');

-- CreateEnum
CREATE TYPE "public"."BigQuerySyncMode" AS ENUM ('append', 'full');

-- CreateEnum
CREATE TYPE "public"."BigQuerySyncSchedule" AS ENUM ('hourly', 'daily', 'weekly');

-- CreateEnum
CREATE TYPE "public"."BigQuerySyncRunStatus" AS ENUM ('pending', 'running', 'completed', 'failed');

-- CreateTable
CREATE TABLE "public"."bigquery_connections" (
"id" UUID NOT NULL DEFAULT gen_random_uuid(),
"projectId" TEXT NOT NULL,
"gcpProjectId" TEXT NOT NULL,
"serviceAccountEmail" TEXT NOT NULL,
"serviceAccountJsonEncrypted" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,

CONSTRAINT "bigquery_connections_pkey" PRIMARY KEY ("id")
);

-- CreateTable
CREATE TABLE "public"."bigquery_syncs" (
"id" UUID NOT NULL DEFAULT gen_random_uuid(),
"connectionId" UUID NOT NULL,
"projectId" TEXT NOT NULL,
"displayName" TEXT NOT NULL,
"dataset" TEXT NOT NULL,
"tableName" TEXT NOT NULL,
"mappingType" "public"."BigQuerySyncMappingType" NOT NULL,
"syncMode" "public"."BigQuerySyncMode" NOT NULL,
"schedule" "public"."BigQuerySyncSchedule" NOT NULL,
"columnMapping" JSONB NOT NULL,
"lastCursor" TEXT,
"lastSyncedAt" TIMESTAMP(3),
"lastSyncStatus" TEXT,
"lastSyncError" TEXT,
"isEnabled" BOOLEAN NOT NULL DEFAULT true,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,

CONSTRAINT "bigquery_syncs_pkey" PRIMARY KEY ("id")
);

-- CreateTable
CREATE TABLE "public"."bigquery_sync_runs" (
"id" UUID NOT NULL DEFAULT gen_random_uuid(),
"syncId" UUID NOT NULL,
"projectId" TEXT NOT NULL,
"status" "public"."BigQuerySyncRunStatus" NOT NULL DEFAULT 'pending',
"rowCount" INTEGER NOT NULL DEFAULT 0,
"errorMessage" TEXT,
"startedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"completedAt" TIMESTAMP(3),

CONSTRAINT "bigquery_sync_runs_pkey" PRIMARY KEY ("id")
);

-- CreateIndex
CREATE UNIQUE INDEX "bigquery_connections_projectId_key" ON "public"."bigquery_connections"("projectId");

-- AddForeignKey
ALTER TABLE "public"."bigquery_connections" ADD CONSTRAINT "bigquery_connections_projectId_fkey" FOREIGN KEY ("projectId") REFERENCES "public"."projects"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "public"."bigquery_syncs" ADD CONSTRAINT "bigquery_syncs_connectionId_fkey" FOREIGN KEY ("connectionId") REFERENCES "public"."bigquery_connections"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "public"."bigquery_syncs" ADD CONSTRAINT "bigquery_syncs_projectId_fkey" FOREIGN KEY ("projectId") REFERENCES "public"."projects"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "public"."bigquery_sync_runs" ADD CONSTRAINT "bigquery_sync_runs_syncId_fkey" FOREIGN KEY ("syncId") REFERENCES "public"."bigquery_syncs"("id") ON DELETE CASCADE ON UPDATE CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Allow multiple BigQuery connections per project (named connections)
-- Drop single-column unique index on projectId
DROP INDEX IF EXISTS "public"."bigquery_connections_projectId_key";

-- Add name column (default '' for any pre-existing rows during dev)
ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT NOT NULL DEFAULT '';

-- Remove the default now that column exists
ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" DROP DEFAULT;

-- Add composite unique constraint: name must be unique within a project
CREATE UNIQUE INDEX "bigquery_connections_projectId_name_key" ON "public"."bigquery_connections"("projectId", "name");
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- Phase 1 hardening: production-grade fields for BigQuery connector

-- BigQueryConnection: region (GDPR compliance) + connection health tracking
ALTER TABLE "public"."bigquery_connections" ADD COLUMN "gcpRegion" TEXT NOT NULL DEFAULT 'US';
ALTER TABLE "public"."bigquery_connections" ADD COLUMN "lastTestedAt" TIMESTAMP(3);
ALTER TABLE "public"."bigquery_connections" ADD COLUMN "lastTestStatus" BOOLEAN;

-- BigQuerySync: typed status enum, circuit-breaker fields, partition filter
ALTER TABLE "public"."bigquery_syncs" ALTER COLUMN "lastSyncStatus" TYPE "public"."BigQuerySyncRunStatus" USING "lastSyncStatus"::"public"."BigQuerySyncRunStatus";
ALTER TABLE "public"."bigquery_syncs" ADD COLUMN "errorRetryCount" INTEGER NOT NULL DEFAULT 0;
ALTER TABLE "public"."bigquery_syncs" ADD COLUMN "isErrorPaused" BOOLEAN NOT NULL DEFAULT false;
ALTER TABLE "public"."bigquery_syncs" ADD COLUMN "partitionFilter" TEXT;

-- BigQuerySyncRun: BigInt rowCount (INT max ~2.1B insufficient for large tables), bytes for cost tracking
ALTER TABLE "public"."bigquery_sync_runs" ALTER COLUMN "rowCount" TYPE BIGINT USING "rowCount"::BIGINT;
ALTER TABLE "public"."bigquery_sync_runs" ADD COLUMN "bytesProcessed" BIGINT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- Fix 1: Add FK on bigquery_sync_runs.projectId (was missing, allowing orphan runs)
ALTER TABLE "public"."bigquery_sync_runs"
ADD CONSTRAINT "bigquery_sync_runs_projectId_fkey"
FOREIGN KEY ("projectId") REFERENCES "public"."projects"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- Fix 2: Cross-tenant vulnerability — enforce connection belongs to same project as sync
-- Requires a unique index on (projectId, id) in bigquery_connections to back the composite FK
CREATE UNIQUE INDEX "bigquery_connections_projectId_id_key"
ON "public"."bigquery_connections"("projectId", "id");

-- Add composite FK on bigquery_syncs: (projectId, connectionId) must match a connection
-- that belongs to the same project. This is an additional constraint on top of Prisma's
-- single-column connectionId FK — both coexist for defence in depth.
ALTER TABLE "public"."bigquery_syncs"
ADD CONSTRAINT "bigquery_syncs_projectId_connectionId_fkey"
FOREIGN KEY ("projectId", "connectionId")
REFERENCES "public"."bigquery_connections"("projectId", "id")
ON DELETE CASCADE ON UPDATE CASCADE;

-- Fix 3: Prevent empty-string connection names at the DB level
-- (Zod already blocks this at the application layer; this is belt-and-suspenders)
-- Backfill any dev rows that got the empty-string default from the prior migration
UPDATE "public"."bigquery_connections"
SET "name" = concat('connection_', "id")
WHERE char_length("name") = 0;

ALTER TABLE "public"."bigquery_connections"
ADD CONSTRAINT "bigquery_connections_name_nonempty_check"
CHECK (char_length("name") > 0);
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
-- Restructure: BigQuery-specific tables → generic Warehouse tables
-- Phase 1 tables are dev-only (no production data) — drop and recreate cleanly.
-- This makes the schema ready for BigQuery, Snowflake, Redshift, Databricks, Postgres.

-- Drop old tables (CASCADE removes all FKs and indexes automatically)
DROP TABLE IF EXISTS "public"."bigquery_sync_runs";
DROP TABLE IF EXISTS "public"."bigquery_syncs";
DROP TABLE IF EXISTS "public"."bigquery_connections";

-- Drop old BigQuery-specific enums
DROP TYPE IF EXISTS "public"."BigQuerySyncMappingType";
DROP TYPE IF EXISTS "public"."BigQuerySyncMode";
DROP TYPE IF EXISTS "public"."BigQuerySyncSchedule";
DROP TYPE IF EXISTS "public"."BigQuerySyncRunStatus";

-- WarehouseType: all supported warehouse providers
CREATE TYPE "public"."WarehouseType" AS ENUM ('bigquery', 'snowflake', 'redshift', 'databricks', 'postgres');

-- Shared sync enums (provider-agnostic)
CREATE TYPE "public"."WarehouseSyncMappingType" AS ENUM ('events', 'profiles');
CREATE TYPE "public"."WarehouseSyncMode" AS ENUM ('append', 'full');
CREATE TYPE "public"."WarehouseSyncSchedule" AS ENUM ('hourly', 'daily', 'weekly');
CREATE TYPE "public"."WarehouseSyncRunStatus" AS ENUM ('pending', 'running', 'completed', 'failed');

-- warehouse_connections: one row per named connection, any provider
-- configEncrypted: AES-256-GCM encrypted JSON (shape validated by zWarehouseConfig discriminated union)
-- displayIdentifier: plain-text for UI display without decryption (GCP project ID / Snowflake account / etc.)
-- displayEmail: plain-text for UI display (SA email / username)
CREATE TABLE "public"."warehouse_connections" (
"id" UUID NOT NULL DEFAULT gen_random_uuid(),
"projectId" TEXT NOT NULL,
"name" TEXT NOT NULL,
"type" "public"."WarehouseType" NOT NULL,
"configEncrypted" TEXT NOT NULL,
"displayIdentifier" TEXT,
"displayEmail" TEXT,
"lastTestedAt" TIMESTAMP(3),
"lastTestStatus" BOOLEAN,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "warehouse_connections_pkey" PRIMARY KEY ("id")
);

CREATE TABLE "public"."warehouse_syncs" (
"id" UUID NOT NULL DEFAULT gen_random_uuid(),
"connectionId" UUID NOT NULL,
"projectId" TEXT NOT NULL,
"displayName" TEXT NOT NULL,
"dataset" TEXT NOT NULL,
"tableName" TEXT NOT NULL,
"mappingType" "public"."WarehouseSyncMappingType" NOT NULL,
"syncMode" "public"."WarehouseSyncMode" NOT NULL,
"schedule" "public"."WarehouseSyncSchedule" NOT NULL,
"columnMapping" JSONB NOT NULL,
"lastCursor" TEXT,
"lastSyncedAt" TIMESTAMP(3),
"lastSyncStatus" "public"."WarehouseSyncRunStatus",
"lastSyncError" TEXT,
"isEnabled" BOOLEAN NOT NULL DEFAULT true,
"errorRetryCount" INTEGER NOT NULL DEFAULT 0,
"isErrorPaused" BOOLEAN NOT NULL DEFAULT false,
"partitionFilter" TEXT,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "warehouse_syncs_pkey" PRIMARY KEY ("id")
);

CREATE TABLE "public"."warehouse_sync_runs" (
"id" UUID NOT NULL DEFAULT gen_random_uuid(),
"syncId" UUID NOT NULL,
"projectId" TEXT NOT NULL,
"status" "public"."WarehouseSyncRunStatus" NOT NULL DEFAULT 'pending',
"rowCount" BIGINT NOT NULL DEFAULT 0,
"bytesProcessed" BIGINT,
"errorMessage" TEXT,
"startedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"completedAt" TIMESTAMP(3),
CONSTRAINT "warehouse_sync_runs_pkey" PRIMARY KEY ("id")
);

-- Unique indexes
-- (projectId, name): connection name must be unique per project, across ALL warehouse types
CREATE UNIQUE INDEX "warehouse_connections_projectId_name_key"
ON "public"."warehouse_connections"("projectId", "name");

-- (projectId, id): backs the composite FK on warehouse_syncs for cross-tenant protection
CREATE UNIQUE INDEX "warehouse_connections_projectId_id_key"
ON "public"."warehouse_connections"("projectId", "id");

-- DB-level name nonempty (belt-and-suspenders; Zod blocks empty strings at app layer)
ALTER TABLE "public"."warehouse_connections"
ADD CONSTRAINT "warehouse_connections_name_nonempty_check"
CHECK (char_length("name") > 0);

-- FKs on warehouse_connections
ALTER TABLE "public"."warehouse_connections"
ADD CONSTRAINT "warehouse_connections_projectId_fkey"
FOREIGN KEY ("projectId") REFERENCES "public"."projects"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- FKs on warehouse_syncs
ALTER TABLE "public"."warehouse_syncs"
ADD CONSTRAINT "warehouse_syncs_connectionId_fkey"
FOREIGN KEY ("connectionId") REFERENCES "public"."warehouse_connections"("id") ON DELETE CASCADE ON UPDATE CASCADE;

ALTER TABLE "public"."warehouse_syncs"
ADD CONSTRAINT "warehouse_syncs_projectId_fkey"
FOREIGN KEY ("projectId") REFERENCES "public"."projects"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- Composite FK: prevents cross-tenant exploit where a sync in project A
-- could reference a connection belonging to project B
ALTER TABLE "public"."warehouse_syncs"
ADD CONSTRAINT "warehouse_syncs_projectId_connectionId_fkey"
FOREIGN KEY ("projectId", "connectionId")
REFERENCES "public"."warehouse_connections"("projectId", "id")
ON DELETE CASCADE ON UPDATE CASCADE;

-- FKs on warehouse_sync_runs
ALTER TABLE "public"."warehouse_sync_runs"
ADD CONSTRAINT "warehouse_sync_runs_syncId_fkey"
FOREIGN KEY ("syncId") REFERENCES "public"."warehouse_syncs"("id") ON DELETE CASCADE ON UPDATE CASCADE;

ALTER TABLE "public"."warehouse_sync_runs"
ADD CONSTRAINT "warehouse_sync_runs_projectId_fkey"
FOREIGN KEY ("projectId") REFERENCES "public"."projects"("id") ON DELETE CASCADE ON UPDATE CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Drop the single-field connectionId FK on warehouse_syncs.
-- The composite FK (projectId, connectionId) → warehouse_connections(projectId, id)
-- from 20260610115042_warehouse_restructure already enforces the same referential
-- integrity PLUS prevents cross-tenant access — keeping both would be redundant.
ALTER TABLE "public"."warehouse_syncs"
DROP CONSTRAINT "warehouse_syncs_connectionId_fkey";
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- Phase 1 finalizing migration: performance indexes + column additions
-- These were identified as missing after the initial restructure.

-- Performance indexes on FK columns (PostgreSQL does NOT auto-create these).
-- Required for list queries: syncs-by-project, syncs-by-connection, runs-by-sync.
CREATE INDEX "warehouse_syncs_projectId_idx"
ON "public"."warehouse_syncs"("projectId");

CREATE INDEX "warehouse_syncs_connectionId_idx"
ON "public"."warehouse_syncs"("connectionId");

CREATE INDEX "warehouse_sync_runs_syncId_idx"
ON "public"."warehouse_sync_runs"("syncId");

-- failureCount: tracks rows that were fetched but failed to import individually
-- (bad data, type mismatch, etc.) — separate from rowCount (successfully written rows).
-- Matches Mixpanel's Sync History "Failures" column in the UI.
ALTER TABLE "public"."warehouse_sync_runs"
ADD COLUMN "failureCount" BIGINT NOT NULL DEFAULT 0;

-- createdBy: userId of the person who created this sync.
-- Used for displaying creator avatar + name in the sync list (matching Mixpanel UI).
ALTER TABLE "public"."warehouse_syncs"
ADD COLUMN "createdBy" TEXT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Add 'onetime' sync mode: run once immediately, then disable automatically.
-- Used for historical backfills — import all historical data once, then set up
-- an Append sync for ongoing data.
ALTER TYPE "public"."WarehouseSyncMode" ADD VALUE 'onetime';

-- Make schedule nullable: onetime syncs have no recurring schedule.
ALTER TABLE "public"."warehouse_syncs"
ALTER COLUMN "schedule" DROP NOT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- cursorOverlapMinutes: overlap window for append-mode cursor.
-- Rewinds the cursor by this many minutes to catch late-arriving rows that share
-- the same insertTime as the previous high-water mark. Worker handles dedup via
-- eventId (sha256 hash or user-mapped column) so no duplicate events are created.
ALTER TABLE "public"."warehouse_syncs"
ADD COLUMN "cursorOverlapMinutes" INTEGER NOT NULL DEFAULT 10;

-- syncDelayMinutes: delay after scheduled fire time before running.
-- Allows pipeline data to fully land in BigQuery before the sync executes.
-- e.g. a daily sync firing at midnight waits N minutes for the ETL job to finish.
ALTER TABLE "public"."warehouse_syncs"
ADD COLUMN "syncDelayMinutes" INTEGER NOT NULL DEFAULT 0;
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- Tenant isolation: enforce that a sync run's projectId must match
-- its parent sync's projectId. Mirrors the same pattern used on
-- warehouse_syncs → warehouse_connections (composite FK there too).

-- Step 1: unique index on warehouse_syncs(id, projectId) to back the FK
CREATE UNIQUE INDEX "warehouse_syncs_id_projectId_key"
ON "public"."warehouse_syncs"("id", "projectId");

-- Step 2: replace single-column syncId FK with a composite (syncId, projectId) FK
-- so that a run row cannot carry a projectId that differs from its sync
ALTER TABLE "public"."warehouse_sync_runs"
DROP CONSTRAINT "warehouse_sync_runs_syncId_fkey";

-- Backfill: normalize any runs whose projectId doesn't match their parent sync.
-- Defensive — tables are dev-only at this stage, but makes the migration
-- safe to apply to any environment that might have mismatched rows.
UPDATE "public"."warehouse_sync_runs" r
SET "projectId" = s."projectId"
FROM "public"."warehouse_syncs" s
WHERE s."id" = r."syncId"
AND r."projectId" IS DISTINCT FROM s."projectId";

ALTER TABLE "public"."warehouse_sync_runs"
ADD CONSTRAINT "warehouse_sync_runs_syncId_projectId_fkey"
FOREIGN KEY ("syncId", "projectId")
REFERENCES "public"."warehouse_syncs"("id", "projectId")
ON DELETE CASCADE ON UPDATE CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Tighten the name nonempty check on warehouse_connections.
-- The previous constraint used char_length(name) > 0, which passes
-- whitespace-only strings like ' ' (char_length returns 3).
-- btrim strips leading/trailing whitespace before the length check,
-- so ' ' → '' → char_length 0 → rejected.
ALTER TABLE "public"."warehouse_connections"
DROP CONSTRAINT "warehouse_connections_name_nonempty_check";

ALTER TABLE "public"."warehouse_connections"
ADD CONSTRAINT "warehouse_connections_name_nonempty_check"
CHECK (char_length(btrim("name")) > 0);
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Add lastTestError to warehouse_connections so that failed connectivity tests
-- can surface a human-readable reason (permission denied, project not found, etc.)
-- rather than just a boolean false in lastTestStatus.
ALTER TABLE "public"."warehouse_connections"
ADD COLUMN "lastTestError" TEXT;
Loading