Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 73 additions & 48 deletions node.js/queue.md
Original file line number Diff line number Diff line change
@@ -1,38 +1,46 @@
---
synopsis: >
Learn details about the task-queue feature.
# layout: node-js
Learn details about the task queue feature.
status: released
---

# Queuing with `cds.queued`
# Queueing with `cds.queued`

[[toc]]



## Overview

The _task queue_ feature allows you to defer event processing.

A common use case is the outbox pattern, where remote operations are deferred until the main transaction has been successfully committed.
This prevents accidental execution of remote calls in case the transaction is rolled back.

Every CAP service can be _queued_, meaning that event dispatching becomes _asynchronous_.
Every non-database CAP service can be _queued_, meaning that event dispatching becomes _asynchronous_.

::: tip
The _task queue_ feature can be disabled globally via <Config>cds.requires.queue = false</Config>.
:::

## Queuing a Service

## Queueing a Service


### cds. queued (srv) {.method}

```tsx
function cds.queued ( srv: Service, options? ) => QueuedService
```

Programmatically, you can get the queued service as follows:

```js
const srv = await cds.connect.to('yourService')
const queued = cds.queued(srv)
const qd_srv = cds.queued(srv)

await queued.emit('someEvent', { some: 'message' }) // asynchronous
await queued.send('someEvent', { some: 'message' }) // asynchronous
await qd_srv.emit('someEvent', { some: 'message' }) // asynchronous
await qd_srv.send('someEvent', { some: 'message' }) // asynchronous
```

::: tip `await` needed
Expand All @@ -42,26 +50,34 @@ You still need to `await` these operations because they're asynchronous. In case
The `cds.queued` function can also be called with optional configuration options.

```js
const queued = cds.queued(srv, { maxAttempts: 5 })
const qd_srv = cds.queued(srv, { maxAttempts: 5 })
```

> The persistent queue can only be used if it's not disabled globally by `cds.requires.queue = false` because it requires a dedicated database table.
> The persistent queue can only be used if it is not disabled globally via `cds.requires.queue = false`, as it requires a dedicated database table.

::: warning One-time configuration
Once you queued a service, you cannot override its configuration options again.
:::

For backwards compatibility, `cds.outboxed(srv)` works as a synonym.


### cds. unqueued (srv) {.method}

```tsx
function cds.unqueued ( srv: QueuedService ) => Service
```

Use this on a queued service to get back to the original service:

```js
const unqueued = cds.unqueued(srv)
const srv = cds.unqueued(qd_srv)
```

This is useful if your service is outboxed (that is, queued) per configuration.

For backwards compatibility, `cds.unboxed(srv)` works as a synonym.


### Per Configuration

Expand All @@ -81,22 +97,13 @@ You can configure the outbox behavior by specifying the `outboxed` option in you
}
```

For transactional safety, you're encouraged to use the [persistent queue](#persistent-queue) which is enabled by default.
For transactional safety, you're encouraged to use the [persistent queue](#persistent-queue), which is enabled by default.


## Persistent Queue (Default) {#persistent-queue}

The persistent queue is enabled by default.

You can disable it globally with:
## Persistent Queue (Default) {#persistent-queue}

```json
{
"requires": {
"queue": false
}
}
```
The persistent queue is the default configuration.

Using the persistent queue, the to-be-emitted message is stored in a database table within the current transaction, therefore transactional consistency is guaranteed.

Expand All @@ -108,36 +115,36 @@ Using the persistent queue, the to-be-emitted message is stored in a database ta
"queue": {
"kind": "persistent-queue",
"maxAttempts": 20,
"parallel": true,
"chunkSize": 10,
"storeLastError": true,
"parallel": true,
"timeout": "1h",
"legacyLocking": true
"legacyLocking": true,
"timeout": "1h"
}
}
}
```

The optional parameters are:

- `maxAttempts` (default `20`): The number of unsuccessful emits until the message is ignored. It will remain in the database table.
- `chunkSize` (default `10`): The number of messages which are read from the database table in one go. Only applies for `parallel != false`.
- `storeLastError` (default `true`): Specifies if error information of the last failed emit is stored in the tasks table.
- `parallel` (default `true`): Specifies if messages are sent in parallel (faster but the order isn't guaranteed).
- `timeout` (default `"1h"`): The time after which a message with `status = "processing"` can be processed again. Only for `legacyLocking = false`.
- `legacyLocking` (default `true`): If set to `false`, database locks are only used to set the status of the message to `processing` to prevent long-kept database locks. This is recommended but incompatible for parallel usage with `@sap/cds^8` instances.
- `maxAttempts` (default `20`): The number of unsuccessful emits until the message is considered unprocessable. The message will remain in the database table!
- `parallel` (default `true`): Specifies if messages are sent in parallel (faster, but the order isn't guaranteed).
- `chunkSize` (default `10`): The number of messages that are read from the database table in one go. Only applies for `parallel !== false`.
- `storeLastError` (default `true`): Specifies whether error information of the last failed emit is stored in the tasks table.
- `legacyLocking` (default `true`): If set to `false`, database locks are only used to set the status of the message to `processing` to prevent long-kept database locks. Although this is the recommended approach, it is incompatible with task runners still on `@sap/cds^8`.
- `timeout` (default `"1h"`): The time after which a message with `status === "processing"` is considered to be abandoned and eligable to be processed again. Only for `legacyLocking === false`.

:::

Once the transaction succeeds, the messages are read from the database table and dispatched.
If it was successful, the respective message is deleted from the database table.
If not, the system retries the message after exponentially increasing delays.
After a maximum number of attempts, the message is ignored for processing and remains in the database table which
If processing was successful, the respective message is deleted from the database table.
If processing failed, the system retries the message after exponentially increasing delays.
After a maximum number of attempts, the message is ignored for processing and remains in the database, which
therefore also acts as a dead letter queue.
See [Managing the Dead Letter Queue](#managing-the-dead-letter-queue), to learn about how to handle such messages.

There's only one active message processor per service, tenant, app instance, and message.
This ensures that no duplicate emits happen, except in the unlikely case of an app crash right after the emit and before the message is deleted.
There is only one active message processor per service, tenant, app instance, and message.
This ensures that no duplicate emits happen, except in the highly unlikely case of an app crash right after successful processing but before the message could be deleted.

::: tip Unrecoverable errors
Some errors during the emit are identified as unrecoverable, for example in [SAP Event Mesh](../guides/messaging/event-mesh) if the used topic is forbidden.
Expand Down Expand Up @@ -165,16 +172,14 @@ entity Messages {
}
```

In your CDS model, you can refer to the entity `cds.outbox.Messages` using the path `@sap/cds/srv/outbox`,
for example to expose it in a service.
In your CDS model, you can refer to the entity `cds.outbox.Messages` using the path `@sap/cds/srv/outbox`, for example to expose it in a service (cf. [Managing the Dead Letter Queue](#managing-the-dead-letter-queue)).


#### Known Limitations
### Known Limitations

- If the app crashes, another emit for the respective tenant and service is necessary to restart the message processing. It can be triggered manually using the `flush` method.
- The service that handles the queued event must not use user roles and attributes as they are not stored. However, the user ID is stored to re-create the correct context.
- The service that handles the queued event must not rely on user roles and attributes, as they are not stored with the message. In other words, asynchroneous task are always processed in a priviledged mode. However, the user ID is stored to re-create the correct context.

### Disable Persistent Queue

### Managing the Dead Letter Queue

Expand Down Expand Up @@ -220,7 +225,17 @@ Finally, entries in the dead letter queue can either be _revived_ by resetting t
<<< ./assets/dead-letter-queue-2.js#snippet{10-12,14-16} [srv/outbox-dead-letter-queue-service.js]
:::

### Additional APIs <Beta />

### Additional APIs <Alpha />
Comment thread
sjvans marked this conversation as resolved.

You can use the `schedule` method as a shortcut for `cds.queued(srv).send()`, with optional scheduling options `after` and `every`:

```js
await srv.schedule('someEvent', { some: 'message' })
await srv.schedule('someEvent', { some: 'message' }).after('1h') // after one hour
await srv.schedule('someEvent', { some: 'message' }).every('1h') // every hour after each processing
```


To manually trigger the message processing, for example if your server is restarted, you can use the `flush` method.

Expand Down Expand Up @@ -251,6 +266,8 @@ srv.after('someEvent/#failed', (data, req) => {
Event handlers have to be registered for these specific events. The `*` wildcard handler is not called for these.
:::



## In-Memory Queue

You can enable the in-memory queue globally with:
Expand All @@ -264,19 +281,23 @@ You can enable the in-memory queue globally with:
}
}
```

Messages are emitted only after the current transaction is successfully committed. Until then, messages are only kept in memory.
This is similar to the following code if done manually:

```js
cds.context.on('succeeded', () => this.emit(msg))
```

::: warning No retry mechanism
The message is lost if the emit fails. There's no retry mechanism.
:::



## Immediate Emit

To disable deferred emitting for a particular service, you can set the `outboxed` option of your service to `false`:
To disable deferred emitting for a particular service only, you can set the `outboxed` option of that service to `false`:

```json
{
Expand All @@ -289,9 +310,12 @@ To disable deferred emitting for a particular service, you can set the `outboxed
}
```



## Troubleshooting

### Delete Entries in the Tasks Table

### Delete Entries in the Messages Table

To manually delete entries in the table `cds.outbox.Messages`, you can either
expose it in a service, see [Managing the Dead Letter Queue](#managing-the-dead-letter-queue), or programmatically modify it using the `cds.outbox.Messages`
Expand All @@ -302,11 +326,12 @@ const db = await cds.connect.to('db')
await DELETE.from('cds.outbox.Messages')
```

### Tasks Table Not Found

If the tasks table is not found on the database, this can be caused by insufficient configuration data in _package.json_.
### Messages Table Not Found

If the messages table is not found on the database, this can be caused by insufficient configuration data in _package.json_.

In case you have overwritten `requires.db.model` there, make sure to add the queue model path `@sap/cds/srv/outbox`:
In case you have overwritten `requires.db.model` there, make sure to add the outbox model path `@sap/cds/srv/outbox`:

```jsonc
"requires": {
Expand Down