From cbe81ef8f2cbc653b158c69f190b901259a8e0c3 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Sat, 4 Jul 2026 12:58:24 +0100 Subject: [PATCH 1/5] fix(replication): key logical replication leader lock on slot name The LogicalReplicationClient leader lock (Redlock) was keyed on the client name, but a Postgres logical replication slot allows exactly one consumer, so the lock must serialize consumers of a given slot. When two clients target the same slot with different names (e.g. across a rolling deploy where the name changed but the slot did not), each acquired a distinct lock, both became leader, and the second to run START_REPLICATION failed with 'replication slot is active'. Since START_REPLICATION is fire-and-forget and only logged on error, that consumer stopped and replication stalled until restarted. Key the lock on slotName instead. Adds a regression test (two clients, same slot, different name) verified to fail before and pass after. --- .../fix-replication-leader-lock-per-slot.md | 6 +++ .../replication/src/client.test.ts | 54 +++++++++++++++++++ internal-packages/replication/src/client.ts | 8 ++- 3 files changed, 66 insertions(+), 2 deletions(-) create mode 100644 .server-changes/fix-replication-leader-lock-per-slot.md diff --git a/.server-changes/fix-replication-leader-lock-per-slot.md b/.server-changes/fix-replication-leader-lock-per-slot.md new file mode 100644 index 00000000000..d9637b92193 --- /dev/null +++ b/.server-changes/fix-replication-leader-lock-per-slot.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Key the logical-replication leader lock on the slot name (not the client name) so consumers of the same replication slot serialize correctly across restarts and rolling deploys diff --git a/internal-packages/replication/src/client.test.ts b/internal-packages/replication/src/client.test.ts index 2f5c1d3e488..4574dcc19e7 100644 --- a/internal-packages/replication/src/client.test.ts +++ b/internal-packages/replication/src/client.test.ts @@ -181,4 +181,58 @@ describe("Replication Client", () => { expect(slotExists[0].exists).toBe(false); } ); + + postgresAndRedisTest( + "two clients on the same slot must not both lead (rolling-deploy handoff)", + async ({ postgresContainer, prisma, redisOptions }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const shared = { + slotName: "handoff_slot", + publicationName: "handoff_publication", + redisOptions, + table: "TaskRun", + pgConfig: { connectionString: postgresContainer.getConnectionUri() }, + }; + + // Leader on the shared slot. + const a = new LogicalReplicationClient({ ...shared, name: "runs-replication" }); + const aElections: boolean[] = []; + a.events.on("leaderElection", (won) => aElections.push(won)); + a.events.on("error", () => {}); + await a.subscribe(); + // Let A's walsender actually attach to the slot before B races it. + await setTimeout(1000); + + // Second client, SAME slot, DIFFERENT name — the rolling-deploy shape that + // regressed (name changed "runs-replication" -> "runs-replication:legacy"). + const b = new LogicalReplicationClient({ + ...shared, + name: "runs-replication:legacy", + leaderLockTimeoutMs: 1000, + leaderLockAcquireAdditionalTimeMs: 250, + leaderLockRetryIntervalMs: 200, + }); + const bElections: boolean[] = []; + const bErrors: Array = []; + b.events.on("leaderElection", (won) => bElections.push(won)); + b.events.on("error", (error) => bErrors.push(error)); + await b.subscribe(); + await setTimeout(500); + + expect(aElections).toContain(true); + // B must not also win leadership on the same slot, nor race START_REPLICATION + // into a "slot is active" error. With a name-keyed lock it did both. + expect(bElections).not.toContain(true); + expect(bElections).toContain(false); + expect( + bErrors + .map((e) => String((e as Error)?.message ?? e)) + .some((m) => /replication slot .* is active|already active/i.test(m)) + ).toBe(false); + + await a.stop(); + await b.stop(); + } + ); }); diff --git a/internal-packages/replication/src/client.ts b/internal-packages/replication/src/client.ts index e43a099df3e..8fd0c0ab971 100644 --- a/internal-packages/replication/src/client.ts +++ b/internal-packages/replication/src/client.ts @@ -19,7 +19,8 @@ export interface LogicalReplicationClientOptions { pgConfig: ClientConfig; /** - * The name of this LogicalReplicationClient instance, used for leader election. + * The name of this LogicalReplicationClient instance, used for logging and the + * Postgres application_name. Leader election is keyed on `slotName`. */ name: string; /** @@ -703,8 +704,11 @@ export class LogicalReplicationClient { while (Date.now() - startTime < maxWaitTime) { try { + // Key the leader lock on the SLOT, not `name`: Postgres allows one + // consumer per slot, so consumers of the same slot must contend on the + // same lock (a name-keyed lock lets old+new pods race it across a deploy). this.leaderLock = await this.redlock.acquire( - [`logical-replication-client:${this.options.name}`], + [`logical-replication-client:${this.options.slotName}`], this.leaderLockTimeoutMs ); From abab6cdc156d1a1eb17ad23e6c3e8444b025fefb Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Sat, 4 Jul 2026 14:34:26 +0100 Subject: [PATCH 2/5] fix(replication): probe leadership + assert lock keys by slot name Follow-on to keying the leader lock on slotName: the admin runs-replication status route probed the old name-keyed Redis key (would report leader:false for every source), and the multi-source wiring test asserted the old name-keyed lock keys (CI-red). Both updated to the slot-keyed format. --- .../admin.api.v1.runs-replication.status.ts | 21 +++++++++++-------- .../test/runsReplicationInstance.test.ts | 6 ++++-- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/apps/webapp/app/routes/admin.api.v1.runs-replication.status.ts b/apps/webapp/app/routes/admin.api.v1.runs-replication.status.ts index 6d570418cc7..26a9d64068b 100644 --- a/apps/webapp/app/routes/admin.api.v1.runs-replication.status.ts +++ b/apps/webapp/app/routes/admin.api.v1.runs-replication.status.ts @@ -7,12 +7,15 @@ import { getRunsReplicationConfiguredSources } from "~/services/runsReplicationG /** * Probes per-source replication leadership via the redlock leader-lock key, which * is DOUBLE-PREFIXED with `logical-replication-client:` — once from the connection's - * keyPrefix and once from redlock's resource string. So we prefix this connection - * with `runs-replication:logical-replication-client:` and EXISTS on the resource - * `logical-replication-client:runs-replication:`, resolving to: - * runs-replication:logical-replication-client:logical-replication-client:runs-replication: + * keyPrefix and once from redlock's resource string. The lock is keyed on the + * replication slot, so we prefix this connection with + * `runs-replication:logical-replication-client:` and EXISTS on the resource + * `logical-replication-client:`, resolving to: + * runs-replication:logical-replication-client:logical-replication-client: */ -async function probeLeadership(sourceIds: string[]): Promise> { +async function probeLeadership( + sources: { id: string; slotName: string }[] +): Promise> { const leaders = new Map(); const redis = new Redis({ @@ -26,9 +29,9 @@ async function probeLeadership(sourceIds: string[]): Promise s.id)); + const leaders = await probeLeadership(sources); return json({ enabled: env.RUN_REPLICATION_ENABLED === "1" && sources.length > 0, diff --git a/apps/webapp/test/runsReplicationInstance.test.ts b/apps/webapp/test/runsReplicationInstance.test.ts index 34a6f379934..39f57497e7f 100644 --- a/apps/webapp/test/runsReplicationInstance.test.ts +++ b/apps/webapp/test/runsReplicationInstance.test.ts @@ -408,10 +408,12 @@ describe("RunsReplication multi-source wiring (integration)", () => { probe = new Redis(redisOptions); + // Leader lock is keyed on the slot, so each source holds a distinct + // slot-keyed lock (double-prefixed: connection keyPrefix + redlock resource). const legacyKey = - "runs-replication:logical-replication-client:logical-replication-client:runs-replication:legacy"; + "runs-replication:logical-replication-client:logical-replication-client:tr_legacy_wiring"; const newKey = - "runs-replication:logical-replication-client:logical-replication-client:runs-replication:new"; + "runs-replication:logical-replication-client:logical-replication-client:tr_new_wiring"; expect(await probe.exists(legacyKey)).toBe(1); expect(await probe.exists(newKey)).toBe(1); From 1ae941329e3682fb73b7c9563de2dd0456416af5 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Sat, 4 Jul 2026 15:17:04 +0100 Subject: [PATCH 3/5] feat(replication): auto-resubscribe on a failed handoff (self-healing deploys) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replication clients now optionally re-subscribe (with exponential backoff) after a lost leader election or a failed START_REPLICATION, instead of logging once and staying dead. This makes a rolling deploy self-heal: the incoming pod retries until the draining pod releases the slot, then takes over — no stop-before-start. Safety: - #cleanupAttempt() unconditionally ends the pg client (freeing the walsender) and releases the leader lock before rescheduling, so retries never leak connections/locks (the plain stop() early-returns while _isStopped is set). - shutdown() (used for intentional stop) sets an intentional-stop latch that is re-checked after every await in subscribe() and aborts #acquireLeaderLock's spin, so a resubscribe can never race or outlive a shutdown. - backoff resets only on genuine stream start (replicationStart), so a permanently stuck slot backs off to the ceiling and logs loudly rather than tight-looping; a subscribeEpoch neutralises stale START_REPLICATION catches. Runs- and sessions-replication opt in and use shutdown() for all intentional stops. Adds container tests for the leak, shutdown-race, reset, and self-heal. --- .../services/runsReplicationService.server.ts | 9 +- .../sessionsReplicationService.server.ts | 7 +- .../replication/src/client.test.ts | 230 ++++++++++ internal-packages/replication/src/client.ts | 394 +++++++++++++----- 4 files changed, 526 insertions(+), 114 deletions(-) diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index e37ff257671..732665461dd 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -286,6 +286,7 @@ export class RunsReplicationService { table: "TaskRun", redisOptions: options.redisOptions, autoAcknowledge: false, + resubscribeOnFailure: true, publicationActions: ["insert", "update", "delete"], logger: options.logger ?? new Logger("LogicalReplicationClient", options.logLevel ?? "info"), @@ -428,7 +429,9 @@ export class RunsReplicationService { if (!hasCurrentTransaction) { this.logger.info("No transaction to commit, shutting down immediately"); - await Promise.all(Array.from(this._sources.values()).map((runtime) => runtime.client.stop())); + await Promise.all( + Array.from(this._sources.values()).map((runtime) => runtime.client.shutdown()) + ); this._isShutDownComplete = true; return; } @@ -458,7 +461,7 @@ export class RunsReplicationService { for (const runtime of this._sources.values()) { this.logger.info("Stopping replication client", { sourceId: runtime.source.id }); - await runtime.client.stop(); + await runtime.client.shutdown(); if (runtime.acknowledgeInterval) { clearInterval(runtime.acknowledgeInterval); @@ -636,7 +639,7 @@ export class RunsReplicationService { // swallow client.stop() rejections so they don't surface as unhandled. if (!this._shutdownStopInFlight) { this._shutdownStopInFlight = true; - Promise.all(Array.from(this._sources.values()).map((r) => r.client.stop())) + Promise.all(Array.from(this._sources.values()).map((r) => r.client.shutdown())) .catch((error) => { this.logger.error("Error stopping replication clients during shutdown", { error }); }) diff --git a/apps/webapp/app/services/sessionsReplicationService.server.ts b/apps/webapp/app/services/sessionsReplicationService.server.ts index da202458ca8..5e20a36f98e 100644 --- a/apps/webapp/app/services/sessionsReplicationService.server.ts +++ b/apps/webapp/app/services/sessionsReplicationService.server.ts @@ -187,6 +187,7 @@ export class SessionsReplicationService { table: "Session", redisOptions: options.redisOptions, autoAcknowledge: false, + resubscribeOnFailure: true, publicationActions: ["insert", "update", "delete"], logger: options.logger ?? new Logger("LogicalReplicationClient", options.logLevel ?? "info"), leaderLockTimeoutMs: options.leaderLockTimeoutMs ?? 30_000, @@ -265,7 +266,7 @@ export class SessionsReplicationService { if (!this._currentTransaction) { this.logger.info("No transaction to commit, shutting down immediately"); - await this._replicationClient.stop(); + await this._replicationClient.shutdown(); this._isSubscribed = false; this._isShutDownComplete = true; return; @@ -294,7 +295,7 @@ export class SessionsReplicationService { async stop() { this.logger.info("Stopping replication client"); - await this._replicationClient.stop(); + await this._replicationClient.shutdown(); if (this._acknowledgeInterval) { clearInterval(this._acknowledgeInterval); @@ -430,7 +431,7 @@ export class SessionsReplicationService { if (this._isShutDownComplete) return; if (this._isShuttingDown) { - this._replicationClient.stop().finally(() => { + this._replicationClient.shutdown().finally(() => { this._isSubscribed = false; this._isShutDownComplete = true; }); diff --git a/internal-packages/replication/src/client.test.ts b/internal-packages/replication/src/client.test.ts index 4574dcc19e7..2277ae3ca88 100644 --- a/internal-packages/replication/src/client.test.ts +++ b/internal-packages/replication/src/client.test.ts @@ -235,4 +235,234 @@ describe("Replication Client", () => { await b.stop(); } ); + + postgresAndRedisTest( + "resubscribeOnFailure self-heals once the leader releases the slot", + async ({ postgresContainer, prisma, redisOptions }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const shared = { + slotName: "resub_slot", + publicationName: "resub_pub", + redisOptions, + table: "TaskRun", + pgConfig: { connectionString: postgresContainer.getConnectionUri() }, + }; + + // Leader holds the slot. + const a = new LogicalReplicationClient({ ...shared, name: "leader-a" }); + a.events.on("error", () => {}); + await a.subscribe(); + await setTimeout(1000); + + // Contender with resubscribe on: loses the election while A holds the slot, + // then must self-heal (win) once A releases it — the rolling-deploy handoff. + const b = new LogicalReplicationClient({ + ...shared, + name: "contender-b", + resubscribeOnFailure: true, + resubscribeMinDelayMs: 200, + resubscribeMaxDelayMs: 400, + leaderLockTimeoutMs: 500, + leaderLockAcquireAdditionalTimeMs: 100, + leaderLockRetryIntervalMs: 100, + }); + const bElections: boolean[] = []; + b.events.on("leaderElection", (won) => bElections.push(won)); + b.events.on("error", () => {}); + await b.subscribe(); + await setTimeout(1500); + + // Still contending, not leader, while A holds the slot. + expect(bElections).toContain(false); + expect(bElections).not.toContain(true); + + // Release the leader — a scheduled resubscribe should now win. + await a.shutdown(); + + let becameLeader = false; + for (let i = 0; i < 40; i++) { + if (bElections.includes(true)) { + becameLeader = true; + break; + } + await setTimeout(250); + } + expect(becameLeader).toBe(true); + + await b.shutdown(); + } + ); + + postgresAndRedisTest( + "a failing START_REPLICATION retry loop must not leak connections or locks", + async ({ postgresContainer, prisma, redisOptions }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const shared = { + slotName: "leak_slot", + publicationName: "leak_pub", + table: "TaskRun", + pgConfig: { connectionString: postgresContainer.getConnectionUri() }, + }; + + const a = new LogicalReplicationClient({ ...shared, redisOptions, name: "leak-leader" }); + a.events.on("error", () => {}); + await a.subscribe(); + await setTimeout(1000); + + // B elects on a separate lock namespace so every attempt reaches + // START_REPLICATION and dies there ("slot is active") — the stuck-slot shape. + const b = new LogicalReplicationClient({ + ...shared, + redisOptions: { ...redisOptions, keyPrefix: `${redisOptions.keyPrefix ?? ""}other:` }, + name: "leak-contender", + resubscribeOnFailure: true, + resubscribeMinDelayMs: 200, + resubscribeMaxDelayMs: 400, + leaderLockTimeoutMs: 1000, + leaderLockAcquireAdditionalTimeMs: 300, + leaderLockRetryIntervalMs: 100, + }); + const bErrors: Array = []; + b.events.on("error", (error) => bErrors.push(error)); + await b.subscribe(); + + for (let i = 0; i < 80 && bErrors.length < 3; i++) { + await setTimeout(250); + } + expect(bErrors.length).toBeGreaterThanOrEqual(3); + + // Every failed attempt must end its pg client: at most the one in-flight + // attempt's backend may exist, never an accrual across cycles. + const backends = await prisma.$queryRaw<{ count: bigint }[]>` + SELECT count(*) AS count FROM pg_stat_activity WHERE application_name = 'leak-contender' + `; + expect(Number(backends[0].count)).toBeLessThanOrEqual(1); + + const active = await prisma.$queryRaw<{ count: bigint }[]>` + SELECT count(*) AS count FROM pg_replication_slots WHERE slot_name = 'leak_slot' AND active + `; + expect(Number(active[0].count)).toBe(1); + + await b.shutdown(); + await a.shutdown(); + } + ); + + postgresAndRedisTest( + "shutdown during an in-flight subscribe must not leave a zombie leader", + async ({ postgresContainer, prisma, redisOptions }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const shared = { + slotName: "zombie_slot", + publicationName: "zombie_pub", + redisOptions, + table: "TaskRun", + pgConfig: { connectionString: postgresContainer.getConnectionUri() }, + }; + + const a = new LogicalReplicationClient({ ...shared, name: "zombie-leader" }); + a.events.on("error", () => {}); + await a.subscribe(); + await setTimeout(1000); + + // B's election spins against A's held lock; shut it down mid-subscribe. + const b = new LogicalReplicationClient({ + ...shared, + name: "zombie-contender", + resubscribeOnFailure: true, + leaderLockTimeoutMs: 5000, + leaderLockAcquireAdditionalTimeMs: 5000, + leaderLockRetryIntervalMs: 100, + }); + const bElections: boolean[] = []; + b.events.on("leaderElection", (won) => bElections.push(won)); + b.events.on("error", () => {}); + + const inflight = b.subscribe(); + await setTimeout(300); + await b.shutdown(); + + // Release the real leader; a zombie B would now win the lock and the slot. + await a.shutdown(); + await inflight.catch(() => {}); + await setTimeout(1500); + + const zombieWon = bElections.includes(true); + const active = await prisma.$queryRaw<{ count: bigint }[]>` + SELECT count(*) AS count FROM pg_replication_slots WHERE slot_name = 'zombie_slot' AND active + `; + const backends = await prisma.$queryRaw<{ count: bigint }[]>` + SELECT count(*) AS count FROM pg_stat_activity WHERE application_name = 'zombie-contender' + `; + // Reap a zombie (if any) so the test exits cleanly, then assert. + await b.shutdown(); + + expect(zombieWon).toBe(false); + expect(Number(active[0].count)).toBe(0); + expect(Number(backends[0].count)).toBe(0); + } + ); + + postgresAndRedisTest( + "subscribe after shutdown re-arms resubscribeOnFailure", + async ({ postgresContainer, prisma, redisOptions }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const shared = { + slotName: "rearm_slot", + publicationName: "rearm_pub", + redisOptions, + table: "TaskRun", + pgConfig: { connectionString: postgresContainer.getConnectionUri() }, + }; + + const b = new LogicalReplicationClient({ + ...shared, + name: "rearm-client", + resubscribeOnFailure: true, + resubscribeMinDelayMs: 200, + resubscribeMaxDelayMs: 400, + leaderLockTimeoutMs: 500, + leaderLockAcquireAdditionalTimeMs: 100, + leaderLockRetryIntervalMs: 100, + }); + const bElections: boolean[] = []; + b.events.on("leaderElection", (won) => bElections.push(won)); + b.events.on("error", () => {}); + + // Admin stop -> start: shutdown latches the intentional stop... + await b.subscribe(); + await setTimeout(500); + await b.shutdown(); + + const a = new LogicalReplicationClient({ ...shared, name: "rearm-leader" }); + a.events.on("error", () => {}); + await a.subscribe(); + await setTimeout(1000); + + // ...then an explicit re-subscribe loses the election and must self-heal + // once the leader goes away (self-heal re-armed by the subscribe). + bElections.length = 0; + await b.subscribe(); + expect(bElections).toContain(false); + expect(bElections).not.toContain(true); + + await a.shutdown(); + + let becameLeader = false; + for (let i = 0; i < 40; i++) { + if (bElections.includes(true)) { + becameLeader = true; + break; + } + await setTimeout(250); + } + expect(becameLeader).toBe(true); + + await b.shutdown(); + } + ); }); diff --git a/internal-packages/replication/src/client.ts b/internal-packages/replication/src/client.ts index 8fd0c0ab971..36ba0e61924 100644 --- a/internal-packages/replication/src/client.ts +++ b/internal-packages/replication/src/client.ts @@ -66,6 +66,17 @@ export interface LogicalReplicationClientOptions { */ leaderLockAcquireAdditionalTimeMs?: number; + /** + * Auto re-subscribe (with backoff) after a lost election / failed + * START_REPLICATION instead of stopping. Off by default; when on, use + * `shutdown()` (not `stop()`) for intentional shutdown. + */ + resubscribeOnFailure?: boolean; + /** Base delay for the resubscribe backoff (default: 1000ms). */ + resubscribeMinDelayMs?: number; + /** Max delay for the resubscribe backoff (default: 30000ms). */ + resubscribeMaxDelayMs?: number; + /** * The interval in seconds to automatically acknowledge the last LSN if no ack has been sent (default: 10) */ @@ -109,6 +120,13 @@ export class LogicalReplicationClient { private ackIntervalTimer: NodeJS.Timeout | null = null; private _isStopped: boolean = false; private _tracer: Tracer; + private resubscribeOnFailure: boolean; + private resubscribeMinDelayMs: number; + private resubscribeMaxDelayMs: number; + private resubscribeTimer: NodeJS.Timeout | null = null; + private resubscribeAttempts: number = 0; + private _intentionalStop: boolean = false; + private subscribeEpoch: number = 0; public get lastLsn(): string { return this.lastAcknowledgedLsn ?? "0/00000000"; @@ -131,6 +149,9 @@ export class LogicalReplicationClient { this.leaderLockAcquireAdditionalTimeMs = options.leaderLockAcquireAdditionalTimeMs ?? 1000; this.leaderLockRetryIntervalMs = options.leaderLockRetryIntervalMs ?? 500; this.ackIntervalSeconds = options.ackIntervalSeconds ?? 10; + this.resubscribeOnFailure = options.resubscribeOnFailure ?? false; + this.resubscribeMinDelayMs = options.resubscribeMinDelayMs ?? 1000; + this.resubscribeMaxDelayMs = options.resubscribeMaxDelayMs ?? 30000; this.redis = createRedisClient( { @@ -155,6 +176,11 @@ export class LogicalReplicationClient { public async stop(): Promise { return await startSpan(this._tracer, "logical_replication_client.stop", async (span) => { + if (this.resubscribeTimer) { + clearTimeout(this.resubscribeTimer); + this.resubscribeTimer = null; + } + if (this._isStopped) return this; span.setAttribute("replication_client.name", this.options.name); @@ -212,11 +238,100 @@ export class LogicalReplicationClient { }); } + /** + * Permanently stop the client and disable auto-resubscribe. Use this (not + * stop()) for intentional shutdown so a failure-triggered resubscribe can't + * race it. + */ + public async shutdown(): Promise { + this._intentionalStop = true; + return this.stop(); + } + + /** + * Unconditionally release the current attempt's timers, pg client and leader + * lock. Unlike stop() this doesn't no-op when `_isStopped` is set — a failed + * subscribe runs entirely in that state and would otherwise leak them. + */ + async #cleanupAttempt(): Promise { + this._isStopped = true; + + if (this.leaderLockHeartbeatTimer) { + clearInterval(this.leaderLockHeartbeatTimer); + this.leaderLockHeartbeatTimer = null; + } + + if (this.ackIntervalTimer) { + clearInterval(this.ackIntervalTimer); + this.ackIntervalTimer = null; + } + + this.connection?.removeAllListeners(); + this.connection = null; + + if (this.client) { + this.client.removeAllListeners(); + + const [endError] = await tryCatch(this.client.end()); + + if (endError) { + this.logger.error("Failed to end client", { + name: this.options.name, + error: endError, + }); + } + this.client = null; + } + + await this.#releaseLeaderLock(); + } + + #scheduleResubscribe(reason: string): void { + if (!this.resubscribeOnFailure || this._intentionalStop) return; + if (this.resubscribeTimer) return; + + const delay = Math.min( + this.resubscribeMinDelayMs * 2 ** this.resubscribeAttempts, + this.resubscribeMaxDelayMs + ); + this.resubscribeAttempts += 1; + + const payload = { + name: this.options.name, + slotName: this.options.slotName, + reason, + attempt: this.resubscribeAttempts, + delayMs: delay, + }; + // At the ceiling the stream isn't recovering — log loudly so a genuinely + // stuck slot surfaces instead of hiding behind silent retries. + if (delay >= this.resubscribeMaxDelayMs) { + this.logger.error("Replication resubscribe scheduled (at max backoff)", payload); + } else { + this.logger.warn("Replication resubscribe scheduled", payload); + } + + this.resubscribeTimer = setTimeout(() => { + this.resubscribeTimer = null; + if (this._intentionalStop) return; + this.subscribe(this.lastAcknowledgedLsn ?? undefined).catch((error) => { + this.logger.error("Replication resubscribe attempt failed", { + name: this.options.name, + error, + }); + this.#scheduleResubscribe("resubscribe-threw"); + }); + }, delay); + } + public async teardown(): Promise { + this._intentionalStop = true; + this.subscribeEpoch += 1; await this.stop(); + await this.#cleanupAttempt(); - // Acquire the leaderLock - const leaderLockAcquired = await this.#acquireLeaderLock(); + // Acquire the leaderLock (teardown itself is an intentional stop) + const leaderLockAcquired = await this.#acquireLeaderLock(false); if (!leaderLockAcquired) { return false; @@ -242,7 +357,13 @@ export class LogicalReplicationClient { } public async subscribe(startLsn?: string): Promise { + // An explicit subscribe is intent to run: re-arm self-heal after shutdown(). + this._intentionalStop = false; + const attemptEpoch = ++this.subscribeEpoch; + await this.stop(); + // stop() no-ops once stopped; a failed attempt can leave a client/lock behind. + await this.#cleanupAttempt(); this.lastAcknowledgedLsn = startLsn ?? this.lastAcknowledgedLsn; @@ -257,9 +378,16 @@ export class LogicalReplicationClient { // 1. Leader election const leaderLockAcquired = await this.#acquireLeaderLock(); + if (this._intentionalStop) { + await this.#cleanupAttempt(); + return this; + } + if (!leaderLockAcquired) { this.events.emit("leaderElection", false); - return this.stop(); + await this.#cleanupAttempt(); + this.#scheduleResubscribe("leader-election-failed"); + return this; } this.events.emit("leaderElection", true); @@ -278,135 +406,175 @@ export class LogicalReplicationClient { // Start auto-acknowledge interval this.#startAckInterval(); - // 2. Connect pg client - this.client = new Client({ - ...this.options.pgConfig, - // @ts-expect-error - replication: "database", - application_name: this.options.name, - }); - await this.client.connect(); - // @ts-ignore - this.connection = this.client.connection; + try { + // 2. Connect pg client + this.client = new Client({ + ...this.options.pgConfig, + // @ts-expect-error + replication: "database", + application_name: this.options.name, + }); + await this.client.connect(); + // @ts-ignore + this.connection = this.client.connection; - const publicationCreated = await this.#createPublication(); + if (this._intentionalStop) { + await this.#cleanupAttempt(); + return this; + } - if (!publicationCreated) { - return this.stop(); - } + const publicationCreated = await this.#createPublication(); - this.logger.info("Publication created", { - name: this.options.name, - table: this.options.table, - slotName: this.options.slotName, - publicationName: this.options.publicationName, - startLsn, - }); + if (!publicationCreated) { + await this.#cleanupAttempt(); + this.#scheduleResubscribe("create-publication-failed"); + return this; + } - const slotCreated = await this.#createSlot(); + this.logger.info("Publication created", { + name: this.options.name, + table: this.options.table, + slotName: this.options.slotName, + publicationName: this.options.publicationName, + startLsn, + }); - if (!slotCreated) { - return this.stop(); - } + const slotCreated = await this.#createSlot(); - this.logger.info("Slot created", { - name: this.options.name, - table: this.options.table, - slotName: this.options.slotName, - publicationName: this.options.publicationName, - startLsn, - }); + if (!slotCreated) { + await this.#cleanupAttempt(); + this.#scheduleResubscribe("create-slot-failed"); + return this; + } - // 5. Start replication (pgoutput) - const parser = new PgoutputParser(); - const sql = getPgoutputStartReplicationSQL(this.options.slotName, this.lastLsn, { - protoVersion: 1, - publicationNames: [this.options.publicationName], - messages: false, - }); + this.logger.info("Slot created", { + name: this.options.name, + table: this.options.table, + slotName: this.options.slotName, + publicationName: this.options.publicationName, + startLsn, + }); - // 6. Listen for replication events (copyData, etc.) - if (!this.connection) { - this.events.emit( - "error", - new LogicalReplicationClientError("No connection after starting replication") - ); - return this.stop(); - } + if (this._intentionalStop) { + await this.#cleanupAttempt(); + return this; + } - this.connection.once("replicationStart", () => { - this._isStopped = false; - this.events.emit("start"); - }); + // 5. Start replication (pgoutput) + const parser = new PgoutputParser(); + const sql = getPgoutputStartReplicationSQL(this.options.slotName, this.lastLsn, { + protoVersion: 1, + publicationNames: [this.options.publicationName], + messages: false, + }); + + // 6. Listen for replication events (copyData, etc.) + if (!this.connection) { + this.events.emit( + "error", + new LogicalReplicationClientError("No connection after starting replication") + ); + await this.#cleanupAttempt(); + this.#scheduleResubscribe("no-connection"); + return this; + } - this.connection.on( - "copyData", - async ({ chunk: buffer }: { length: number; chunk: Buffer; name: string }) => { - // pgoutput protocol: 0x77 = XLogData, 0x6b = Primary keepalive - if (buffer[0] !== 0x77 && buffer[0] !== 0x6b) { - this.logger.warn("Unknown replication message type", { byte: buffer[0] }); + this.connection.once("replicationStart", () => { + if (this._intentionalStop) { + // shutdown() raced the stream start — tear this attempt down. + void this.#cleanupAttempt(); return; } - const lsn = - buffer.readUInt32BE(1).toString(16).toUpperCase() + - "/" + - buffer.readUInt32BE(5).toString(16).toUpperCase(); - - if (buffer[0] === 0x77) { - // XLogData - try { - const start = process.hrtime.bigint(); - const log = parser.parse(buffer.subarray(25)); - const duration = process.hrtime.bigint() - start; - this.events.emit("data", { lsn, log, parseDuration: duration }); - await this.#acknowledge(lsn); - } catch (err) { - this.logger.error("Failed to parse XLogData", { error: err }); - this.events.emit("error", err instanceof Error ? err : new Error(String(err))); + this._isStopped = false; + this.resubscribeAttempts = 0; + this.events.emit("start"); + }); + + this.connection.on( + "copyData", + async ({ chunk: buffer }: { length: number; chunk: Buffer; name: string }) => { + // pgoutput protocol: 0x77 = XLogData, 0x6b = Primary keepalive + if (buffer[0] !== 0x77 && buffer[0] !== 0x6b) { + this.logger.warn("Unknown replication message type", { byte: buffer[0] }); + return; } - } else if (buffer[0] === 0x6b) { - // Primary keepalive message - const timestamp = Math.floor( - buffer.readUInt32BE(9) * 4294967.296 + buffer.readUInt32BE(13) / 1000 + 946080000000 - ); - const shouldRespond = !!buffer.readInt8(17); - this.events.emit("heartbeat", { lsn, timestamp, shouldRespond }); - if (shouldRespond) { - await this.#acknowledge(lsn); + const lsn = + buffer.readUInt32BE(1).toString(16).toUpperCase() + + "/" + + buffer.readUInt32BE(5).toString(16).toUpperCase(); + + if (buffer[0] === 0x77) { + // XLogData + try { + const start = process.hrtime.bigint(); + const log = parser.parse(buffer.subarray(25)); + const duration = process.hrtime.bigint() - start; + this.events.emit("data", { lsn, log, parseDuration: duration }); + await this.#acknowledge(lsn); + } catch (err) { + this.logger.error("Failed to parse XLogData", { error: err }); + this.events.emit("error", err instanceof Error ? err : new Error(String(err))); + } + } else if (buffer[0] === 0x6b) { + // Primary keepalive message + const timestamp = Math.floor( + buffer.readUInt32BE(9) * 4294967.296 + buffer.readUInt32BE(13) / 1000 + 946080000000 + ); + const shouldRespond = !!buffer.readInt8(17); + this.events.emit("heartbeat", { lsn, timestamp, shouldRespond }); + if (shouldRespond) { + await this.#acknowledge(lsn); + } } + + this.lastAcknowledgedLsn = lsn; } + ); - this.lastAcknowledgedLsn = lsn; - } - ); + // 7. Handle errors and cleanup + this.client.on("error", (err) => { + this.events.emit("error", err); + }); - // 7. Handle errors and cleanup - this.client.on("error", (err) => { - this.events.emit("error", err); - }); + this.logger.info("Started replication", { + name: this.options.name, + table: this.options.table, + slotName: this.options.slotName, + publicationName: this.options.publicationName, + startLsn, + sql: sql.replace(/\s+/g, " "), + }); - this.logger.info("Started replication", { - name: this.options.name, - table: this.options.table, - slotName: this.options.slotName, - publicationName: this.options.publicationName, - startLsn, - sql: sql.replace(/\s+/g, " "), - }); + // Start the replication stream + this.client.query(sql).catch(async (err) => { + // A newer subscribe owns the client/lock now; don't tear it down. + if (attemptEpoch !== this.subscribeEpoch) return; - // Start the replication stream - this.client.query(sql).catch((err) => { - this.logger.error("Failed to start replication", { + this.logger.error("Failed to start replication", { + name: this.options.name, + table: this.options.table, + slotName: this.options.slotName, + publicationName: this.options.publicationName, + error: err, + }); + + this.events.emit("error", err); + await this.#cleanupAttempt(); + this.#scheduleResubscribe("start-replication-failed"); + }); + } catch (error) { + this.logger.error("Subscribe failed after leader election", { name: this.options.name, table: this.options.table, slotName: this.options.slotName, publicationName: this.options.publicationName, - error: err, + error, }); - this.events.emit("error", err); - return this.stop(); - }); + await this.#cleanupAttempt(); + this.#scheduleResubscribe("subscribe-failed"); + throw error; + } return this; } @@ -689,7 +857,7 @@ export class LogicalReplicationClient { }); } - async #acquireLeaderLock(): Promise { + async #acquireLeaderLock(abortOnIntentionalStop = true): Promise { const startTime = Date.now(); const maxWaitTime = this.leaderLockTimeoutMs + this.leaderLockAcquireAdditionalTimeMs; @@ -703,6 +871,16 @@ export class LogicalReplicationClient { let attempt = 0; while (Date.now() - startTime < maxWaitTime) { + if (abortOnIntentionalStop && this._intentionalStop) { + this.logger.info("Leader lock acquisition aborted by shutdown", { + name: this.options.name, + slotName: this.options.slotName, + publicationName: this.options.publicationName, + attempt, + }); + return false; + } + try { // Key the leader lock on the SLOT, not `name`: Postgres allows one // consumer per slot, so consumers of the same slot must contend on the From 5e1341260730e0b6990b6d64fcf1148ab66bbab6 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Sat, 4 Jul 2026 15:36:00 +0100 Subject: [PATCH 4/5] fix(replication): swallow shutdown() rejection in sessions #handleTransaction The fire-and-forget shutdown() in the shutdown branch used .finally() with no .catch(), so a rejection (e.g. leader-lock release throwing) would surface as an unhandled rejection. Mirror the runs service, which swallows and logs it. --- .../services/sessionsReplicationService.server.ts | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/apps/webapp/app/services/sessionsReplicationService.server.ts b/apps/webapp/app/services/sessionsReplicationService.server.ts index 5e20a36f98e..b3bc9abf3ab 100644 --- a/apps/webapp/app/services/sessionsReplicationService.server.ts +++ b/apps/webapp/app/services/sessionsReplicationService.server.ts @@ -431,10 +431,15 @@ export class SessionsReplicationService { if (this._isShutDownComplete) return; if (this._isShuttingDown) { - this._replicationClient.shutdown().finally(() => { - this._isSubscribed = false; - this._isShutDownComplete = true; - }); + this._replicationClient + .shutdown() + .catch((error) => { + this.logger.error("Error stopping replication client during shutdown", { error }); + }) + .finally(() => { + this._isSubscribed = false; + this._isShutDownComplete = true; + }); } // If there are no events, do nothing From 4ab8c7d6af3f46de635d68d74c17683b4f7c5edf Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Sat, 4 Jul 2026 15:42:16 +0100 Subject: [PATCH 5/5] fix(replication): release lock + client if teardown() throws (try/finally) teardown() acquired the (slot-keyed) leader lock and opened a pg client, then dropped the slot and released both. A throw from connect()/#dropSlot() bypassed the release, stranding the lock (blocking the slot's next leader until TTL) and leaking the pg backend. Wrap the acquire->release body in try/finally so both are always released. --- internal-packages/replication/src/client.ts | 35 +++++++++++---------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/internal-packages/replication/src/client.ts b/internal-packages/replication/src/client.ts index 36ba0e61924..a45831dc57e 100644 --- a/internal-packages/replication/src/client.ts +++ b/internal-packages/replication/src/client.ts @@ -337,23 +337,26 @@ export class LogicalReplicationClient { return false; } - this.client = new Client({ - ...this.options.pgConfig, - // @ts-expect-error - replication: "database", - application_name: this.options.name, - }); - await this.client.connect(); - - // Drop the slot - const slotDropped = await this.#dropSlot(); - - await this.client.end(); - this.client = null; - - await this.#releaseLeaderLock(); + try { + this.client = new Client({ + ...this.options.pgConfig, + // @ts-expect-error + replication: "database", + application_name: this.options.name, + }); + await this.client.connect(); - return slotDropped; + // Drop the slot + return await this.#dropSlot(); + } finally { + // Release the client + slot-keyed lock on both success and throw, so a + // mid-teardown failure can't strand the lock (blocking the slot's leader). + if (this.client) { + await tryCatch(this.client.end()); + this.client = null; + } + await this.#releaseLeaderLock(); + } } public async subscribe(startLsn?: string): Promise {