Skip to content

Commit a77e022

Browse files
sjvansDavid-Kunz
andauthored
cds 9: task queue - review (#1878)
Co-authored-by: Dr. David A. Kunz <david.kunz@sap.com>
1 parent 064d67a commit a77e022

1 file changed

Lines changed: 73 additions & 48 deletions

File tree

node.js/queue.md

Lines changed: 73 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,46 @@
11
---
22
synopsis: >
3-
Learn details about the task-queue feature.
4-
# layout: node-js
3+
Learn details about the task queue feature.
54
status: released
65
---
76

8-
# Queuing with `cds.queued`
7+
# Queueing with `cds.queued`
98

109
[[toc]]
1110

1211

12+
1313
## Overview
1414

1515
The _task queue_ feature allows you to defer event processing.
1616

1717
A common use case is the outbox pattern, where remote operations are deferred until the main transaction has been successfully committed.
1818
This prevents accidental execution of remote calls in case the transaction is rolled back.
1919

20-
Every CAP service can be _queued_, meaning that event dispatching becomes _asynchronous_.
20+
Every non-database CAP service can be _queued_, meaning that event dispatching becomes _asynchronous_.
2121

22+
::: tip
23+
The _task queue_ feature can be disabled globally via <Config>cds.requires.queue = false</Config>.
24+
:::
2225

23-
## Queuing a Service
26+
27+
## Queueing a Service
2428

2529

2630
### cds. queued (srv) {.method}
2731

32+
```tsx
33+
function cds.queued ( srv: Service, options? ) => QueuedService
34+
```
35+
2836
Programmatically, you can get the queued service as follows:
2937

3038
```js
3139
const srv = await cds.connect.to('yourService')
32-
const queued = cds.queued(srv)
40+
const qd_srv = cds.queued(srv)
3341
34-
await queued.emit('someEvent', { some: 'message' }) // asynchronous
35-
await queued.send('someEvent', { some: 'message' }) // asynchronous
42+
await qd_srv.emit('someEvent', { some: 'message' }) // asynchronous
43+
await qd_srv.send('someEvent', { some: 'message' }) // asynchronous
3644
```
3745

3846
::: tip `await` needed
@@ -42,26 +50,34 @@ You still need to `await` these operations because they're asynchronous. In case
4250
The `cds.queued` function can also be called with optional configuration options.
4351

4452
```js
45-
const queued = cds.queued(srv, { maxAttempts: 5 })
53+
const qd_srv = cds.queued(srv, { maxAttempts: 5 })
4654
```
4755

48-
> The persistent queue can only be used if it's not disabled globally by `cds.requires.queue = false` because it requires a dedicated database table.
56+
> The persistent queue can only be used if it is not disabled globally via `cds.requires.queue = false`, as it requires a dedicated database table.
4957

5058
::: warning One-time configuration
5159
Once you queued a service, you cannot override its configuration options again.
5260
:::
5361

62+
For backwards compatibility, `cds.outboxed(srv)` works as a synonym.
63+
5464

5565
### cds. unqueued (srv) {.method}
5666

67+
```tsx
68+
function cds.unqueued ( srv: QueuedService ) => Service
69+
```
70+
5771
Use this on a queued service to get back to the original service:
5872

5973
```js
60-
const unqueued = cds.unqueued(srv)
74+
const srv = cds.unqueued(qd_srv)
6175
```
6276

6377
This is useful if your service is outboxed (that is, queued) per configuration.
6478

79+
For backwards compatibility, `cds.unboxed(srv)` works as a synonym.
80+
6581

6682
### Per Configuration
6783

@@ -81,22 +97,13 @@ You can configure the outbox behavior by specifying the `outboxed` option in you
8197
}
8298
```
8399

84-
For transactional safety, you're encouraged to use the [persistent queue](#persistent-queue) which is enabled by default.
100+
For transactional safety, you're encouraged to use the [persistent queue](#persistent-queue), which is enabled by default.
85101

86102

87-
## Persistent Queue (Default) {#persistent-queue}
88-
89-
The persistent queue is enabled by default.
90103

91-
You can disable it globally with:
104+
## Persistent Queue (Default) {#persistent-queue}
92105

93-
```json
94-
{
95-
"requires": {
96-
"queue": false
97-
}
98-
}
99-
```
106+
The persistent queue is the default configuration.
100107

101108
Using the persistent queue, the to-be-emitted message is stored in a database table within the current transaction, therefore transactional consistency is guaranteed.
102109

@@ -108,36 +115,36 @@ Using the persistent queue, the to-be-emitted message is stored in a database ta
108115
"queue": {
109116
"kind": "persistent-queue",
110117
"maxAttempts": 20,
118+
"parallel": true,
111119
"chunkSize": 10,
112120
"storeLastError": true,
113-
"parallel": true,
114-
"timeout": "1h",
115-
"legacyLocking": true
121+
"legacyLocking": true,
122+
"timeout": "1h"
116123
}
117124
}
118125
}
119126
```
120127

121128
The optional parameters are:
122129

123-
- `maxAttempts` (default `20`): The number of unsuccessful emits until the message is ignored. It will remain in the database table.
124-
- `chunkSize` (default `10`): The number of messages which are read from the database table in one go. Only applies for `parallel != false`.
125-
- `storeLastError` (default `true`): Specifies if error information of the last failed emit is stored in the tasks table.
126-
- `parallel` (default `true`): Specifies if messages are sent in parallel (faster but the order isn't guaranteed).
127-
- `timeout` (default `"1h"`): The time after which a message with `status = "processing"` can be processed again. Only for `legacyLocking = false`.
128-
- `legacyLocking` (default `true`): If set to `false`, database locks are only used to set the status of the message to `processing` to prevent long-kept database locks. This is recommended but incompatible for parallel usage with `@sap/cds^8` instances.
130+
- `maxAttempts` (default `20`): The number of unsuccessful emits until the message is considered unprocessable. The message will remain in the database table!
131+
- `parallel` (default `true`): Specifies if messages are sent in parallel (faster, but the order isn't guaranteed).
132+
- `chunkSize` (default `10`): The number of messages that are read from the database table in one go. Only applies for `parallel !== false`.
133+
- `storeLastError` (default `true`): Specifies whether error information of the last failed emit is stored in the tasks table.
134+
- `legacyLocking` (default `true`): If set to `false`, database locks are only used to set the status of the message to `processing` to prevent long-kept database locks. Although this is the recommended approach, it is incompatible with task runners still on `@sap/cds^8`.
135+
- `timeout` (default `"1h"`): The time after which a message with `status === "processing"` is considered to be abandoned and eligable to be processed again. Only for `legacyLocking === false`.
129136

130137
:::
131138

132139
Once the transaction succeeds, the messages are read from the database table and dispatched.
133-
If it was successful, the respective message is deleted from the database table.
134-
If not, the system retries the message after exponentially increasing delays.
135-
After a maximum number of attempts, the message is ignored for processing and remains in the database table which
140+
If processing was successful, the respective message is deleted from the database table.
141+
If processing failed, the system retries the message after exponentially increasing delays.
142+
After a maximum number of attempts, the message is ignored for processing and remains in the database, which
136143
therefore also acts as a dead letter queue.
137144
See [Managing the Dead Letter Queue](#managing-the-dead-letter-queue), to learn about how to handle such messages.
138145

139-
There's only one active message processor per service, tenant, app instance, and message.
140-
This ensures that no duplicate emits happen, except in the unlikely case of an app crash right after the emit and before the message is deleted.
146+
There is only one active message processor per service, tenant, app instance, and message.
147+
This ensures that no duplicate emits happen, except in the highly unlikely case of an app crash right after successful processing but before the message could be deleted.
141148

142149
::: tip Unrecoverable errors
143150
Some errors during the emit are identified as unrecoverable, for example in [SAP Event Mesh](../guides/messaging/event-mesh) if the used topic is forbidden.
@@ -165,16 +172,14 @@ entity Messages {
165172
}
166173
```
167174

168-
In your CDS model, you can refer to the entity `cds.outbox.Messages` using the path `@sap/cds/srv/outbox`,
169-
for example to expose it in a service.
175+
In your CDS model, you can refer to the entity `cds.outbox.Messages` using the path `@sap/cds/srv/outbox`, for example to expose it in a service (cf. [Managing the Dead Letter Queue](#managing-the-dead-letter-queue)).
170176

171177

172-
#### Known Limitations
178+
### Known Limitations
173179

174180
- If the app crashes, another emit for the respective tenant and service is necessary to restart the message processing. It can be triggered manually using the `flush` method.
175-
- The service that handles the queued event must not use user roles and attributes as they are not stored. However, the user ID is stored to re-create the correct context.
181+
- The service that handles the queued event must not rely on user roles and attributes, as they are not stored with the message. In other words, asynchroneous task are always processed in a priviledged mode. However, the user ID is stored to re-create the correct context.
176182

177-
### Disable Persistent Queue
178183

179184
### Managing the Dead Letter Queue
180185

@@ -220,7 +225,17 @@ Finally, entries in the dead letter queue can either be _revived_ by resetting t
220225
<<< ./assets/dead-letter-queue-2.js#snippet{10-12,14-16} [srv/outbox-dead-letter-queue-service.js]
221226
:::
222227

223-
### Additional APIs <Beta />
228+
229+
### Additional APIs <Alpha />
230+
231+
You can use the `schedule` method as a shortcut for `cds.queued(srv).send()`, with optional scheduling options `after` and `every`:
232+
233+
```js
234+
await srv.schedule('someEvent', { some: 'message' })
235+
await srv.schedule('someEvent', { some: 'message' }).after('1h') // after one hour
236+
await srv.schedule('someEvent', { some: 'message' }).every('1h') // every hour after each processing
237+
```
238+
224239

225240
To manually trigger the message processing, for example if your server is restarted, you can use the `flush` method.
226241

@@ -251,6 +266,8 @@ srv.after('someEvent/#failed', (data, req) => {
251266
Event handlers have to be registered for these specific events. The `*` wildcard handler is not called for these.
252267
:::
253268

269+
270+
254271
## In-Memory Queue
255272

256273
You can enable the in-memory queue globally with:
@@ -264,19 +281,23 @@ You can enable the in-memory queue globally with:
264281
}
265282
}
266283
```
284+
267285
Messages are emitted only after the current transaction is successfully committed. Until then, messages are only kept in memory.
268286
This is similar to the following code if done manually:
287+
269288
```js
270289
cds.context.on('succeeded', () => this.emit(msg))
271290
```
291+
272292
::: warning No retry mechanism
273293
The message is lost if the emit fails. There's no retry mechanism.
274294
:::
275295

276296

297+
277298
## Immediate Emit
278299

279-
To disable deferred emitting for a particular service, you can set the `outboxed` option of your service to `false`:
300+
To disable deferred emitting for a particular service only, you can set the `outboxed` option of that service to `false`:
280301

281302
```json
282303
{
@@ -289,9 +310,12 @@ To disable deferred emitting for a particular service, you can set the `outboxed
289310
}
290311
```
291312

313+
314+
292315
## Troubleshooting
293316

294-
### Delete Entries in the Tasks Table
317+
318+
### Delete Entries in the Messages Table
295319

296320
To manually delete entries in the table `cds.outbox.Messages`, you can either
297321
expose it in a service, see [Managing the Dead Letter Queue](#managing-the-dead-letter-queue), or programmatically modify it using the `cds.outbox.Messages`
@@ -302,11 +326,12 @@ const db = await cds.connect.to('db')
302326
await DELETE.from('cds.outbox.Messages')
303327
```
304328

305-
### Tasks Table Not Found
306329

307-
If the tasks table is not found on the database, this can be caused by insufficient configuration data in _package.json_.
330+
### Messages Table Not Found
331+
332+
If the messages table is not found on the database, this can be caused by insufficient configuration data in _package.json_.
308333

309-
In case you have overwritten `requires.db.model` there, make sure to add the queue model path `@sap/cds/srv/outbox`:
334+
In case you have overwritten `requires.db.model` there, make sure to add the outbox model path `@sap/cds/srv/outbox`:
310335

311336
```jsonc
312337
"requires": {

0 commit comments

Comments
 (0)