From d5072c866c5b19aa6be9da02cc9bdfa01120c070 Mon Sep 17 00:00:00 2001
From: D050513
Date: Mon, 1 Jun 2026 11:32:20 +0200
Subject: [PATCH] review

---
 guides/events/event-queues-new.md | 403 +++++++++++++++++++++---------
 1 file changed, 287 insertions(+), 116 deletions(-)

diff --git a/guides/events/event-queues-new.md b/guides/events/event-queues-new.md
index 15d9ae546..695b31028 100644
--- a/guides/events/event-queues-new.md
+++ b/guides/events/event-queues-new.md
@@ -1,12 +1,12 @@
 ---
 synopsis: >
-  Transactional Event Queues allow you to schedule events and background tasks for asynchronous, exactly-once processing with ultimate resilience.
+  Transactional Event Queues allow you to schedule events and background tasks for asynchronous, resilient processing in the same transaction as your business data.
 status: released
 ---

 # Transactional Event Queues

-Persist events in the same database transaction as your business data. Process them asynchronously — with retries, ordering, and a dead letter queue.
+Queue remote calls, inbound messages, and background tasks in the same transaction as your business data. CAP executes them later with retries, ordering, and dead letter handling.
 {.subtitle}

 {{ $frontmatter.synopsis }}
@@ -14,41 +14,91 @@ Persist events in the same database transaction as your business data. Process t

 > [!tip] Guiding Principles
 >
-> 1. **Transactional** — events are written to the database within the same transaction as your business data
-> 2. **Asynchronous** — a background runner dispatches events after commit, not during the request
-> 3. **Resilient** — failed events are retried with exponential backoff; unrecoverable ones land in a dead letter queue
-> 4. **Unified** — one mechanism covers four use cases: outbox, inbox, background tasks, and callbacks
+> 1. **Transactional** — queued work is written in the same transaction as your business data
+> 2. **Asynchronous** — a background runner dispatches it after commit, not during the request
+> 3. **Resilient** — failed work is retried with exponential backoff; unrecoverable entries land in a dead letter queue
+> 4. **Unified** — one mechanism covers outbox, inbox, background tasks, and callbacks

 [[toc]]

+## Before You Continue

+This guide assumes familiarity with CAP services, transactions, and remote service connections.
+For broker-based scenarios such as SAP Event Mesh or Kafka, basic messaging concepts are also helpful.

 ## Motivation

-In distributed systems, things fail. A remote service may be temporarily unavailable, a network call may time out, or your process may crash right after committing a database transaction but before sending the follow-up message.
-These failures leave your system in an inconsistent state — data is committed, but dependent side effects never happen.
+Distributed side effects are hard to get right.
+Your application may commit local data, but a follow-up remote call can still fail because of network errors, service outages, or a process crash.

-_Transactional Event Queues_ solve this by persisting events and tasks in a database table **within the same transaction** as your business data.
-After the transaction commits, a background runner picks up the queued entries and executes them asynchronously — with retries, exactly-once guarantees, and a dead letter queue for unrecoverable failures.
+_Transactional Event Queues_ solve this by storing the follow-up work in the database as part of the same transaction as your business data.
+After commit, a background runner executes that work asynchronously and retries failures until they succeed or become dead letters.

-This pattern is widely known as the _Transactional Outbox_, but CAP's event queues go beyond outbound messages.
-They provide a unified mechanism for four use cases:
+This pattern is often known as the _Transactional Outbox_, but CAP's event queues go beyond outbound messages.
+They provide one mechanism for four use cases:

 - **Outbox** — defer outbound calls to remote services until the transaction succeeds.
 - **Inbox** — acknowledge inbound messages immediately and process them asynchronously.
 - **Background Tasks** — schedule periodic or delayed tasks such as data replication.
 - **Callbacks** — react to completed or failed tasks, enabling SAGA-like compensation patterns.

+Transactional Event Queues are not just a broker integration feature.
+They are CAP’s general mechanism for persisting asynchronous work in the database, whether that work is an outbound message, an inbound message to be processed later, or a background task scheduled by your application.

+## When to Use Them
+
+Use event queues when work must happen _after_ the current transaction commits, or when that work needs durable retries and dead letter handling.
+If you need an immediate response from a remote system, use a normal synchronous service call instead.
+
+| If you need to... | Use event queues? | Why |
+|---|---|---|
+| Call a remote service only after DB commit | Yes | Prevents external side effects for rolled-back transactions |
+| Process inbound broker messages without blocking acknowledgment | Yes | Lets the app acknowledge early and process later |
+| Schedule delayed or recurring background work | Yes | Uses the same persistence and retry mechanism |
+| Need an immediate synchronous response from a remote call | No | Queued requests execute later and discard the direct return value |
+| Run purely local logic inside the same request | Usually no | Direct execution is simpler when no asynchronous boundary is needed |
+
+```mermaid
+flowchart TD
+    A[Need asynchronous work?] -->|No| B[Use direct service call]
+    A -->|Yes| C[Must only happen after DB commit?]
+    C -->|Yes| D[Use transactional event queue]
+    C -->|No| E[Need durable retries and DLQ?]
+    E -->|Yes| D
+    E -->|No| F[Simple async logic may be enough]
+
+    D --> G[Outbox]
+    D --> H[Inbox]
+    D --> I[Background task]
+    D --> J[Callback / compensation]
+```
+
+## Quick Start
+
+Use a queued service when a side effect must only happen after the current transaction commits.
+
+```js
+const flights = await cds.connect.to('FlightService')
+const queuedFlights = cds.queued(flights)
+
+this.after('CREATE', 'Travels', async travel => {
+  await queuedFlights.send('bookFlight', { travelId: travel.ID })
+})
+```
+
+This stores the flight booking request in the database together with the travel creation.
+CAP dispatches it later in the background.
+If the transaction rolls back, no booking request is sent.

 ## How It Works { #concept }

 The core principle is straightforward:

-1. Instead of executing side effects directly, you write an event message into a database table — **within the current transaction**.
+1. Instead of executing side effects directly, you write a message into a database table — **within the current transaction**.
 2. Once the transaction commits, a background runner reads pending messages and dispatches them to the respective service.
-3. If processing succeeds, the message is deleted. If it fails, the system retries with exponentially increasing delays.
-4. After a configurable maximum number of attempts, the message is moved to the dead letter queue for manual intervention.
+3. If processing succeeds, the message is deleted.
+4. If processing fails, the system retries with exponentially increasing delays.
+5. After a configurable maximum number of attempts, the message is moved to the dead letter queue for manual intervention.

 ```mermaid
 sequenceDiagram
@@ -58,13 +108,13 @@ sequenceDiagram
     participant S as Remote Service

     H->>DB: Write business data
-    H->>DB: Write event to outbox table
-    Note over H,DB: Both writes in the same transaction
+    H->>DB: Write queued message
+    Note over H,DB: Both writes happen in the same transaction
     DB-->>H: COMMIT

     loop Background processing
-        R->>DB: Poll for pending events
-        R->>S: Dispatch event
+        R->>DB: Poll for pending entries
+        R->>S: Dispatch work
         alt Success
             R->>DB: Delete message
         else Transient failure
@@ -76,12 +126,86 @@ sequenceDiagram
     end
 ```

-Because the event message and your business data share the same database transaction, you get two fundamental guarantees:
+Because the queued message and your business data share the same database transaction, you get two core guarantees:
+
+- **No phantom events** — if the transaction rolls back, no message is sent.
+- **No lost events** — if the transaction commits, the queued work is persisted and processed eventually.
+
+There is at most one active processor per service and tenant at a given time.
+That is important for understanding ordering and duplicate prevention.
+
+## Direct vs Queued Calls
+
+A queued call changes _when_ work happens and _what the caller can expect back_.
+That difference is easier to understand when seen side by side.
+
+```mermaid
+sequenceDiagram
+    participant App
+    participant DB
+    participant Remote
+
+    rect rgb(255,245,245)
+    Note over App,Remote: Direct call
+    App->>Remote: send()
+    Remote-->>App: result or error
+    App->>DB: commit business data
+    end
+
+    rect rgb(245,255,245)
+    Note over App,Remote: Queued call
+    App->>DB: write business data
+    App->>DB: write queued message
+    DB-->>App: commit
+    App-->>App: request finished
+    DB-->>Remote: background dispatch later
+    end
+```
+
+> [!warning] Queued calls are asynchronous
+> A queued service persists the request and returns after the message is stored, not after the remote operation finishes.
+> Any return value from `send()` or `run()` is therefore not available to the caller.
+
+## End-to-End Example
+
+The following example ties together queueing, callbacks, and local state updates.
+It shows a common pattern: create local business data first, then trigger remote work asynchronously.
+
+```js
+const cds = require('@sap/cds')
+
+module.exports = class TravelService extends cds.ApplicationService {
+  async init() {
+    const flights = await cds.connect.to('FlightService')
+    const queuedFlights = cds.queued(flights)
+
+    this.after('CREATE', 'Travels', async travel => {
+      await queuedFlights.send('bookFlight', {
+        travelId: travel.ID,
+        customerId: travel.customer_ID
+      })
+    })
+
+    flights.after('bookFlight/#succeeded', async (_, req) => {
+      await UPDATE('Travels')
+        .set({ status: 'Booked' })
+        .where({ ID: req.data.travelId })
+    })

-- **No phantom events** — if the transaction rolls back, no event is ever sent.
-- **No lost events** — if the transaction commits, the event is guaranteed to be processed eventually.
+    flights.after('bookFlight/#failed', async (err, req) => {
+      await UPDATE('Travels')
+        .set({ status: 'BookingFailed' })
+        .where({ ID: req.data.travelId })
+      req.warn(`Flight booking permanently failed: ${err.message}`)
+    })

+    await super.init()
+  }
+}
+```

+This example highlights an important design rule:
+use callbacks or persisted status updates for outcomes, not direct return values.

 ## Use Cases

@@ -118,13 +242,21 @@ void notifyFlights(List travels) {
 ```
 :::

+```js
+// Anti-pattern: remote side effect happens before local commit is safe
+this.after('CREATE', 'Travels', async travel => {
+  await flights.send('bookFlight', { travelId: travel.ID })
+})
+```
+
+If the surrounding transaction later fails, the external booking may already exist although the local travel record was rolled back.
+
 Some services are outboxed automatically, including `cds.MessagingService` and `cds.AuditLogService`.
 You don't need to call `cds.queued()` or configure anything extra for these — they use the persistent queue by default.

 [Learn more about auto-outboxed services in Node.js.](../../node.js/queue#per-configuration){.learn-more}
 [Learn more about the outbox in Java.](../../java/outbox){.learn-more}

-
 ### Inbox { #inbox }

 The inbox mirrors the outbox pattern for inbound messages.
@@ -133,10 +265,21 @@ When a message arrives from a broker like SAP Event Mesh or Apache Kafka, the me
 This brings two advantages:

 - **Quick acknowledgment** — the message broker does not have to wait for your processing to complete. This reduces backpressure and prevents consumer group rebalancing under load.
-- **Flatten the curve** — if a burst of messages arrives, they're queued in your database and processed at a controlled pace, preventing overload.
+- **Flatten the curve** — if a burst of messages arrives, they are queued in your database and processed at a controlled pace.
+
+```mermaid
+flowchart LR
+    A[Broker message arrives] --> B[Persist in app DB]
+    B --> C[Acknowledge broker immediately]
+    C --> D[Process later in app]
+    D --> E{Success?}
+    E -->|Yes| F[Done]
+    E -->|No| G[Retry in app]
+    G --> H[Dead letter queue in app]
+```

 > [!note] Especially useful when brokers don't support redelivery
-> Some message brokers (for example, SAP Event Mesh) do not allow retriggering delivery or correcting message payloads.
+> Some message brokers, for example SAP Event Mesh, do not allow retriggering delivery or correcting message payloads.
 > With the inbox, failures are handled inside your app via the [dead letter queue](#dead-letter-queue), where you have full control over retry and correction.

 Enable the inbox in your configuration:
@@ -163,15 +306,15 @@ cds:
 ```
 :::

-::: warning Inboxing moves the dead letter queue into your app
-With the inbox enabled, all messages are acknowledged to the message broker regardless of whether processing succeeds.
-Failures must be managed through the [dead letter queue](#dead-letter-queue).
+::: warning Inboxing changes who owns failure handling
+With inboxing enabled, the broker considers the message delivered as soon as your app stores it.
+If later processing fails, recovery no longer happens in the broker; it happens in your application's retry and dead letter queue flow.
 :::

-
 ### Background Tasks { #background-tasks }

-Event queues are not limited to messaging. You can schedule arbitrary background tasks such as data replication, cache refresh, or garbage collection.
+Event queues are not limited to messaging.
+You can schedule arbitrary background tasks such as data replication, cache refresh, or garbage collection.

 **Example:** Replicate data from a remote service every 10 minutes.

@@ -186,7 +329,7 @@ await srv.schedule('replicate', { entity: 'Products' }).every('10 minutes')
 > The `srv.schedule()` API is currently available in Node.js only.
 > In Java, use a `@Scheduled` annotation in combination with a queued outbox service to achieve equivalent behavior.

-The `schedule` method is a convenience shortcut that internally queues the call using `cds.queued(srv)` and adds timing options:
+The `schedule()` method is a convenience shortcut that internally queues the call using `cds.queued(srv)` and adds timing options:

 ```js
 // Execute once, as soon as possible
@@ -203,13 +346,13 @@ await srv.schedule('replicate', { entity: 'Products' }).every('10 minutes')
 The [data federation guide](../integration/data-federation) uses `srv.schedule().every()` to implement polling-based replication, fetching incremental updates from remote services on a regular interval.
 :::

-
 ### Callbacks (SAGA Patterns) { #callbacks }

 In distributed transactions, you often need to react when an asynchronous step completes or fails.
 Event queues support this with `#succeeded` and `#failed` callback events, enabling compensation logic similar to SAGA patterns.

-**Example:** After successfully creating a flight booking via the outbox, replicate the full business object from the remote system. If the booking fails, notify the user.
+**Example:** After successfully creating a flight booking via the outbox, replicate the full business object from the remote system.
+If the booking fails, notify the user or trigger compensation logic.

 ::: code-group
 ```js [Node.js]
@@ -234,8 +377,6 @@ Callback handlers must be registered for the specific `#succeeded` or `#failed`
 The `*` wildcard handler is not called for these events.
 :::

-
-
 ## How to Use { #how-to-use }

 ### Queueing a Service { #cds-queued }
@@ -249,9 +390,9 @@ const srv = await cds.connect.to('RemoteService')
 const qsrv = cds.queued(srv)

 // All operations are now queued
-await qsrv.emit('someEvent', { key: 'value' }) // fire-and-forget
+await qsrv.emit('someEvent', { key: 'value' }) // fire-and-forget
 await qsrv.send('someRequest', { key: 'value' }) // request (result discarded)
-await qsrv.run(SELECT.from('Products')) // query (result discarded)
+await qsrv.run(SELECT.from('Products')) // query (result discarded)
 ```
 :::

@@ -274,45 +415,10 @@ queued.emit("someEvent", Map.of("key", "value"));
 ```
 :::

-
-### Unqueueing a Service
-
-If a service is queued by configuration, you can get back the original (synchronous) service:
-
-::: code-group
-```js [Node.js]
-const srv = cds.unqueued(qsrv) // back to synchronous
-```
-```java [Java]
-CqnService original = outbox.unboxed(outboxedService);
-```
-:::
-
-
-
-### Service API { #service-api }
-
-When working with event queues, you interact with the standard CAP service APIs:
-
-| API | Description |
-|-----|-------------|
-| `srv.emit(event, data)` | Emit a fire-and-forget event message |
-| `srv.send(event, data)` | Send a request (return value discarded for queued services) |
-| `srv.run(query)` | Run a CQL query (return value discarded for queued services) |
-| `srv.schedule(event, data)` | Schedule a task with optional timing — Node.js only |
-
-The `schedule` method supports a fluent API:
-
-```js
-await srv.schedule('task', data) // execute asap
-await srv.schedule('task', data).after('1h') // execute after one hour
-await srv.schedule('task', data).every('1h') // repeat every hour
-```
-
-
 ### Queueing by Configuration { #by-configuration }

-You can queue any service through configuration without changing code:
+You can queue any service through configuration without changing code.
+That is useful when you want to switch a remote integration to durable asynchronous processing centrally.

 ::: code-group
 ```json [Node.js — package.json]
@@ -336,39 +442,67 @@ cds:
 ```
 :::

-
 ### Auto-Outboxed Services { #auto-outboxed }

 The following services are outboxed by default — you don't need any additional configuration:

 | Service | Description |
 |---------|-------------|
-| `cds.MessagingService` | All messaging services (Event Mesh, Kafka, etc.) |
+| `cds.MessagingService` | All messaging services such as Event Mesh and Kafka |
 | `cds.AuditLogService` | Audit log events |

-This ensures that messaging and audit log events are always sent reliably and never lost due to transaction rollbacks.
+This ensures that messaging and audit log events are sent reliably and never lost because of transaction rollbacks.

 [Learn more about auto-outboxed services in Node.js.](../../node.js/queue#per-configuration){.learn-more}
 [Learn more about the outbox in Java.](../../java/outbox#persistent){.learn-more}

+### Service API { #service-api }
+
+When working with event queues, you interact with the standard CAP service APIs:
+
+| API | Description |
+|-----|-------------|
+| `srv.emit(event, data)` | Emit a fire-and-forget event message |
+| `srv.send(event, data)` | Send a request; for queued services the direct return value is discarded |
+| `srv.run(query)` | Run a CQL query; for queued services the direct return value is discarded |
+| `srv.schedule(event, data)` | Schedule a task with optional timing — Node.js only |
+
+The `schedule()` method supports a fluent API:
+
+```js
+await srv.schedule('task', data) // execute asap
+await srv.schedule('task', data).after('1h') // execute after one hour
+await srv.schedule('task', data).every('1h') // repeat every hour
+```
+
+### Unqueueing a Service
+
+If a service is queued by configuration, you can get back the original synchronous service:

+::: code-group
+```js [Node.js]
+const srv = cds.unqueued(qsrv)
+```
+```java [Java]
+CqnService original = outbox.unboxed(outboxedService);
+```
+:::

-## Characteristics
+## Guarantees

-### Exactly Once { #exactly-once }
+### Transactional Persistence { #no-phantom-events }

-The persistent queue guarantees exactly-once processing for database-related operations.
-Database changes made during event processing are only committed if — and only if — the event is successfully processed.
+Because the queued message is written in the same database transaction as your business data, a rollback also removes the queued message.
+No event is ever dispatched for a transaction that did not commit.

-To prevent duplicate processing across application instances, there is at most one active processor per service and tenant at any given time.
-In the unlikely event of a process crash immediately after successful processing but before the message could be deleted, the message may be processed a second time. Handlers should therefore be idempotent where possible.
+### Eventual Processing { #exactly-once }

-### No Phantom Events { #no-phantom-events }
+The persistent queue guarantees transactional persistence and eventual processing.
+For database-backed processing, CAP avoids duplicate execution under normal operation, but handlers should still be idempotent to tolerate rare crash windows or external side effects.

-Because the event message is written within the same database transaction as your business data, a rollback of the transaction also removes the event message.
-No event is ever dispatched for a transaction that didn't commit.
+Database changes made during queued processing are committed only if the event is processed successfully.

-### Guaranteed Order { #guaranteed-order }
+### Ordering { #guaranteed-order }

 In Node.js, messages are processed in the order they were submitted, per service and tenant.

@@ -387,11 +521,12 @@ cds:
 ```
 :::

+## Operational Behavior

 ### Error Handling { #errors }

 When processing fails, the system retries the message with exponentially increasing delays.
-After a configurable maximum number of attempts (default: 20 in Node.js, 10 in Java), the message is moved to the dead letter queue.
+After a configurable maximum number of attempts, the message is moved to the dead letter queue.

 Some errors are identified as _unrecoverable_ — for example, when a topic is forbidden in SAP Event Mesh.
 These messages are immediately moved to the dead letter queue without further retries.
@@ -421,13 +556,13 @@ void process(OutboxMessageEventContext context) {
 }
 ```

-
-
 ## Dead Letter Queue { #dead-letter-queue }

 Messages that exceed the maximum retry count remain in the `cds.outbox.Messages` database table with their error information intact.
 These entries form the _dead letter queue_ and require manual intervention — either to fix the underlying issue and retry, or to discard the message.

+For troubleshooting, inspect `cds.outbox.Messages` and pay special attention to `status`, `attempts`, `lastError`, and `lastAttemptTimestamp`.
+
 ### The Data Model

 Your database model is automatically extended with the following entity:
@@ -436,19 +571,18 @@ Your database model is automatically extended with the following entity:
 namespace cds.outbox;

 entity Messages {
-  key ID : UUID;
-  timestamp : Timestamp;
-  target : String;
-  msg : LargeString;
-  attempts : Integer default 0;
-  partition : Integer default 0;
-  lastError : LargeString;
+  key ID : UUID;
+  timestamp : Timestamp;
+  target : String;
+  msg : LargeString;
+  attempts : Integer default 0;
+  partition : Integer default 0;
+  lastError : LargeString;
   lastAttemptTimestamp: Timestamp @cds.on.update: $now;
-  status : String(23);
+  status : String(23);
 }
 ```

-
 ### Managing Dead Letters

 You can expose a CDS service to manage the dead letter queue with actions to revive or delete entries.
@@ -474,7 +608,8 @@ service OutboxDeadLetterQueueService {
 :::

 ::: warning Restrict access
-The dead letter queue contains sensitive data. Ensure the service is accessible only to internal users.
+The dead letter queue contains sensitive data.
+Ensure the service is accessible only to internal users.
 :::

 #### 2. Filter for Dead Entries
@@ -509,7 +644,6 @@ public class DeadOutboxMessagesHandler implements EventHandler {

     @Before(entity = DeadOutboxMessages_.CDS_NAME)
     public void addDeadEntryFilter(CdsReadEventContext context) {
-        // Filter for entries that exceeded maxAttempts
         Optional outboxFilters = createOutboxFilters(context.getCdsRuntime());
         outboxFilters.ifPresent(filter -> {
             CqnSelect modified = copy(context.getCqn(), new Modifier() {
@@ -563,8 +697,6 @@ public void deleteOutboxEntry(DeadOutboxMessagesDeleteContext context) {
 [Learn more about the dead letter queue in Node.js.](../../node.js/queue#managing-the-dead-letter-queue){.learn-more}
 [Learn more about the dead letter queue in Java.](../../java/outbox#outbox-dead-letter-queue){.learn-more}

-
-
 ## Deferred Principal Propagation { #principal-propagation }

 When an event is processed asynchronously, the original HTTP request context is no longer available.
@@ -573,16 +705,15 @@ CAP handles this as follows:
 - The **user ID** is stored with the queued message and re-created when the message is processed.
 - **User roles and attributes** are _not_ stored. Asynchronous processing always runs in privileged mode.

-This means handlers for queued events must not rely on role-based authorization checks.
-If you need to enforce authorization in queued processing, encode the necessary information in the event payload itself.
-
-
+This means queued handlers must not rely on request-time role checks.
+If you need authorization in queued processing, encode the required information in the event payload itself or derive it from persisted business data.

 ## Configuration

 ### Persistent Queue (Default) { #persistent-queue }

-The persistent queue is enabled by default. Messages are stored in a database table within the current transaction.
+The persistent queue is enabled by default.
+Messages are stored in a database table within the current transaction.

 ::: code-group
 ```json [Node.js — package.json]
@@ -618,7 +749,7 @@ Configuration options for Node.js:
 |--------|---------|-------------|
 | `maxAttempts` | `20` | Maximum retries before moving to dead letter queue |
 | `storeLastError` | `true` | Store error information of the last failed attempt |
-| `timeout` | `"1h"` | Time after which a `processing` message is considered abandoned and eligible for reprocessing |
+| `timeout` | `"1h"` | Time after which a `processing` message is considered abandoned and can be reprocessed |

 Configuration options for Java:

@@ -627,10 +758,10 @@ Configuration options for Java:
 | `maxAttempts` | `10` | Maximum retries before the entry is considered dead |
 | `ordered` | `true` | Process entries in submission order |

-
 ### In-Memory Queue

-For development and testing, you can use the in-memory queue. Messages are held in memory and emitted after the transaction commits.
+For development and testing, you can use the in-memory queue.
+Messages are held in memory and emitted after the transaction commits.

 ::: code-group
 ```json [Node.js — package.json]
@@ -647,10 +778,10 @@ For development and testing, you can use the in-memory queue. Messages are held
 :::

 ::: warning No retry mechanism
-With the in-memory queue, messages are lost if processing fails. There is no retry mechanism and no dead letter queue.
+With the in-memory queue, messages are lost if processing fails.
+There is no retry mechanism and no dead letter queue.
 :::

-
 ### Disabling the Queue

 You can disable event queues globally:
@@ -679,12 +810,10 @@ Or disable queueing for a specific service:
 }
 ```

-
-
 ## Manual Processing { #flush }

 After an application restart or crash, pending events in the database are not automatically picked up until a new outbox write occurs for the same service and tenant.
-You can trigger reprocessing manually using the `flush` method, for example from a startup hook or admin endpoint:
+You can trigger reprocessing manually using the `flush()` method, for example from a startup hook or admin endpoint:

 ::: code-group
 ```js [Node.js]
@@ -692,3 +821,45 @@ const srv = await cds.connect.to('RemoteService')
 await cds.queued(srv).flush()
 ```
 :::
+
+## Appendix: Simple Diagram Variants
+
+### When to Use Event Queues
+
+```mermaid
+flowchart TD
+    A[Need async work?] -->|No| B[Use direct call]
+    A -->|Yes| C[Must happen after DB commit?]
+    C -->|Yes| D[Use event queue]
+    C -->|No| E[Need retries and DLQ?]
+    E -->|Yes| D
+    E -->|No| F[Simple async logic may be enough]
+```
+
+### Simpler Processing Flow
+
+```mermaid
+flowchart LR
+    A[Request handler] --> B[Write business data]
+    A --> C[Write queued message]
+    B --> D[Commit]
+    C --> D
+    D --> E[Background runner]
+    E --> F[Dispatch work]
+    F --> G{Success?}
+    G -->|Yes| H[Delete message]
+    G -->|No| I[Retry]
+    I --> J[Dead letter queue]
+```
+
+### Simpler Inbox Flow
+
+```mermaid
+flowchart LR
+    A[Broker message] --> B[Store in app DB]
+    B --> C[Acknowledge broker]
+    C --> D[Process later]
+    D --> E{Success?}
+    E -->|Yes| F[Done]
+    E -->|No| G[Retry / DLQ in app]
+```