Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions frontend/src/components/actors/workflow/workflow-types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions frontend/src/components/actors/workflow/xyflow-nodes.tsx

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,28 @@ export function writeWorkflowRemovedEntry(
write1(bc, x.originalName);
}

export type WorkflowVersionCheckEntry = {
readonly resolved: u32;
readonly latest: u32;
};

export function readWorkflowVersionCheckEntry(
bc: bare.ByteCursor,
): WorkflowVersionCheckEntry {
return {
resolved: bare.readU32(bc),
latest: bare.readU32(bc),
};
}

export function writeWorkflowVersionCheckEntry(
bc: bare.ByteCursor,
x: WorkflowVersionCheckEntry,
): void {
bare.writeU32(bc, x.resolved);
bare.writeU32(bc, x.latest);
}

export type WorkflowEntryKind =
| { readonly tag: "WorkflowStepEntry"; readonly val: WorkflowStepEntry }
| { readonly tag: "WorkflowLoopEntry"; readonly val: WorkflowLoopEntry }
Expand All @@ -538,6 +560,10 @@ export type WorkflowEntryKind =
| {
readonly tag: "WorkflowRemovedEntry";
readonly val: WorkflowRemovedEntry;
}
| {
readonly tag: "WorkflowVersionCheckEntry";
readonly val: WorkflowVersionCheckEntry;
};

export function readWorkflowEntryKind(bc: bare.ByteCursor): WorkflowEntryKind {
Expand Down Expand Up @@ -572,6 +598,11 @@ export function readWorkflowEntryKind(bc: bare.ByteCursor): WorkflowEntryKind {
tag: "WorkflowRemovedEntry",
val: readWorkflowRemovedEntry(bc),
};
case 8:
return {
tag: "WorkflowVersionCheckEntry",
val: readWorkflowVersionCheckEntry(bc),
};
default: {
bc.offset = offset;
throw new bare.BareError(offset, "invalid tag");
Expand Down Expand Up @@ -624,6 +655,11 @@ export function writeWorkflowEntryKind(
writeWorkflowRemovedEntry(bc, x.val);
break;
}
case "WorkflowVersionCheckEntry": {
bare.writeU8(bc, 8);
writeWorkflowVersionCheckEntry(bc, x.val);
break;
}
}
}

Expand Down
12 changes: 12 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/registry/native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,10 @@ function serializeWorkflowEntryKind(
| {
tag: "WorkflowRemovedEntry";
val: { originalType: string; originalName: string | null };
}
| {
tag: "WorkflowVersionCheckEntry";
val: { resolved: number; latest: number };
} {
switch (kind.tag) {
case "WorkflowStepEntry":
Expand Down Expand Up @@ -972,6 +976,14 @@ function serializeWorkflowEntryKind(
originalName: kind.val.originalName,
},
};
case "WorkflowVersionCheckEntry":
return {
tag: kind.tag,
val: {
resolved: kind.val.resolved,
latest: kind.val.latest,
},
};
}
}

Expand Down
4 changes: 4 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/workflow/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,10 @@ export class WorkflowContext<
await this.#wrapActive(() => this.#inner.removed(name, originalType));
}

getVersion(name: string, latest: number): Promise<number> {
return this.#wrapActive(() => this.#inner.getVersion(name, latest));
}

isEvicted(): boolean {
return this.#inner.isEvicted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,14 @@ function toWorkflowEntryKind(kind: EntryKind): transport.WorkflowEntryKind {
originalName: kind.data.originalName ?? null,
},
};
case "version_check":
return {
tag: "WorkflowVersionCheckEntry",
val: {
resolved: kind.data.resolved,
latest: kind.data.latest,
},
};
default:
assertUnreachable(kind as never);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,17 @@ describe("inspector workflow transport", () => {
},
},
},
{
id: "entry-2",
location: [{ tag: "WorkflowNameIndex", val: 1 }],
kind: {
tag: "WorkflowVersionCheckEntry",
val: {
resolved: 1,
latest: 2,
},
},
},
],
entryMetadata: new Map([
[
Expand Down
16 changes: 16 additions & 0 deletions rivetkit-typescript/packages/workflow-engine/schemas/serde.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,14 @@ function entryKindToBare(kind: InternalEntryKind): v1.EntryKind {
originalName: kind.data.originalName ?? null,
},
};
case "version_check":
return {
tag: "VersionCheckEntry",
val: {
resolved: kind.data.resolved,
latest: kind.data.latest,
},
};
}
}

Expand Down Expand Up @@ -396,6 +404,14 @@ function entryKindFromBare(kind: v1.EntryKind): InternalEntryKind {
originalName: kind.val.originalName ?? undefined,
},
};
case "VersionCheckEntry":
return {
type: "version_check",
data: {
resolved: kind.val.resolved,
latest: kind.val.latest,
},
};
default:
throw new Error(
`Unknown entry kind: ${(kind as { tag: string }).tag}`,
Expand Down
11 changes: 10 additions & 1 deletion rivetkit-typescript/packages/workflow-engine/schemas/v1.bare
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ type RemovedEntry struct {
originalName: optional<str>
}

# MARK: Version Check Entry
type VersionCheckEntry struct {
# The version this instance resolved to at this location
resolved: u32
# The `latest` value seen when first resolved (diagnostics)
latest: u32
}

# MARK: Entry Kind
# Type-specific entry data
type EntryKind union {
Expand All @@ -132,7 +140,8 @@ type EntryKind union {
RollbackCheckpointEntry |
JoinEntry |
RaceEntry |
RemovedEntry
RemovedEntry |
VersionCheckEntry
}

# MARK: Entry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export type {
RemovedEntry,
SleepEntry,
StepEntry,
VersionCheckEntry,
WorkflowMetadata,
} from "../dist/schemas/v1.js";

Expand Down
93 changes: 93 additions & 0 deletions rivetkit-typescript/packages/workflow-engine/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2581,4 +2581,97 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
setEntry(this.storage, location, entry);
await this.flushStorage();
}

// === Version ===

async getVersion(name: string, latest: number): Promise<number> {
this.assertNotInProgress();
this.checkEvicted();

this.entryInProgress = true;
try {
return await this.executeGetVersion(name, latest);
} finally {
this.entryInProgress = false;
}
}

private async executeGetVersion(
name: string,
latest: number,
): Promise<number> {
if (!Number.isInteger(latest) || latest < 1) {
throw new Error(
`getVersion("${name}", ${latest}): latest must be an integer >= 1`,
);
}

// Check for duplicate name in current execution
this.checkDuplicateName(name);

const location = appendName(this.storage, this.currentLocation, name);
const key = locationToKey(this.storage, location);
const existing = this.storage.history.entries.get(key);

// Mark this entry as visited for validateComplete
this.markVisited(key);

this.stopRollbackIfMissing(existing);

if (existing) {
if (existing.kind.type !== "version_check") {
throw new HistoryDivergedError(
`Expected version_check at ${key}, found ${existing.kind.type}`,
);
}
// Pure replay: this instance is already pinned at this location.
return existing.kind.data.resolved;
}

// No recorded version at this location. Decide whether this instance
// already executed past this point under older code (old in-flight) or
// is reaching it fresh at the live frontier.
//
// The discriminator is "is there any unvisited history entry under the
// current scope?". Entries created earlier in this same run are already
// marked visited, so the only unvisited entries under the scope are
// leftovers from a prior run, which proves this scope already executed
// under code that predates this gate. Such instances resolve to the
// implicit floor version 1 (old branch); fresh instances resolve to
// `latest`.
const resolved = this.hasUnvisitedUnderCurrentScope(key) ? 1 : latest;

const entry = createEntry(location, {
type: "version_check",
data: { resolved, latest },
});
setEntry(this.storage, location, entry);
await this.flushStorage();

return resolved;
}

/**
* Returns true if any history entry under the current location scope
* (other than `excludeKey`) has not yet been visited this run. Used by
* getVersion to detect an old in-flight instance whose scope was already
* executed by code that predates a version gate.
*/
private hasUnvisitedUnderCurrentScope(excludeKey: string): boolean {
const prefix = locationToKey(this.storage, this.currentLocation);

for (const key of this.storage.history.entries.keys()) {
if (key === excludeKey) {
continue;
}
const isUnderPrefix =
prefix === ""
? true
: key.startsWith(`${prefix}/`) || key === prefix;
if (isUnderPrefix && !this.visitedKeys.has(key)) {
return true;
}
}
return false;
}
}
Loading
Loading