diff --git a/frontend/src/components/actors/workflow/transform-workflow-history.ts b/frontend/src/components/actors/workflow/transform-workflow-history.ts index d6d8d78ab1..3a860031db 100644 --- a/frontend/src/components/actors/workflow/transform-workflow-history.ts +++ b/frontend/src/components/actors/workflow/transform-workflow-history.ts @@ -144,6 +144,14 @@ function transformEntryKind(kind: TransportWorkflowEntry["kind"]): EntryKind { originalName: kind.val.originalName ?? undefined, }, }; + case "WorkflowVersionCheckEntry": + return { + type: "version_check", + data: { + resolved: kind.val.resolved, + latest: kind.val.latest, + }, + }; } } diff --git a/frontend/src/components/actors/workflow/workflow-types.ts b/frontend/src/components/actors/workflow/workflow-types.ts index 70f60531fa..0005639d29 100644 --- a/frontend/src/components/actors/workflow/workflow-types.ts +++ b/frontend/src/components/actors/workflow/workflow-types.ts @@ -73,6 +73,11 @@ export interface RemovedEntry { originalName?: string; } +export interface VersionCheckEntry { + resolved: number; + latest: number; +} + export type EntryKindType = | "step" | "loop" @@ -81,7 +86,8 @@ export type EntryKindType = | "rollback_checkpoint" | "join" | "race" - | "removed"; + | "removed" + | "version_check"; export type EntryKind = | { type: "step"; data: StepEntry } @@ -91,7 +97,8 @@ export type EntryKind = | { type: "rollback_checkpoint"; data: RollbackCheckpointEntry } | { type: "join"; data: JoinEntry } | { type: "race"; data: RaceEntry } - | { type: "removed"; data: RemovedEntry }; + | { type: "removed"; data: RemovedEntry } + | { type: "version_check"; data: VersionCheckEntry }; export type EntryStatus = | "pending" diff --git a/frontend/src/components/actors/workflow/xyflow-nodes.tsx b/frontend/src/components/actors/workflow/xyflow-nodes.tsx index 981c35fa01..043e280dc0 100644 --- a/frontend/src/components/actors/workflow/xyflow-nodes.tsx +++ b/frontend/src/components/actors/workflow/xyflow-nodes.tsx @@ -89,6 +89,12 @@ export const TYPE_COLORS: Record< icon: "#71717a", iconBg: "#71717a15", }, + version_check: { + bg: "hsl(var(--card))", + border: "hsl(var(--border))", + icon: "#6366f1", + iconBg: "#6366f115", + }, input: { bg: "hsl(var(--card))", border: "hsl(var(--border))", diff --git a/rivetkit-typescript/packages/rivetkit/src/common/bare/transport/v1.ts b/rivetkit-typescript/packages/rivetkit/src/common/bare/transport/v1.ts index 3cb911ea76..6245b39309 100644 --- a/rivetkit-typescript/packages/rivetkit/src/common/bare/transport/v1.ts +++ b/rivetkit-typescript/packages/rivetkit/src/common/bare/transport/v1.ts @@ -521,6 +521,28 @@ export function writeWorkflowRemovedEntry( write1(bc, x.originalName); } +export type WorkflowVersionCheckEntry = { + readonly resolved: u32; + readonly latest: u32; +}; + +export function readWorkflowVersionCheckEntry( + bc: bare.ByteCursor, +): WorkflowVersionCheckEntry { + return { + resolved: bare.readU32(bc), + latest: bare.readU32(bc), + }; +} + +export function writeWorkflowVersionCheckEntry( + bc: bare.ByteCursor, + x: WorkflowVersionCheckEntry, +): void { + bare.writeU32(bc, x.resolved); + bare.writeU32(bc, x.latest); +} + export type WorkflowEntryKind = | { readonly tag: "WorkflowStepEntry"; readonly val: WorkflowStepEntry } | { readonly tag: "WorkflowLoopEntry"; readonly val: WorkflowLoopEntry } @@ -538,6 +560,10 @@ export type WorkflowEntryKind = | { readonly tag: "WorkflowRemovedEntry"; readonly val: WorkflowRemovedEntry; + } + | { + readonly tag: "WorkflowVersionCheckEntry"; + readonly val: WorkflowVersionCheckEntry; }; export function readWorkflowEntryKind(bc: bare.ByteCursor): WorkflowEntryKind { @@ -572,6 +598,11 @@ export function readWorkflowEntryKind(bc: bare.ByteCursor): WorkflowEntryKind { tag: "WorkflowRemovedEntry", val: readWorkflowRemovedEntry(bc), }; + case 8: + return { + tag: "WorkflowVersionCheckEntry", + val: readWorkflowVersionCheckEntry(bc), + }; default: { bc.offset = offset; throw new bare.BareError(offset, "invalid tag"); @@ -624,6 +655,11 @@ export function writeWorkflowEntryKind( writeWorkflowRemovedEntry(bc, x.val); break; } + case "WorkflowVersionCheckEntry": { + bare.writeU8(bc, 8); + writeWorkflowVersionCheckEntry(bc, x.val); + break; + } } } diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts index 3c94653463..6c93ed5d67 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts @@ -907,6 +907,10 @@ function serializeWorkflowEntryKind( | { tag: "WorkflowRemovedEntry"; val: { originalType: string; originalName: string | null }; + } + | { + tag: "WorkflowVersionCheckEntry"; + val: { resolved: number; latest: number }; } { switch (kind.tag) { case "WorkflowStepEntry": @@ -972,6 +976,14 @@ function serializeWorkflowEntryKind( originalName: kind.val.originalName, }, }; + case "WorkflowVersionCheckEntry": + return { + tag: kind.tag, + val: { + resolved: kind.val.resolved, + latest: kind.val.latest, + }, + }; } } diff --git a/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts b/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts index 34d867af08..69145be571 100644 --- a/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts +++ b/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts @@ -960,6 +960,10 @@ export class WorkflowContext< await this.#wrapActive(() => this.#inner.removed(name, originalType)); } + getVersion(name: string, latest: number): Promise { + return this.#wrapActive(() => this.#inner.getVersion(name, latest)); + } + isEvicted(): boolean { return this.#inner.isEvicted(); } diff --git a/rivetkit-typescript/packages/rivetkit/src/workflow/inspector.ts b/rivetkit-typescript/packages/rivetkit/src/workflow/inspector.ts index c4617e4d25..155a65154f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/workflow/inspector.ts +++ b/rivetkit-typescript/packages/rivetkit/src/workflow/inspector.ts @@ -184,6 +184,14 @@ function toWorkflowEntryKind(kind: EntryKind): transport.WorkflowEntryKind { originalName: kind.data.originalName ?? null, }, }; + case "version_check": + return { + tag: "WorkflowVersionCheckEntry", + val: { + resolved: kind.data.resolved, + latest: kind.data.latest, + }, + }; default: assertUnreachable(kind as never); } diff --git a/rivetkit-typescript/packages/rivetkit/tests/inspector-versioned.test.ts b/rivetkit-typescript/packages/rivetkit/tests/inspector-versioned.test.ts index 827e8b4649..d9189d537d 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/inspector-versioned.test.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/inspector-versioned.test.ts @@ -188,6 +188,17 @@ describe("inspector workflow transport", () => { }, }, }, + { + id: "entry-2", + location: [{ tag: "WorkflowNameIndex", val: 1 }], + kind: { + tag: "WorkflowVersionCheckEntry", + val: { + resolved: 1, + latest: 2, + }, + }, + }, ], entryMetadata: new Map([ [ diff --git a/rivetkit-typescript/packages/workflow-engine/schemas/serde.ts b/rivetkit-typescript/packages/workflow-engine/schemas/serde.ts index 0a2b92bfb6..3566d9bcb8 100644 --- a/rivetkit-typescript/packages/workflow-engine/schemas/serde.ts +++ b/rivetkit-typescript/packages/workflow-engine/schemas/serde.ts @@ -307,6 +307,14 @@ function entryKindToBare(kind: InternalEntryKind): v1.EntryKind { originalName: kind.data.originalName ?? null, }, }; + case "version_check": + return { + tag: "VersionCheckEntry", + val: { + resolved: kind.data.resolved, + latest: kind.data.latest, + }, + }; } } @@ -396,6 +404,14 @@ function entryKindFromBare(kind: v1.EntryKind): InternalEntryKind { originalName: kind.val.originalName ?? undefined, }, }; + case "VersionCheckEntry": + return { + type: "version_check", + data: { + resolved: kind.val.resolved, + latest: kind.val.latest, + }, + }; default: throw new Error( `Unknown entry kind: ${(kind as { tag: string }).tag}`, diff --git a/rivetkit-typescript/packages/workflow-engine/schemas/v1.bare b/rivetkit-typescript/packages/workflow-engine/schemas/v1.bare index 037fea95a9..d319437c8a 100644 --- a/rivetkit-typescript/packages/workflow-engine/schemas/v1.bare +++ b/rivetkit-typescript/packages/workflow-engine/schemas/v1.bare @@ -122,6 +122,14 @@ type RemovedEntry struct { originalName: optional } +# MARK: Version Check Entry +type VersionCheckEntry struct { + # The version this instance resolved to at this location + resolved: u32 + # The `latest` value seen when first resolved (diagnostics) + latest: u32 +} + # MARK: Entry Kind # Type-specific entry data type EntryKind union { @@ -132,7 +140,8 @@ type EntryKind union { RollbackCheckpointEntry | JoinEntry | RaceEntry | - RemovedEntry + RemovedEntry | + VersionCheckEntry } # MARK: Entry diff --git a/rivetkit-typescript/packages/workflow-engine/schemas/versioned.ts b/rivetkit-typescript/packages/workflow-engine/schemas/versioned.ts index 35907eff47..f1698ad98d 100644 --- a/rivetkit-typescript/packages/workflow-engine/schemas/versioned.ts +++ b/rivetkit-typescript/packages/workflow-engine/schemas/versioned.ts @@ -19,6 +19,7 @@ export type { RemovedEntry, SleepEntry, StepEntry, + VersionCheckEntry, WorkflowMetadata, } from "../dist/schemas/v1.js"; diff --git a/rivetkit-typescript/packages/workflow-engine/src/context.ts b/rivetkit-typescript/packages/workflow-engine/src/context.ts index 96cee97213..c4ee278bf7 100644 --- a/rivetkit-typescript/packages/workflow-engine/src/context.ts +++ b/rivetkit-typescript/packages/workflow-engine/src/context.ts @@ -2581,4 +2581,97 @@ export class WorkflowContextImpl implements WorkflowContextInterface { setEntry(this.storage, location, entry); await this.flushStorage(); } + + // === Version === + + async getVersion(name: string, latest: number): Promise { + this.assertNotInProgress(); + this.checkEvicted(); + + this.entryInProgress = true; + try { + return await this.executeGetVersion(name, latest); + } finally { + this.entryInProgress = false; + } + } + + private async executeGetVersion( + name: string, + latest: number, + ): Promise { + if (!Number.isInteger(latest) || latest < 1) { + throw new Error( + `getVersion("${name}", ${latest}): latest must be an integer >= 1`, + ); + } + + // Check for duplicate name in current execution + this.checkDuplicateName(name); + + const location = appendName(this.storage, this.currentLocation, name); + const key = locationToKey(this.storage, location); + const existing = this.storage.history.entries.get(key); + + // Mark this entry as visited for validateComplete + this.markVisited(key); + + this.stopRollbackIfMissing(existing); + + if (existing) { + if (existing.kind.type !== "version_check") { + throw new HistoryDivergedError( + `Expected version_check at ${key}, found ${existing.kind.type}`, + ); + } + // Pure replay: this instance is already pinned at this location. + return existing.kind.data.resolved; + } + + // No recorded version at this location. Decide whether this instance + // already executed past this point under older code (old in-flight) or + // is reaching it fresh at the live frontier. + // + // The discriminator is "is there any unvisited history entry under the + // current scope?". Entries created earlier in this same run are already + // marked visited, so the only unvisited entries under the scope are + // leftovers from a prior run, which proves this scope already executed + // under code that predates this gate. Such instances resolve to the + // implicit floor version 1 (old branch); fresh instances resolve to + // `latest`. + const resolved = this.hasUnvisitedUnderCurrentScope(key) ? 1 : latest; + + const entry = createEntry(location, { + type: "version_check", + data: { resolved, latest }, + }); + setEntry(this.storage, location, entry); + await this.flushStorage(); + + return resolved; + } + + /** + * Returns true if any history entry under the current location scope + * (other than `excludeKey`) has not yet been visited this run. Used by + * getVersion to detect an old in-flight instance whose scope was already + * executed by code that predates a version gate. + */ + private hasUnvisitedUnderCurrentScope(excludeKey: string): boolean { + const prefix = locationToKey(this.storage, this.currentLocation); + + for (const key of this.storage.history.entries.keys()) { + if (key === excludeKey) { + continue; + } + const isUnderPrefix = + prefix === "" + ? true + : key.startsWith(`${prefix}/`) || key === prefix; + if (isUnderPrefix && !this.visitedKeys.has(key)) { + return true; + } + } + return false; + } } diff --git a/rivetkit-typescript/packages/workflow-engine/src/types.ts b/rivetkit-typescript/packages/workflow-engine/src/types.ts index 6d2d3ed321..af5ca70cb6 100644 --- a/rivetkit-typescript/packages/workflow-engine/src/types.ts +++ b/rivetkit-typescript/packages/workflow-engine/src/types.ts @@ -135,6 +135,17 @@ export interface RemovedEntry { originalName?: string; } +/** + * Version check entry data - records which version of a code path this + * workflow instance is pinned to at a given location. + */ +export interface VersionCheckEntry { + /** The version this instance resolved to at this location. */ + resolved: number; + /** The `latest` value seen when this entry was first resolved (diagnostics). */ + latest: number; +} + /** * All possible entry kind types. */ @@ -146,7 +157,8 @@ export type EntryKindType = | "rollback_checkpoint" | "join" | "race" - | "removed"; + | "removed" + | "version_check"; /** * Type-specific entry data. @@ -159,7 +171,8 @@ export type EntryKind = | { type: "rollback_checkpoint"; data: RollbackCheckpointEntry } | { type: "join"; data: JoinEntry } | { type: "race"; data: RaceEntry } - | { type: "removed"; data: RemovedEntry }; + | { type: "removed"; data: RemovedEntry } + | { type: "version_check"; data: VersionCheckEntry }; /** * An entry in the workflow history. @@ -539,6 +552,8 @@ export interface WorkflowContextInterface { removed(name: string, originalType: EntryKindType): Promise; + getVersion(name: string, latest: number): Promise; + isEvicted(): boolean; } diff --git a/rivetkit-typescript/packages/workflow-engine/tests/version.test.ts b/rivetkit-typescript/packages/workflow-engine/tests/version.test.ts new file mode 100644 index 0000000000..7350565dd0 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/tests/version.test.ts @@ -0,0 +1,247 @@ +import { beforeEach, describe, expect, it } from "vitest"; +import { + HistoryDivergedError, + InMemoryDriver, + Loop, + runWorkflow, + type WorkflowContextInterface, +} from "../src/testing.js"; + +const modes = ["yield", "live"] as const; + +for (const mode of modes) { + describe( + `Workflow Engine getVersion (${mode})`, + { sequential: true }, + () => { + let driver: InMemoryDriver; + + beforeEach(() => { + driver = new InMemoryDriver(); + driver.latency = 0; + }); + + it("resolves a fresh instance to latest and pins it on replay", async () => { + const workflow = async (ctx: WorkflowContextInterface) => { + const v = await ctx.getVersion("gate", 2); + await ctx.step("record", async () => v); + return v; + }; + + const first = await runWorkflow( + "wf-fresh", + workflow, + undefined, + driver, + { mode }, + ).result; + expect(first.state).toBe("completed"); + expect(first.output).toBe(2); + + // Re-running replays the recorded version_check and returns the + // same pinned value. + const second = await runWorkflow( + "wf-fresh", + workflow, + undefined, + driver, + { mode }, + ).result; + expect(second.state).toBe("completed"); + expect(second.output).toBe(2); + }); + + it("resolves an old in-flight instance to floor 1 and replays the old step's value", async () => { + // v1 code: records a step, no version gate. + const v1 = async (ctx: WorkflowContextInterface) => { + await ctx.step("work", async () => "did-work"); + return "v1-done"; + }; + await runWorkflow("wf-old", v1, undefined, driver, { mode }) + .result; + + // v2 code: adds a gate before the step. The old instance already + // ran `work`, so the gate must resolve to 1 (old branch) and the + // replayed step must return the originally recorded value. + const v2 = async (ctx: WorkflowContextInterface) => { + const v = await ctx.getVersion("gate", 2); + if (v === 1) { + const w = await ctx.step( + "work", + async () => "should-not-run", + ); + return `old:${w}`; + } + return "new"; + }; + const result = await runWorkflow( + "wf-old", + v2, + undefined, + driver, + { mode }, + ).result; + expect(result.state).toBe("completed"); + expect(result.output).toBe("old:did-work"); + }); + + it("resolves to latest when the gate is not the first call in its scope (fresh run)", async () => { + const workflow = async (ctx: WorkflowContextInterface) => { + await ctx.step("a", async () => "a"); + return await ctx.getVersion("gate", 3); + }; + const result = await runWorkflow( + "wf-notfirst", + workflow, + undefined, + driver, + { mode }, + ).result; + expect(result.state).toBe("completed"); + expect(result.output).toBe(3); + }); + + it("can be retired with removed() once a migration is finished", async () => { + const withGate = async (ctx: WorkflowContextInterface) => { + const v = await ctx.getVersion("gate", 2); + return `v${v}`; + }; + const gated = await runWorkflow( + "wf-retire", + withGate, + undefined, + driver, + { mode }, + ).result; + expect(gated.output).toBe("v2"); + + const removeGate = async (ctx: WorkflowContextInterface) => { + await ctx.removed("gate", "version_check"); + return "cleaned"; + }; + const result = await runWorkflow( + "wf-retire", + removeGate, + undefined, + driver, + { mode }, + ).result; + expect(result.state).toBe("completed"); + expect(result.output).toBe("cleaned"); + }); + + it("throws HistoryDiverged when a non-version entry occupies the gate location", async () => { + const asStep = async (ctx: WorkflowContextInterface) => { + await ctx.step("gate", async () => "x"); + return "step"; + }; + await runWorkflow("wf-diverge", asStep, undefined, driver, { + mode, + }).result; + + const asVersion = async (ctx: WorkflowContextInterface) => { + await ctx.getVersion("gate", 2); + return "version"; + }; + await expect( + runWorkflow("wf-diverge", asVersion, undefined, driver, { + mode, + }).result, + ).rejects.toThrow(HistoryDivergedError); + }); + }, + ); +} + +describe("Workflow Engine getVersion per-iteration cutover", { sequential: true }, () => { + it("resolves each loop iteration independently across a redeploy", async () => { + const driver = new InMemoryDriver(); + driver.latency = 0; + const mode = "yield" as const; + + // v1 loop: records a `pre` step before waiting for a message, so an + // iteration can be left suspended mid-body with history already present. + const v1 = async (ctx: WorkflowContextInterface) => { + return await ctx.loop({ + name: "consume", + state: { i: 0, processed: 0 }, + run: async (lctx, state) => { + await lctx.step("pre", async () => `pre-${state.i}`); + const msg = await lctx.queue.next("in", { + names: ["work"], + }); + await lctx.step("post", async () => `post:${msg.body}`); + const processed = state.processed + 1; + if (processed >= 3) { + return Loop.break(processed); + } + return Loop.continue({ i: state.i + 1, processed }); + }, + }); + }; + + // First message is processed in iteration 0; iteration 1 records `pre` + // then suspends at queue.next (in-flight under v1). + await driver.messageDriver.addMessage({ + id: "m1", + name: "work", + data: "one", + sentAt: Date.now(), + }); + const r1 = await runWorkflow("wf-loop", v1, undefined, driver, { mode }) + .result; + expect(r1.state).toBe("sleeping"); + + // Redeploy v2: adds a gate at the top of the loop body. + const seenByIter = new Map(); + const v2 = async (ctx: WorkflowContextInterface) => { + return await ctx.loop({ + name: "consume", + state: { i: 0, processed: 0 }, + run: async (lctx, state) => { + const v = await lctx.getVersion("gate", 2); + seenByIter.set(state.i, v); + await lctx.step("pre", async () => `pre-${state.i}`); + const msg = await lctx.queue.next("in", { + names: ["work"], + }); + await lctx.step("post", async () => `post:${msg.body}`); + const processed = state.processed + 1; + if (processed >= 3) { + return Loop.break(processed); + } + return Loop.continue({ i: state.i + 1, processed }); + }, + }); + }; + + // Second message resumes the in-flight iteration 1 (whose `pre` step + // predates the gate) and lets iteration 2 begin fresh. + await driver.messageDriver.addMessage({ + id: "m2", + name: "work", + data: "two", + sentAt: Date.now(), + }); + const r2 = await runWorkflow("wf-loop", v2, undefined, driver, { mode }) + .result; + expect(r2.state).toBe("sleeping"); + + // Third message completes iteration 2. + await driver.messageDriver.addMessage({ + id: "m3", + name: "work", + data: "three", + sentAt: Date.now(), + }); + const r3 = await runWorkflow("wf-loop", v2, undefined, driver, { mode }) + .result; + expect(r3.state).toBe("completed"); + + // Iteration 1 was in-flight under v1 (its `pre` step predates the gate), + // so it resolves to floor version 1. Iteration 2 is fresh, so it + // resolves to latest (2). + expect(seenByIter.get(1)).toBe(1); + expect(seenByIter.get(2)).toBe(2); + }); +});