From 29085fa05e28a0c460e4728f8fb4fc4dd7871b05 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Fri, 29 May 2026 09:01:56 +0300 Subject: [PATCH 1/4] add Redis queue tutorial --- docs.json | 1 + redis/tutorials/redis_queue.mdx | 289 ++++++++++++++++++++++++++++++++ 2 files changed, 290 insertions(+) create mode 100644 redis/tutorials/redis_queue.mdx diff --git a/docs.json b/docs.json index f9348742..c9801440 100644 --- a/docs.json +++ b/docs.json @@ -879,6 +879,7 @@ "redis/tutorials/goapi", "redis/tutorials/histogram", "redis/tutorials/job_processing", + "redis/tutorials/redis_queue", "redis/tutorials/nextjs_with_redis", "redis/tutorials/notification", "redis/tutorials/nuxtjs_with_redis", diff --git a/redis/tutorials/redis_queue.mdx b/redis/tutorials/redis_queue.mdx new file mode 100644 index 00000000..bb45e0a0 --- /dev/null +++ b/redis/tutorials/redis_queue.mdx @@ -0,0 +1,289 @@ +--- +title: "Redis Queue: From FIFO Lists to a Job Queue" +description: "How to build FIFO, reliable, and blocking queues on Upstash Redis, then a delayed and prioritized job queue with a dead-letter queue." +--- + +## What is a queue? + +A queue holds items in order: the first item added is the first one taken out +(FIFO, or First In First Out), like a line of people waiting for service. In +software, a queue lets one part of your system hand work to another so the producer +doesn't have to wait for the work to finish. + +Redis works well for queues. Its [list](/redis/sdks/ts/commands/list/lpush) and +[sorted set](/redis/sdks/ts/commands/zset/zadd) types map onto queue operations, + and operations are atomic, so multiple producers and +consumers can share a queue without stepping on each other. With +[Upstash Redis](https://upstash.com/) you reach it over HTTP, so the same queue +works from a serverless function or a long-running worker. + +This tutorial starts with a plain FIFO queue and builds up to a job queue that +supports delays, priorities, and retries. + +### Database Setup + +Create a Redis database using the [Upstash Console](https://console.upstash.com), +and add `UPSTASH_REDIS_REST_URL` and +`UPSTASH_REDIS_REST_TOKEN` to your `.env` file: + +```bash +UPSTASH_REDIS_REST_URL=your_upstash_redis_url +UPSTASH_REDIS_REST_TOKEN=your_upstash_redis_token +``` + +### Installation + +```bash +npm install @upstash/redis +``` + +Then create a client. `Redis.fromEnv()` reads the two variables above +automatically: + +```ts +import { Redis } from "@upstash/redis"; + +const redis = Redis.fromEnv(); +``` + +## A basic FIFO queue + +A Redis list is all you need for a simple queue. You push items onto one end and +pop them off the other. The convention is to enqueue on the left with `LPUSH` and +dequeue on the right with `RPOP`, so the oldest item is always the next one out. + +```ts +// Producer — add tasks to the queue +await redis.lpush("tasks", "send-welcome-email:alice"); +await redis.lpush("tasks", "send-welcome-email:bob"); + +// Consumer — take the next task (returns "send-welcome-email:alice") +const task = await redis.rpop("tasks"); +``` + +| Operation | Command | Description | +| --- | --- | --- | +| Enqueue | `LPUSH queue task` | Add a task to the head of the list | +| Dequeue | `RPOP queue` | Remove and return the oldest task | +| Peek | `LRANGE queue -1 -1` | Look at the next task without removing it | +| Length | `LLEN queue` | How many tasks are waiting | + +To peek at the next task without removing it, read the tail with `LRANGE`. This is +useful for monitoring or for deciding whether to process: + +```ts +const [next] = await redis.lrange("tasks", -1, -1); +``` + +This is enough for fire-and-forget work, but it has a flaw: if your consumer pops a +task and then crashes before finishing it, that task is lost. The next sections fix +that. + +## A reliable queue + +To avoid losing tasks, don't fully remove a task until it has been processed. +Redis's `LMOVE` command (the modern replacement for `RPOPLPUSH`) atomically moves a +task from the main queue to a "processing" list in a single step: + +```ts +// Atomically move the next task from "tasks" to "processing" +const task = await redis.lmove("tasks", "processing", "right", "left"); + +if (task) { + try { + await handle(task); + // Success: remove it from the processing list + await redis.lrem("processing", 1, task); + } catch (err) { + // Failure: the task is still safe in "processing". + // A recovery job can move stuck tasks back to "tasks" later. + console.error("task failed, left in processing list", err); + } +} +``` + +Because the task stays in `processing` while it is being worked on, a crashed +consumer doesn't cause data loss. A periodic recovery job can scan `processing` for +tasks that have been stuck too long and move them back to `tasks`. + +## A blocking queue + +Polling an empty queue in a tight loop wastes CPU and network calls. Blocking pop +commands let a consumer sleep until a task arrives. `BRPOP` blocks on the right end +of the list and returns as soon as an item is available, or after a timeout: + +```ts +// Wait up to 5 seconds for a task. Use 0 to block indefinitely. +const result = await redis.brpop("tasks", 5); +// result is [queueName, task] or null on timeout +``` + + +Blocking commands hold the connection open for the duration of the wait. This suits +long-running workers, but in short-lived serverless functions you usually want a +short timeout (or plain `RPOP` polling) so the function doesn't hang on an idle +queue. For event-driven serverless delivery, consider +[Upstash QStash](/qstash/overall/getstarted), which pushes messages to your HTTP +endpoint instead. + + +## Going further: a delayed, prioritized job queue with retries + +Plain lists are FIFO and immediate. Real job systems usually need a few more things: + +- Delays: run a job later, like a reminder in an hour, instead of right away. +- Priorities: let urgent jobs go ahead of routine ones. +- Retries: when a job keeps failing, retry it a few times and then set it aside + rather than retrying forever. + +A sorted set handles all three. It keeps members ordered by a numeric score. If the +score is the time a job becomes due, the member with the lowest score is always the +next job to run, and jobs scheduled for the future sit untouched until their time +comes. + +### The schedule: score = "run at" timestamp + +```ts +type Job = { id: string; type: string; payload: unknown }; + +// Enqueue a job to run after `delayMs` (0 = immediately). +// `priority` shaves milliseconds off the score so higher-priority jobs +// of the same due-time sort first. +async function enqueue(job: Job, delayMs = 0, priority = 0) { + const runAt = Date.now() + delayMs - priority; + await redis.zadd("jobs:scheduled", { score: runAt, member: JSON.stringify(job) }); +} + +// Routine job, runs now +await enqueue({ id: "1", type: "email", payload: { to: "alice@example.com" } }); + +// Reminder, runs in one hour +await enqueue({ id: "2", type: "reminder", payload: { userId: 42 } }, 60 * 60 * 1000); + +// Urgent job — higher priority means it sorts ahead of same-time jobs +await enqueue({ id: "3", type: "alert", payload: { level: "critical" } }, 0, 10_000); +``` + +### Claiming due jobs atomically + +A worker should only pick up jobs whose `runAt` is in the past, and when several +workers run at once, no two should claim the same job. A Lua script handles this: it +reads the due jobs and removes them in one step, so there is no race between the +read and the remove. + +```ts +// Returns up to `limit` jobs that are due, and removes them from the schedule +// in the same atomic operation. +const CLAIM = ` + local due = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, ARGV[2]) + if #due > 0 then + redis.call('ZREM', KEYS[1], unpack(due)) + end + return due +`; + +async function claimDueJobs(limit = 10): Promise { + return (await redis.eval( + CLAIM, + ["jobs:scheduled"], + [Date.now().toString(), limit.toString()], + )) as string[]; +} +``` + +### The worker: process, retry, or dead-letter + +When a job fails, we re-schedule it with an exponential back-off delay and increment +its attempt count. Once it runs out of retries, it goes to a dead-letter queue: a +separate list of jobs to look at later, rather than retrying them again. + +```ts +const MAX_ATTEMPTS = 3; + +async function processJob(raw: string) { + const job = JSON.parse(raw) as Job & { attempts?: number }; + const attempts = job.attempts ?? 0; + + try { + await handle(job); // your business logic + } catch (err) { + if (attempts + 1 >= MAX_ATTEMPTS) { + // Out of retries: set it aside for inspection + await redis.lpush( + "jobs:dead-letter", + JSON.stringify({ job, error: String(err), failedAt: Date.now() }), + ); + } else { + // Re-schedule with exponential back-off: 1s, 2s, 4s, ... + const backoffMs = 1000 * 2 ** attempts; + await enqueue({ ...job, attempts: attempts + 1 }, backoffMs); + } + } +} + +// Worker loop +async function runWorker() { + while (true) { + const jobs = await claimDueJobs(); + if (jobs.length === 0) { + await new Promise((r) => setTimeout(r, 1000)); // nothing due, wait before retrying + continue; + } + await Promise.all(jobs.map(processJob)); + } +} + +runWorker(); +``` + +This gives you a job queue that schedules work for later, runs urgent jobs first, +retries failures with back-off, and moves jobs that keep failing to a dead-letter +queue, all on a single sorted set. + +### Inspecting the dead-letter queue + +Failed jobs live in a list, so checking on them is straightforward. You can wire +this into a dashboard or an alert: + +```ts +const failedCount = await redis.llen("jobs:dead-letter"); +const failed = await redis.lrange("jobs:dead-letter", 0, 9); // 10 most recent +``` + +## Wrapping up + +Redis covers queues at several levels, all in the same database: + +- Lists (`LPUSH` / `RPOP`) for simple FIFO queues, plus `LMOVE` for reliability and + `BRPOP` for blocking consumers. +- Sorted sets (`ZADD` / `ZRANGEBYSCORE`) for delayed and prioritized scheduling. +- Lua scripts (`EVAL`) to claim work atomically across many concurrent workers. + +Because Upstash Redis is serverless and accessed over HTTP, you can produce jobs +from serverless or edge functions and consume them from wherever your workers run. + +### When to reach for QStash instead + +The job queue above assumes you have a worker process polling Redis. In a fully +serverless setup you often don't want to keep a worker running. That is where +[Upstash QStash](/qstash/overall/getstarted) comes in: instead of pulling jobs from +a list, you publish a message and QStash pushes it to your HTTP endpoint, with +retries, scheduling, and delays handled for you. + +```ts +import { Client } from "@upstash/qstash"; + +const qstash = new Client({ token: process.env.QSTASH_TOKEN! }); + +// Deliver this job to your endpoint after a one-hour delay +await qstash.publishJSON({ + url: "https://your-app.com/api/jobs/reminder", + body: { userId: 42 }, + delay: 60 * 60, // seconds +}); +``` + +A rough rule of thumb: use the Redis patterns in this guide when you control the +consumer and want full control over how jobs are stored and claimed. Reach for +QStash when you'd rather not run a worker at all and just want jobs delivered to an +endpoint. See the [QStash documentation](/qstash/overall/getstarted) to get started. From 12e9e55f35a322cb06ba6e6f06a77805690706cc Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 06:02:34 +0000 Subject: [PATCH 2/4] chore(llms): regenerate llms.txt and llms-full.txt --- llms-full.txt | 288 ++++++++++++++++++++++++++++++++++++++++++++++++++ llms.txt | 1 + 2 files changed, 289 insertions(+) diff --git a/llms-full.txt b/llms-full.txt index 30bb92cd..d9a76f89 100644 --- a/llms-full.txt +++ b/llms-full.txt @@ -32953,6 +32953,294 @@ serverless deploy Visit the output url. +# Redis Queue: From FIFO Lists to a Job Queue +Source: https://upstash.com/docs/redis/tutorials/redis_queue + +## What is a queue? + +A queue holds items in order: the first item added is the first one taken out +(FIFO, or First In First Out), like a line of people waiting for service. In +software, a queue lets one part of your system hand work to another so the producer +doesn't have to wait for the work to finish. + +Redis works well for queues. Its [list](/docs/redis/sdks/ts/commands/list/lpush) and +[sorted set](/docs/redis/sdks/ts/commands/zset/zadd) types map onto queue operations, + and operations are atomic, so multiple producers and +consumers can share a queue without stepping on each other. With +[Upstash Redis](https://upstash.com/) you reach it over HTTP, so the same queue +works from a serverless function or a long-running worker. + +This tutorial starts with a plain FIFO queue and builds up to a job queue that +supports delays, priorities, and retries. + +### Database Setup + +Create a Redis database using the [Upstash Console](https://console.upstash.com), +and add `UPSTASH_REDIS_REST_URL` and +`UPSTASH_REDIS_REST_TOKEN` to your `.env` file: + +```bash +UPSTASH_REDIS_REST_URL=your_upstash_redis_url +UPSTASH_REDIS_REST_TOKEN=your_upstash_redis_token +``` + +### Installation + +```bash +npm install @upstash/redis +``` + +Then create a client. `Redis.fromEnv()` reads the two variables above +automatically: + +```ts +import { Redis } from "@upstash/redis"; + +const redis = Redis.fromEnv(); +``` + +## A basic FIFO queue + +A Redis list is all you need for a simple queue. You push items onto one end and +pop them off the other. The convention is to enqueue on the left with `LPUSH` and +dequeue on the right with `RPOP`, so the oldest item is always the next one out. + +```ts +// Producer — add tasks to the queue +await redis.lpush("tasks", "send-welcome-email:alice"); +await redis.lpush("tasks", "send-welcome-email:bob"); + +// Consumer — take the next task (returns "send-welcome-email:alice") +const task = await redis.rpop("tasks"); +``` + +| Operation | Command | Description | +| --- | --- | --- | +| Enqueue | `LPUSH queue task` | Add a task to the head of the list | +| Dequeue | `RPOP queue` | Remove and return the oldest task | +| Peek | `LRANGE queue -1 -1` | Look at the next task without removing it | +| Length | `LLEN queue` | How many tasks are waiting | + +To peek at the next task without removing it, read the tail with `LRANGE`. This is +useful for monitoring or for deciding whether to process: + +```ts +const [next] = await redis.lrange("tasks", -1, -1); +``` + +This is enough for fire-and-forget work, but it has a flaw: if your consumer pops a +task and then crashes before finishing it, that task is lost. The next sections fix +that. + +## A reliable queue + +To avoid losing tasks, don't fully remove a task until it has been processed. +Redis's `LMOVE` command (the modern replacement for `RPOPLPUSH`) atomically moves a +task from the main queue to a "processing" list in a single step: + +```ts +// Atomically move the next task from "tasks" to "processing" +const task = await redis.lmove("tasks", "processing", "right", "left"); + +if (task) { + try { + await handle(task); + // Success: remove it from the processing list + await redis.lrem("processing", 1, task); + } catch (err) { + // Failure: the task is still safe in "processing". + // A recovery job can move stuck tasks back to "tasks" later. + console.error("task failed, left in processing list", err); + } +} +``` + +Because the task stays in `processing` while it is being worked on, a crashed +consumer doesn't cause data loss. A periodic recovery job can scan `processing` for +tasks that have been stuck too long and move them back to `tasks`. + +## A blocking queue + +Polling an empty queue in a tight loop wastes CPU and network calls. Blocking pop +commands let a consumer sleep until a task arrives. `BRPOP` blocks on the right end +of the list and returns as soon as an item is available, or after a timeout: + +```ts +// Wait up to 5 seconds for a task. Use 0 to block indefinitely. +const result = await redis.brpop("tasks", 5); +// result is [queueName, task] or null on timeout +``` + + +Blocking commands hold the connection open for the duration of the wait. This suits +long-running workers, but in short-lived serverless functions you usually want a +short timeout (or plain `RPOP` polling) so the function doesn't hang on an idle +queue. For event-driven serverless delivery, consider +[Upstash QStash](/docs/qstash/overall/getstarted), which pushes messages to your HTTP +endpoint instead. + + +## Going further: a delayed, prioritized job queue with retries + +Plain lists are FIFO and immediate. Real job systems usually need a few more things: + +* Delays: run a job later, like a reminder in an hour, instead of right away. +* Priorities: let urgent jobs go ahead of routine ones. +* Retries: when a job keeps failing, retry it a few times and then set it aside + rather than retrying forever. + +A sorted set handles all three. It keeps members ordered by a numeric score. If the +score is the time a job becomes due, the member with the lowest score is always the +next job to run, and jobs scheduled for the future sit untouched until their time +comes. + +### The schedule: score = "run at" timestamp + +```ts +type Job = { id: string; type: string; payload: unknown }; + +// Enqueue a job to run after `delayMs` (0 = immediately). +// `priority` shaves milliseconds off the score so higher-priority jobs +// of the same due-time sort first. +async function enqueue(job: Job, delayMs = 0, priority = 0) { + const runAt = Date.now() + delayMs - priority; + await redis.zadd("jobs:scheduled", { score: runAt, member: JSON.stringify(job) }); +} + +// Routine job, runs now +await enqueue({ id: "1", type: "email", payload: { to: "alice@example.com" } }); + +// Reminder, runs in one hour +await enqueue({ id: "2", type: "reminder", payload: { userId: 42 } }, 60 * 60 * 1000); + +// Urgent job — higher priority means it sorts ahead of same-time jobs +await enqueue({ id: "3", type: "alert", payload: { level: "critical" } }, 0, 10_000); +``` + +### Claiming due jobs atomically + +A worker should only pick up jobs whose `runAt` is in the past, and when several +workers run at once, no two should claim the same job. A Lua script handles this: it +reads the due jobs and removes them in one step, so there is no race between the +read and the remove. + +```ts +// Returns up to `limit` jobs that are due, and removes them from the schedule +// in the same atomic operation. +const CLAIM = ` + local due = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, ARGV[2]) + if #due > 0 then + redis.call('ZREM', KEYS[1], unpack(due)) + end + return due +`; + +async function claimDueJobs(limit = 10): Promise { + return (await redis.eval( + CLAIM, + ["jobs:scheduled"], + [Date.now().toString(), limit.toString()], + )) as string[]; +} +``` + +### The worker: process, retry, or dead-letter + +When a job fails, we re-schedule it with an exponential back-off delay and increment +its attempt count. Once it runs out of retries, it goes to a dead-letter queue: a +separate list of jobs to look at later, rather than retrying them again. + +```ts +const MAX_ATTEMPTS = 3; + +async function processJob(raw: string) { + const job = JSON.parse(raw) as Job & { attempts?: number }; + const attempts = job.attempts ?? 0; + + try { + await handle(job); // your business logic + } catch (err) { + if (attempts + 1 >= MAX_ATTEMPTS) { + // Out of retries: set it aside for inspection + await redis.lpush( + "jobs:dead-letter", + JSON.stringify({ job, error: String(err), failedAt: Date.now() }), + ); + } else { + // Re-schedule with exponential back-off: 1s, 2s, 4s, ... + const backoffMs = 1000 * 2 ** attempts; + await enqueue({ ...job, attempts: attempts + 1 }, backoffMs); + } + } +} + +// Worker loop +async function runWorker() { + while (true) { + const jobs = await claimDueJobs(); + if (jobs.length === 0) { + await new Promise((r) => setTimeout(r, 1000)); // nothing due, wait before retrying + continue; + } + await Promise.all(jobs.map(processJob)); + } +} + +runWorker(); +``` + +This gives you a job queue that schedules work for later, runs urgent jobs first, +retries failures with back-off, and moves jobs that keep failing to a dead-letter +queue, all on a single sorted set. + +### Inspecting the dead-letter queue + +Failed jobs live in a list, so checking on them is straightforward. You can wire +this into a dashboard or an alert: + +```ts +const failedCount = await redis.llen("jobs:dead-letter"); +const failed = await redis.lrange("jobs:dead-letter", 0, 9); // 10 most recent +``` + +## Wrapping up + +Redis covers queues at several levels, all in the same database: + +* Lists (`LPUSH` / `RPOP`) for simple FIFO queues, plus `LMOVE` for reliability and + `BRPOP` for blocking consumers. +* Sorted sets (`ZADD` / `ZRANGEBYSCORE`) for delayed and prioritized scheduling. +* Lua scripts (`EVAL`) to claim work atomically across many concurrent workers. + +Because Upstash Redis is serverless and accessed over HTTP, you can produce jobs +from serverless or edge functions and consume them from wherever your workers run. + +### When to reach for QStash instead + +The job queue above assumes you have a worker process polling Redis. In a fully +serverless setup you often don't want to keep a worker running. That is where +[Upstash QStash](/docs/qstash/overall/getstarted) comes in: instead of pulling jobs from +a list, you publish a message and QStash pushes it to your HTTP endpoint, with +retries, scheduling, and delays handled for you. + +```ts +import { Client } from "@upstash/qstash"; + +const qstash = new Client({ token: process.env.QSTASH_TOKEN! }); + +// Deliver this job to your endpoint after a one-hour delay +await qstash.publishJSON({ + url: "https://your-app.com/api/jobs/reminder", + body: { userId: 42 }, + delay: 60 * 60, // seconds +}); +``` + +A rough rule of thumb: use the Redis patterns in this guide when you control the +consumer and want full control over how jobs are stored and claimed. Reach for +QStash when you'd rather not run a worker at all and just want jobs delivered to an +endpoint. See the [QStash documentation](/docs/qstash/overall/getstarted) to get started. + # Serverless Redisson Source: https://upstash.com/docs/redis/tutorials/redisson diff --git a/llms.txt b/llms.txt index af527dbb..c7a5b314 100644 --- a/llms.txt +++ b/llms.txt @@ -813,6 +813,7 @@ - [Building a URL Shortener with Redis](https://upstash.com/docs/redis/tutorials/python_url_shortener.md) - [Serverless Python API with Redis](https://upstash.com/docs/redis/tutorials/pythonapi.md) - [AWS Lambda Rate Limiting with Serverless Redis](https://upstash.com/docs/redis/tutorials/rate-limiting.md) +- [Redis Queue: From FIFO Lists to a Job Queue](https://upstash.com/docs/redis/tutorials/redis_queue.md): How to build FIFO, reliable, and blocking queues on Upstash Redis, then a delayed and prioritized job queue with a dead-letter queue. - [Serverless Redisson](https://upstash.com/docs/redis/tutorials/redisson.md): This tutorial shows how to use Upstash with Redisson client. - [Roadmap Voting App with Serverless Redis](https://upstash.com/docs/redis/tutorials/roadmapvotingapp.md): This is a single page application powered by upstash and next.js. - [Serverless API with Java and Redis](https://upstash.com/docs/redis/tutorials/serverless_java_redis.md) From c5b438445e076f1225d25ff3809ecc539f143ce8 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Fri, 29 May 2026 10:03:50 +0300 Subject: [PATCH 3/4] fix: correct SDK usage in redis queue tutorial - replace non-existent redis.brpop with RPOP polling (HTTP SDK has no blocking commands) - type eval/lrange results as deserialized objects, not strings - pass objects directly to lpush instead of JSON.stringify - note Lua unpack portability and stack limits --- redis/tutorials/redis_queue.mdx | 66 ++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/redis/tutorials/redis_queue.mdx b/redis/tutorials/redis_queue.mdx index bb45e0a0..7235232a 100644 --- a/redis/tutorials/redis_queue.mdx +++ b/redis/tutorials/redis_queue.mdx @@ -106,25 +106,32 @@ Because the task stays in `processing` while it is being worked on, a crashed consumer doesn't cause data loss. A periodic recovery job can scan `processing` for tasks that have been stuck too long and move them back to `tasks`. -## A blocking queue +## Consuming the queue -Polling an empty queue in a tight loop wastes CPU and network calls. Blocking pop -commands let a consumer sleep until a task arrives. `BRPOP` blocks on the right end -of the list and returns as soon as an item is available, or after a timeout: +A consumer reads from the queue in a loop. With a TCP Redis client you might use a +blocking pop (`BRPOP`) to sleep until a task arrives, but `@upstash/redis` talks to +Redis over HTTP, so it doesn't expose blocking commands. Instead, poll with `RPOP` +and back off with a short sleep when the queue is empty: ```ts -// Wait up to 5 seconds for a task. Use 0 to block indefinitely. -const result = await redis.brpop("tasks", 5); -// result is [queueName, task] or null on timeout +async function consume() { + while (true) { + const task = await redis.rpop("tasks"); + if (task === null) { + await new Promise((r) => setTimeout(r, 1000)); // queue empty, wait before retrying + continue; + } + await handle(task); + } +} ``` -Blocking commands hold the connection open for the duration of the wait. This suits -long-running workers, but in short-lived serverless functions you usually want a -short timeout (or plain `RPOP` polling) so the function doesn't hang on an idle -queue. For event-driven serverless delivery, consider -[Upstash QStash](/qstash/overall/getstarted), which pushes messages to your HTTP -endpoint instead. +Polling works well for a long-running worker, but in short-lived serverless +functions you usually don't want to run a loop at all. For event-driven delivery, +consider [Upstash QStash](/qstash/overall/getstarted), which pushes messages to your +HTTP endpoint instead of making you poll. We come back to this +[at the end](#when-to-reach-for-qstash-instead). ## Going further: a delayed, prioritized job queue with retries @@ -145,11 +152,12 @@ comes. ```ts type Job = { id: string; type: string; payload: unknown }; +type ScheduledJob = Job & { attempts?: number }; // Enqueue a job to run after `delayMs` (0 = immediately). // `priority` shaves milliseconds off the score so higher-priority jobs // of the same due-time sort first. -async function enqueue(job: Job, delayMs = 0, priority = 0) { +async function enqueue(job: ScheduledJob, delayMs = 0, priority = 0) { const runAt = Date.now() + delayMs - priority; await redis.zadd("jobs:scheduled", { score: runAt, member: JSON.stringify(job) }); } @@ -173,7 +181,9 @@ read and the remove. ```ts // Returns up to `limit` jobs that are due, and removes them from the schedule -// in the same atomic operation. +// in the same atomic operation. `unpack` is available in Redis's Lua (5.1); on +// newer Lua you'd use `table.unpack`. Keep `limit` modest so the unpacked +// argument list stays well within Lua's stack limits. const CLAIM = ` local due = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, ARGV[2]) if #due > 0 then @@ -182,12 +192,14 @@ const CLAIM = ` return due `; -async function claimDueJobs(limit = 10): Promise { +async function claimDueJobs(limit = 10): Promise { + // The members were stored as JSON, and @upstash/redis deserializes JSON in + // command results by default, so `eval` hands back parsed objects, not strings. return (await redis.eval( CLAIM, ["jobs:scheduled"], [Date.now().toString(), limit.toString()], - )) as string[]; + )) as ScheduledJob[]; } ``` @@ -200,19 +212,20 @@ separate list of jobs to look at later, rather than retrying them again. ```ts const MAX_ATTEMPTS = 3; -async function processJob(raw: string) { - const job = JSON.parse(raw) as Job & { attempts?: number }; +async function processJob(job: ScheduledJob) { const attempts = job.attempts ?? 0; try { await handle(job); // your business logic } catch (err) { if (attempts + 1 >= MAX_ATTEMPTS) { - // Out of retries: set it aside for inspection - await redis.lpush( - "jobs:dead-letter", - JSON.stringify({ job, error: String(err), failedAt: Date.now() }), - ); + // Out of retries: set it aside for inspection. @upstash/redis serializes + // the object to JSON for us, so we pass it directly. + await redis.lpush("jobs:dead-letter", { + job, + error: String(err), + failedAt: Date.now(), + }); } else { // Re-schedule with exponential back-off: 1s, 2s, 4s, ... const backoffMs = 1000 * 2 ** attempts; @@ -246,8 +259,11 @@ Failed jobs live in a list, so checking on them is straightforward. You can wire this into a dashboard or an alert: ```ts +type DeadLetter = { job: ScheduledJob; error: string; failedAt: number }; + const failedCount = await redis.llen("jobs:dead-letter"); -const failed = await redis.lrange("jobs:dead-letter", 0, 9); // 10 most recent +// lrange also deserializes JSON, so these come back as objects, not strings. +const failed = await redis.lrange("jobs:dead-letter", 0, 9); // 10 most recent ``` ## Wrapping up From 5e1e00ba0e0bf83fade2397a684018ee43a56dea Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 07:04:36 +0000 Subject: [PATCH 4/4] chore(llms): regenerate llms.txt and llms-full.txt --- llms-full.txt | 66 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/llms-full.txt b/llms-full.txt index d9a76f89..4ce6a125 100644 --- a/llms-full.txt +++ b/llms-full.txt @@ -33059,25 +33059,32 @@ Because the task stays in `processing` while it is being worked on, a crashed consumer doesn't cause data loss. A periodic recovery job can scan `processing` for tasks that have been stuck too long and move them back to `tasks`. -## A blocking queue +## Consuming the queue -Polling an empty queue in a tight loop wastes CPU and network calls. Blocking pop -commands let a consumer sleep until a task arrives. `BRPOP` blocks on the right end -of the list and returns as soon as an item is available, or after a timeout: +A consumer reads from the queue in a loop. With a TCP Redis client you might use a +blocking pop (`BRPOP`) to sleep until a task arrives, but `@upstash/redis` talks to +Redis over HTTP, so it doesn't expose blocking commands. Instead, poll with `RPOP` +and back off with a short sleep when the queue is empty: ```ts -// Wait up to 5 seconds for a task. Use 0 to block indefinitely. -const result = await redis.brpop("tasks", 5); -// result is [queueName, task] or null on timeout +async function consume() { + while (true) { + const task = await redis.rpop("tasks"); + if (task === null) { + await new Promise((r) => setTimeout(r, 1000)); // queue empty, wait before retrying + continue; + } + await handle(task); + } +} ``` -Blocking commands hold the connection open for the duration of the wait. This suits -long-running workers, but in short-lived serverless functions you usually want a -short timeout (or plain `RPOP` polling) so the function doesn't hang on an idle -queue. For event-driven serverless delivery, consider -[Upstash QStash](/docs/qstash/overall/getstarted), which pushes messages to your HTTP -endpoint instead. +Polling works well for a long-running worker, but in short-lived serverless +functions you usually don't want to run a loop at all. For event-driven delivery, +consider [Upstash QStash](/docs/qstash/overall/getstarted), which pushes messages to your +HTTP endpoint instead of making you poll. We come back to this +[at the end](#when-to-reach-for-qstash-instead). ## Going further: a delayed, prioritized job queue with retries @@ -33098,11 +33105,12 @@ comes. ```ts type Job = { id: string; type: string; payload: unknown }; +type ScheduledJob = Job & { attempts?: number }; // Enqueue a job to run after `delayMs` (0 = immediately). // `priority` shaves milliseconds off the score so higher-priority jobs // of the same due-time sort first. -async function enqueue(job: Job, delayMs = 0, priority = 0) { +async function enqueue(job: ScheduledJob, delayMs = 0, priority = 0) { const runAt = Date.now() + delayMs - priority; await redis.zadd("jobs:scheduled", { score: runAt, member: JSON.stringify(job) }); } @@ -33126,7 +33134,9 @@ read and the remove. ```ts // Returns up to `limit` jobs that are due, and removes them from the schedule -// in the same atomic operation. +// in the same atomic operation. `unpack` is available in Redis's Lua (5.1); on +// newer Lua you'd use `table.unpack`. Keep `limit` modest so the unpacked +// argument list stays well within Lua's stack limits. const CLAIM = ` local due = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, ARGV[2]) if #due > 0 then @@ -33135,12 +33145,14 @@ const CLAIM = ` return due `; -async function claimDueJobs(limit = 10): Promise { +async function claimDueJobs(limit = 10): Promise { + // The members were stored as JSON, and @upstash/redis deserializes JSON in + // command results by default, so `eval` hands back parsed objects, not strings. return (await redis.eval( CLAIM, ["jobs:scheduled"], [Date.now().toString(), limit.toString()], - )) as string[]; + )) as ScheduledJob[]; } ``` @@ -33153,19 +33165,20 @@ separate list of jobs to look at later, rather than retrying them again. ```ts const MAX_ATTEMPTS = 3; -async function processJob(raw: string) { - const job = JSON.parse(raw) as Job & { attempts?: number }; +async function processJob(job: ScheduledJob) { const attempts = job.attempts ?? 0; try { await handle(job); // your business logic } catch (err) { if (attempts + 1 >= MAX_ATTEMPTS) { - // Out of retries: set it aside for inspection - await redis.lpush( - "jobs:dead-letter", - JSON.stringify({ job, error: String(err), failedAt: Date.now() }), - ); + // Out of retries: set it aside for inspection. @upstash/redis serializes + // the object to JSON for us, so we pass it directly. + await redis.lpush("jobs:dead-letter", { + job, + error: String(err), + failedAt: Date.now(), + }); } else { // Re-schedule with exponential back-off: 1s, 2s, 4s, ... const backoffMs = 1000 * 2 ** attempts; @@ -33199,8 +33212,11 @@ Failed jobs live in a list, so checking on them is straightforward. You can wire this into a dashboard or an alert: ```ts +type DeadLetter = { job: ScheduledJob; error: string; failedAt: number }; + const failedCount = await redis.llen("jobs:dead-letter"); -const failed = await redis.lrange("jobs:dead-letter", 0, 9); // 10 most recent +// lrange also deserializes JSON, so these come back as objects, not strings. +const failed = await redis.lrange("jobs:dead-letter", 0, 9); // 10 most recent ``` ## Wrapping up