Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/fix-replication-leader-lock-per-slot.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Key the logical-replication leader lock on the slot name (not the client name) so consumers of the same replication slot serialize correctly across restarts and rolling deploys
21 changes: 12 additions & 9 deletions apps/webapp/app/routes/admin.api.v1.runs-replication.status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import { getRunsReplicationConfiguredSources } from "~/services/runsReplicationG
/**
* Probes per-source replication leadership via the redlock leader-lock key, which
* is DOUBLE-PREFIXED with `logical-replication-client:` — once from the connection's
* keyPrefix and once from redlock's resource string. So we prefix this connection
* with `runs-replication:logical-replication-client:` and EXISTS on the resource
* `logical-replication-client:runs-replication:<id>`, resolving to:
* runs-replication:logical-replication-client:logical-replication-client:runs-replication:<id>
* keyPrefix and once from redlock's resource string. The lock is keyed on the
* replication slot, so we prefix this connection with
* `runs-replication:logical-replication-client:` and EXISTS on the resource
* `logical-replication-client:<slotName>`, resolving to:
* runs-replication:logical-replication-client:logical-replication-client:<slotName>
*/
async function probeLeadership(sourceIds: string[]): Promise<Map<string, boolean>> {
async function probeLeadership(
sources: { id: string; slotName: string }[]
): Promise<Map<string, boolean>> {
const leaders = new Map<string, boolean>();

const redis = new Redis({
Expand All @@ -26,9 +29,9 @@ async function probeLeadership(sourceIds: string[]): Promise<Map<string, boolean
});

try {
for (const id of sourceIds) {
const exists = await redis.exists(`logical-replication-client:runs-replication:${id}`);
leaders.set(id, exists === 1);
for (const source of sources) {
const exists = await redis.exists(`logical-replication-client:${source.slotName}`);
leaders.set(source.id, exists === 1);
}
} finally {
await redis.quit();
Expand All @@ -46,7 +49,7 @@ export async function loader({ request }: LoaderFunctionArgs) {
return json({ enabled: false, sources: [] });
}

const leaders = await probeLeadership(sources.map((s) => s.id));
const leaders = await probeLeadership(sources);

return json({
enabled: env.RUN_REPLICATION_ENABLED === "1" && sources.length > 0,
Expand Down
9 changes: 6 additions & 3 deletions apps/webapp/app/services/runsReplicationService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ export class RunsReplicationService {
table: "TaskRun",
redisOptions: options.redisOptions,
autoAcknowledge: false,
resubscribeOnFailure: true,
publicationActions: ["insert", "update", "delete"],
logger:
options.logger ?? new Logger("LogicalReplicationClient", options.logLevel ?? "info"),
Expand Down Expand Up @@ -428,7 +429,9 @@ export class RunsReplicationService {

if (!hasCurrentTransaction) {
this.logger.info("No transaction to commit, shutting down immediately");
await Promise.all(Array.from(this._sources.values()).map((runtime) => runtime.client.stop()));
await Promise.all(
Array.from(this._sources.values()).map((runtime) => runtime.client.shutdown())
);
this._isShutDownComplete = true;
return;
}
Expand Down Expand Up @@ -458,7 +461,7 @@ export class RunsReplicationService {
for (const runtime of this._sources.values()) {
this.logger.info("Stopping replication client", { sourceId: runtime.source.id });

await runtime.client.stop();
await runtime.client.shutdown();

if (runtime.acknowledgeInterval) {
clearInterval(runtime.acknowledgeInterval);
Expand Down Expand Up @@ -636,7 +639,7 @@ export class RunsReplicationService {
// swallow client.shutdown() rejections so they don't surface as unhandled.
if (!this._shutdownStopInFlight) {
this._shutdownStopInFlight = true;
Promise.all(Array.from(this._sources.values()).map((r) => r.client.stop()))
Promise.all(Array.from(this._sources.values()).map((r) => r.client.shutdown()))
.catch((error) => {
this.logger.error("Error stopping replication clients during shutdown", { error });
})
Expand Down
18 changes: 12 additions & 6 deletions apps/webapp/app/services/sessionsReplicationService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ export class SessionsReplicationService {
table: "Session",
redisOptions: options.redisOptions,
autoAcknowledge: false,
resubscribeOnFailure: true,
publicationActions: ["insert", "update", "delete"],
logger: options.logger ?? new Logger("LogicalReplicationClient", options.logLevel ?? "info"),
leaderLockTimeoutMs: options.leaderLockTimeoutMs ?? 30_000,
Expand Down Expand Up @@ -265,7 +266,7 @@ export class SessionsReplicationService {

if (!this._currentTransaction) {
this.logger.info("No transaction to commit, shutting down immediately");
await this._replicationClient.stop();
await this._replicationClient.shutdown();
this._isSubscribed = false;
this._isShutDownComplete = true;
return;
Expand Down Expand Up @@ -294,7 +295,7 @@ export class SessionsReplicationService {
async stop() {
this.logger.info("Stopping replication client");

await this._replicationClient.stop();
await this._replicationClient.shutdown();

if (this._acknowledgeInterval) {
clearInterval(this._acknowledgeInterval);
Expand Down Expand Up @@ -430,10 +431,15 @@ export class SessionsReplicationService {
if (this._isShutDownComplete) return;

if (this._isShuttingDown) {
this._replicationClient.stop().finally(() => {
this._isSubscribed = false;
this._isShutDownComplete = true;
});
this._replicationClient
.shutdown()
.catch((error) => {
this.logger.error("Error stopping replication client during shutdown", { error });
})
.finally(() => {
this._isSubscribed = false;
this._isShutDownComplete = true;
});
}

// If there are no events, do nothing
Expand Down
6 changes: 4 additions & 2 deletions apps/webapp/test/runsReplicationInstance.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
...baseArgs,
splitEnabled: true,
// Simulates env.RUN_OPS_DATABASE_URL ?? env.TASK_RUN_DATABASE_URL with RUN_OPS unset
newUrl: undefined ?? taskRunUrl,

Check warning on line 152 in apps/webapp/test/runsReplicationInstance.test.ts

View workflow job for this annotation

GitHub Actions / code-quality / code-quality

eslint(no-constant-binary-expression)

Unexpected constant nullishness on the left-hand side of a "??" expression
});

expect(sources).toHaveLength(2);
Expand Down Expand Up @@ -408,10 +408,12 @@

probe = new Redis(redisOptions);

// Leader lock is keyed on the slot, so each source holds a distinct
// slot-keyed lock (double-prefixed: connection keyPrefix + redlock resource).
const legacyKey =
"runs-replication:logical-replication-client:logical-replication-client:runs-replication:legacy";
"runs-replication:logical-replication-client:logical-replication-client:tr_legacy_wiring";
const newKey =
"runs-replication:logical-replication-client:logical-replication-client:runs-replication:new";
"runs-replication:logical-replication-client:logical-replication-client:tr_new_wiring";

expect(await probe.exists(legacyKey)).toBe(1);
expect(await probe.exists(newKey)).toBe(1);
Expand Down
Loading