From cef7a636113ec930d8ea76932a25b4f4f80d41de Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 24 Jun 2026 12:29:36 +0100 Subject: [PATCH 1/3] feat(run-engine,webapp): always report worker queue length metrics The runqueue.workerQueue.length gauge only observed worker queues that a dequeue had registered, so a queue's depth stopped being reported once dequeues stopped (or was never reported for a queue that backed up before anything dequeued from it). A periodic observer now refreshes the observed set from the WorkerInstanceGroup records instead, so every active worker queue (and its scheduled split variant) keeps reporting its length regardless of dequeue activity. Off by default; enable per-service via an env var. Reads from the replica and skips configured cloud providers. --- .../worker-queue-length-always-reported.md | 6 + apps/webapp/app/env.server.ts | 6 + apps/webapp/app/v3/runEngine.server.ts | 13 ++ .../run-engine/src/engine/index.ts | 97 +++++++++++- .../tests/workerQueueObservation.test.ts | 145 ++++++++++++++++++ .../run-engine/src/engine/types.ts | 25 +++ .../run-engine/src/run-queue/index.ts | 10 ++ 7 files changed, 299 insertions(+), 3 deletions(-) create mode 100644 .server-changes/worker-queue-length-always-reported.md create mode 100644 internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts diff --git a/.server-changes/worker-queue-length-always-reported.md b/.server-changes/worker-queue-length-always-reported.md new file mode 100644 index 00000000000..ccce193d9f5 --- /dev/null +++ b/.server-changes/worker-queue-length-always-reported.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Optionally report worker queue length metrics continuously (enabled per-service via the RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED env var) so a queue's depth keeps being emitted even when nothing is dequeuing from it. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index d9c97711940..efe04886222 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -810,6 +810,12 @@ const EnvironmentSchema = z RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200), RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10), RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(1000), + // Off by default. Enable on a single service (e.g. the engine worker) so only one + // instance reports worker queue length, rather than every replica. + RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED: z.string().default("0"), + RUN_ENGINE_WORKER_QUEUE_OBSERVER_INTERVAL_MS: z.coerce.number().int().default(30_000), + // Comma-separated cloud providers to exclude from worker queue length observation. + RUN_ENGINE_WORKER_QUEUE_OBSERVER_EXCLUDED_CLOUD_PROVIDERS: z.string().default("digitalocean"), RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(10_000), RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10), RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT: z.coerce.number().int().default(10), diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index e2c8ad85e94..06cb591a0b4 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -2,6 +2,7 @@ import { RunEngine } from "@internal/run-engine"; import { $replica, prisma } from "~/db.server"; import { env } from "~/env.server"; import { createBatchGlobalRateLimiter } from "~/runEngine/concerns/batchGlobalRateLimiter.server"; +import { SCHEDULED_WORKER_QUEUE_SUFFIX } from "~/runEngine/concerns/workerQueueSplit.server"; import { logger } from "~/services/logger.server"; import { defaultMachine, getCurrentPlan } from "~/services/platform.v3.server"; import { singleton } from "~/utils/singleton"; @@ -121,6 +122,18 @@ function createRunEngine() { }, tracer, meter, + workerQueueObserver: { + enabled: env.RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED === "1", + intervalMs: env.RUN_ENGINE_WORKER_QUEUE_OBSERVER_INTERVAL_MS, + // Also observe the scheduled split variant of each worker queue. The suffix + // naming convention lives in the webapp, so it is passed in here. + additionalQueueSuffixes: [SCHEDULED_WORKER_QUEUE_SUFFIX], + excludedCloudProviders: env.RUN_ENGINE_WORKER_QUEUE_OBSERVER_EXCLUDED_CLOUD_PROVIDERS.split( + "," + ) + .map((provider) => provider.trim()) + .filter(Boolean), + }, defaultMaxTtl: env.RUN_ENGINE_DEFAULT_MAX_TTL, heartbeatTimeoutsMs: { PENDING_EXECUTING: env.RUN_ENGINE_TIMEOUT_PENDING_EXECUTING, diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 2b434a86eec..b007b48ffbb 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -33,7 +33,7 @@ import { import { Worker } from "@trigger.dev/redis-worker"; import { assertNever } from "assert-never"; import { EventEmitter } from "node:events"; -import { setTimeout } from "node:timers/promises"; +import { setInterval, setTimeout } from "node:timers/promises"; import { BatchQueue } from "../batch-queue/index.js"; import type { BatchItem, @@ -99,6 +99,7 @@ export class RunEngine { private heartbeatTimeouts: HeartbeatTimeouts; private repairSnapshotTimeoutMs: number; private batchQueue: BatchQueue; + private workerQueueObserverAbortController?: AbortController; prisma: PrismaClient; readOnlyPrisma: PrismaReplicaClient; @@ -470,6 +471,90 @@ export class RunEngine { machines: this.options.machines, billingCache: this.billingCache, }); + + this.#startWorkerQueueObserver(); + } + + /** + * Refreshes the set of worker queues observed by the `runqueue.workerQueue.length` + * gauge from the WorkerInstanceGroup records, so the gauge reports each worker queue's + * length even when nothing is dequeuing from it. Includes hidden groups; excludes + * groups whose cloud provider is configured to be excluded (groups with no cloud + * provider are always included). + */ + async refreshWorkerQueueObservation() { + const suffixes = this.options.workerQueueObserver?.additionalQueueSuffixes ?? []; + const excludedCloudProviders = new Set( + (this.options.workerQueueObserver?.excludedCloudProviders ?? []).map((p) => p.toLowerCase()) + ); + + // Read from the replica: this is a periodic metrics-only read and worker groups change + // rarely, so a little replication lag is fine and keeps it off the primary. + const workerGroups = await this.readOnlyPrisma.workerInstanceGroup.findMany({ + select: { masterQueue: true, cloudProvider: true }, + }); + + const workerQueues: string[] = []; + + for (const { masterQueue, cloudProvider } of workerGroups) { + if (cloudProvider && excludedCloudProviders.has(cloudProvider.toLowerCase())) { + continue; + } + + workerQueues.push(masterQueue); + + for (const suffix of suffixes) { + workerQueues.push(`${masterQueue}${suffix}`); + } + } + + this.runQueue.setObservableWorkerQueues(workerQueues); + } + + #startWorkerQueueObserver() { + if (!this.options.workerQueueObserver?.enabled) { + return; + } + + const intervalMs = this.options.workerQueueObserver.intervalMs ?? 30_000; + this.workerQueueObserverAbortController = new AbortController(); + + this.#runWorkerQueueObserver( + intervalMs, + this.workerQueueObserverAbortController.signal + ).catch((error) => { + this.logger.error("Worker queue observer loop crashed", { + error: error instanceof Error ? error.message : String(error), + }); + }); + } + + async #runWorkerQueueObserver(intervalMs: number, signal: AbortSignal) { + const refresh = async () => { + try { + await this.refreshWorkerQueueObservation(); + } catch (error) { + this.logger.error("Failed to refresh worker queue observation", { + error: error instanceof Error ? error.message : String(error), + }); + } + }; + + // Refresh once immediately so a freshly started instance reports queue lengths + // without waiting for the first interval, then keep it fresh on an interval. + await refresh(); + + try { + for await (const _ of setInterval(intervalMs, null, { signal })) { + await refresh(); + } + } catch (error) { + if (error instanceof Error && error.name !== "AbortError") { + throw error; + } + + this.logger.debug("Worker queue observer stopped"); + } } //MARK: - Run functions @@ -1321,8 +1406,11 @@ export class RunEngine { blockingPop?: boolean; blockingPopTimeoutSeconds?: number; }): Promise { - if (!skipObserving) { - // We only do this with "prod" worker queues because we don't want to observe dev (e.g. environment) worker queues + // We only do this with "prod" worker queues because we don't want to observe dev (e.g. + // environment) worker queues. When the worker queue observer is enabled it is the source + // of truth for the observed set (and applies the cloud-provider exclusions), so the + // per-dequeue registration is skipped. + if (!skipObserving && !this.options.workerQueueObserver?.enabled) { this.runQueue.registerObservableWorkerQueue(workerQueue); } @@ -2060,6 +2148,9 @@ export class RunEngine { async quit() { try { + // stop the worker queue observer loop + this.workerQueueObserverAbortController?.abort(); + //stop the run queue await this.runQueue.quit(); await this.worker.stop(); diff --git a/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts b/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts new file mode 100644 index 00000000000..a7af72183fd --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts @@ -0,0 +1,145 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { expect } from "vitest"; +import { RunEngine } from "../index.js"; +import { setupAuthenticatedEnvironment } from "./setup.js"; +import { createTestMetricsMeter } from "./helpers/replicaTestHelpers.js"; + +vi.setConfig({ testTimeout: 60_000 }); + +const WORKER_QUEUE_LENGTH_METRIC = "runqueue.workerQueue.length"; +const WORKER_QUEUE_ATTRIBUTE = "runqueue.workerQueue"; + +describe("RunEngine worker queue observation", () => { + containerTest( + "reports worker queue length from WorkerInstanceGroup records without any dequeue", + async ({ prisma, redisOptions }) => { + const { meter, getCounterValue } = createTestMetricsMeter(); + + // Seeds a MANAGED WorkerInstanceGroup with masterQueue "default" (no cloud provider). + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // A hidden worker group should still be observed. + await prisma.workerInstanceGroup.create({ + data: { + name: "hidden-region", + masterQueue: "hidden-region", + type: "MANAGED", + hidden: true, + cloudProvider: "aws", + token: { create: { tokenHash: "hidden_region_token_hash" } }, + }, + }); + + // A DigitalOcean worker group should be excluded from observation. + await prisma.workerInstanceGroup.create({ + data: { + name: "do-region", + masterQueue: "do-region", + type: "MANAGED", + hidden: true, + cloudProvider: "digitalocean", + token: { create: { tokenHash: "do_region_token_hash" } }, + }, + }); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + meter, + workerQueueObserver: { + enabled: true, + intervalMs: 60_000, + additionalQueueSuffixes: [":scheduled"], + excludedCloudProviders: ["digitalocean"], + }, + }); + + const enqueueTo = async (workerQueue: string, count: number, prefix: string) => { + for (let i = 0; i < count; i++) { + await engine.runQueue.enqueueMessage({ + env: authenticatedEnvironment, + message: { + runId: `${prefix}_${i}`, + taskIdentifier: "task/my-task", + orgId: authenticatedEnvironment.organization.id, + projectId: authenticatedEnvironment.project.id, + environmentId: authenticatedEnvironment.id, + environmentType: "PRODUCTION", + queue: "task/my-task", + timestamp: Date.now(), + attempt: 0, + }, + workerQueue, + skipDequeueProcessing: true, + }); + } + }; + + const lengthOf = (workerQueue: string) => + getCounterValue(WORKER_QUEUE_LENGTH_METRIC, { + [WORKER_QUEUE_ATTRIBUTE]: workerQueue, + }); + + try { + // Keep the total under the environment concurrency limit (10) so every message moves + // into its worker queue list (processMasterQueueForEnvironment is concurrency-gated). + const defaultBacklog = 3; + const scheduledBacklog = 2; + const hiddenBacklog = 2; + const doBacklog = 2; + + // Build a backlog across several worker queues, then move them into the worker queue + // lists, but never dequeue. + await enqueueTo("default", defaultBacklog, "r_default"); + await enqueueTo("default:scheduled", scheduledBacklog, "r_scheduled"); + await enqueueTo("hidden-region", hiddenBacklog, "r_hidden"); + await enqueueTo("do-region", doBacklog, "r_do"); + await engine.runQueue.processMasterQueueForEnvironment( + authenticatedEnvironment.id, + defaultBacklog + scheduledBacklog + hiddenBacklog + doBacklog + ); + + // Observe the worker queues derived from the WorkerInstanceGroup records. No dequeue + // has happened, so this is the only thing that registers them for observation. + await engine.refreshWorkerQueueObservation(); + + // Reported: the default queue, its scheduled split variant, and the hidden group. + expect(await lengthOf("default")).toBe(defaultBacklog); + expect(await lengthOf("default:scheduled")).toBe(scheduledBacklog); + expect(await lengthOf("hidden-region")).toBe(hiddenBacklog); + + // Excluded: the DigitalOcean group is not observed even though it has a backlog. + expect(await lengthOf("do-region")).toBe(0); + } finally { + await engine.quit(); + } + } + ); +}); diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 0077478318b..2ec72c2938e 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -211,6 +211,31 @@ export type RunEngineOptions = { * the since snapshot is not yet on the replica, before falling back to the primary. * Set maxMs to 0 (or any value <= 0) to skip the replica retry and go straight to the primary. */ readReplicaSnapshotsSinceRetryDelay?: { minMs: number; maxMs: number }; + /** + * Periodically refreshes the set of worker queues observed by the + * `runqueue.workerQueue.length` gauge from the WorkerInstanceGroup records, so the + * gauge reports every active worker queue's length even when this instance is not + * dequeuing from them (a dequeue is otherwise the only thing that registers a worker + * queue for observation). When enabled the observer is the source of truth for the + * observed set, so the per-dequeue registration is skipped. Disabled by default; the + * server enables it. + */ + workerQueueObserver?: { + enabled?: boolean; + /** How often to refresh the observed worker queue set from the database (ms). Default: 30_000. */ + intervalMs?: number; + /** + * Extra suffix variants to also observe for each worker queue, e.g. the scheduled + * split queue suffix. The suffix value lives with the caller that owns the naming + * convention rather than in the engine. Default: []. + */ + additionalQueueSuffixes?: string[]; + /** + * Worker groups whose `cloudProvider` is in this list are not observed. Groups with + * no `cloudProvider` are always observed. Matched case-insensitively. Default: []. + */ + excludedCloudProviders?: string[]; + }; tracer: Tracer; meter?: Meter; logger?: Logger; diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index c695cacad07..9808f96f41a 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -318,6 +318,16 @@ export class RunQueue { this._observableWorkerQueues.add(workerQueue); } + /** + * Replaces the full set of worker queues observed by the `runqueue.workerQueue.length` + * gauge. Used by a periodic observer that derives the set from the current worker + * groups, so the observed set stays correct (and prunes queues that no longer exist) + * independent of dequeue activity. + */ + public setObservableWorkerQueues(workerQueues: string[]) { + this._observableWorkerQueues = new Set(workerQueues); + } + async #updateWorkerQueueLength(observableResult: ObservableResult) { for (const workerQueue of this._observableWorkerQueues) { const workerQueueLength = await this.redis.llen(this.keys.workerQueueKey(workerQueue)); From 1a33fe5b5ace6099aa73cf84eaf68a3748f74ac2 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 24 Jun 2026 12:29:36 +0100 Subject: [PATCH 2/3] chore(webapp): remove unused worker group management API endpoints The GET and POST /api/v1/workers endpoints backed a CLI command group that is no longer registered, so they had no reachable consumer. Remove them. --- .../remove-worker-create-endpoint.md | 6 ++ apps/webapp/app/routes/api.v1.workers.ts | 73 ------------------- 2 files changed, 6 insertions(+), 73 deletions(-) create mode 100644 .server-changes/remove-worker-create-endpoint.md delete mode 100644 apps/webapp/app/routes/api.v1.workers.ts diff --git a/.server-changes/remove-worker-create-endpoint.md b/.server-changes/remove-worker-create-endpoint.md new file mode 100644 index 00000000000..dd7c2041876 --- /dev/null +++ b/.server-changes/remove-worker-create-endpoint.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: breaking +--- + +Remove the unused worker group management API endpoints (GET and POST /api/v1/workers). diff --git a/apps/webapp/app/routes/api.v1.workers.ts b/apps/webapp/app/routes/api.v1.workers.ts deleted file mode 100644 index 4008d64f1a9..00000000000 --- a/apps/webapp/app/routes/api.v1.workers.ts +++ /dev/null @@ -1,73 +0,0 @@ -import { json, TypedResponse } from "@remix-run/server-runtime"; -import { - WorkersCreateRequestBody, - WorkersCreateResponseBody, - WorkersListResponseBody, -} from "@trigger.dev/core/v3"; -import { - createActionApiRoute, - createLoaderApiRoute, -} from "~/services/routeBuilders/apiBuilder.server"; -import { WorkerGroupService } from "~/v3/services/worker/workerGroupService.server"; - -export const loader = createLoaderApiRoute( - { - corsStrategy: "all", - findResource: async () => 1, // This is a dummy function, we don't need to find a resource - }, - async ({ - authentication, - }): Promise> => { - if (authentication.environment.project.engine !== "V2") { - return json({ error: "Not supported for V1 projects" }, { status: 400 }); - } - - const service = new WorkerGroupService(); - const workers = await service.listWorkerGroups({ - projectId: authentication.environment.projectId, - }); - - return json( - workers.map((w) => ({ - type: w.type, - name: w.name, - description: w.description, - isDefault: w.id === authentication.environment.project.defaultWorkerGroupId, - updatedAt: w.updatedAt, - })) - ); - } -); - -export const { action } = createActionApiRoute( - { - corsStrategy: "all", - body: WorkersCreateRequestBody, - }, - async ({ - authentication, - body, - }): Promise> => { - if (authentication.environment.project.engine !== "V2") { - return json({ error: "Not supported" }, { status: 400 }); - } - - const service = new WorkerGroupService(); - const { workerGroup, token } = await service.createWorkerGroup({ - projectId: authentication.environment.projectId, - organizationId: authentication.environment.organizationId, - name: body.name, - description: body.description, - }); - - return json({ - token: { - plaintext: token.plaintext, - }, - workerGroup: { - name: workerGroup.name, - description: workerGroup.description, - }, - }); - } -); From 7821c3a496e7f87e74faf37f3de9451c7b8dfb56 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 24 Jun 2026 13:36:22 +0100 Subject: [PATCH 3/3] test(run-engine): keep worker queue observation test lean to avoid CI timeout Disable the execution workers and batch consumers in the worker queue observation test. It only needs enqueue + processMasterQueue + the observer gauge, and the extra workers add Redis connections and make engine.quit() hang on worker shutdown when the shard's Redis is under pressure, timing the test out in CI. --- .../src/engine/tests/workerQueueObservation.test.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts b/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts index a7af72183fd..7e84db8781c 100644 --- a/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts +++ b/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts @@ -45,17 +45,22 @@ describe("RunEngine worker queue observation", () => { const engine = new RunEngine({ prisma, + // This test only exercises enqueue + processMasterQueue + the observer gauge, so keep + // the engine lean: no execution workers or batch consumers to start up and tear down. worker: { redis: redisOptions, - workers: 1, - tasksPerWorker: 10, - pollIntervalMs: 100, + disabled: true, + shutdownTimeoutMs: 2000, }, queue: { redis: redisOptions, masterQueueConsumersDisabled: true, processWorkerQueueDebounceMs: 50, }, + batchQueue: { + redis: redisOptions, + consumerEnabled: false, + }, runLock: { redis: redisOptions, },