From 62332dbea5123ad27b39a221c54772aa0e74b56c Mon Sep 17 00:00:00 2001 From: D050513 Date: Mon, 26 May 2025 22:43:06 +0200 Subject: [PATCH 1/7] review --- node.js/queue.md | 72 +++++++++++++++++++++++------------------------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/node.js/queue.md b/node.js/queue.md index 172530a555..502c312570 100644 --- a/node.js/queue.md +++ b/node.js/queue.md @@ -1,11 +1,10 @@ --- synopsis: > - Learn details about the task-queue feature. -# layout: node-js + Learn details about the task queue feature. status: released --- -# Queuing with `cds.queued` +# Queueing with `cds.queued` [[toc]] @@ -20,7 +19,7 @@ This prevents accidental execution of remote calls in case the transaction is ro Every CAP service can be _queued_, meaning that event dispatching becomes _asynchronous_. -## Queuing a Service +## Queueing a Service ### cds. queued (srv) {.method} @@ -29,10 +28,10 @@ Programmatically, you can get the queued service as follows: ```js const srv = await cds.connect.to('yourService') -const queued = cds.queued(srv) +const qd_srv = cds.queued(srv) -await queued.emit('someEvent', { some: 'message' }) // asynchronous -await queued.send('someEvent', { some: 'message' }) // asynchronous +await qd_srv.emit('someEvent', { some: 'message' }) // asynchronous +await qd_srv.send('someEvent', { some: 'message' }) // asynchronous ``` ::: tip `await` needed @@ -42,10 +41,10 @@ You still need to `await` these operations because they're asynchronous. In case The `cds.queued` function can also be called with optional configuration options. ```js -const queued = cds.queued(srv, { maxAttempts: 5 }) +const qd_srv = cds.queued(srv, { maxAttempts: 5 }) ``` -> The persistent queue can only be used if it's not disabled globally by `cds.requires.queue = false` because it requires a dedicated database table. +> The persistent queue can only be used if it is not disabled globally via `cds.requires.queue = false`, as it requires a dedicated database table. ::: warning One-time configuration Once you queued a service, you cannot override its configuration options again. @@ -57,7 +56,7 @@ Once you queued a service, you cannot override its configuration options again. Use this on a queued service to get back to the original service: ```js -const unqueued = cds.unqueued(srv) +const srv = cds.unqueued(qd_srv) ``` This is useful if your service is outboxed (that is, queued) per configuration. @@ -81,14 +80,14 @@ You can configure the outbox behavior by specifying the `outboxed` option in you } ``` -For transactional safety, you're encouraged to use the [persistent queue](#persistent-queue) which is enabled by default. +For transactional safety, you're encouraged to use the [persistent queue](#persistent-queue), which is enabled by default. ## Persistent Queue (Default) {#persistent-queue} The persistent queue is enabled by default. -You can disable it globally with: +You can disable it globally via: ```json { @@ -100,7 +99,7 @@ You can disable it globally with: Using the persistent queue, the to-be-emitted message is stored in a database table within the current transaction, therefore transactional consistency is guaranteed. -::: details You can use the following configuration options: +::: details You can use the following configuration options (listed with their respective default value): ```json { @@ -108,11 +107,11 @@ Using the persistent queue, the to-be-emitted message is stored in a database ta "queue": { "kind": "persistent-queue", "maxAttempts": 20, + "parallel": true, "chunkSize": 10, "storeLastError": true, - "parallel": true, - "timeout": "1h", - "legacyLocking": true + "legacyLocking": true, + "timeout": "1h" } } } @@ -120,24 +119,24 @@ Using the persistent queue, the to-be-emitted message is stored in a database ta The optional parameters are: -- `maxAttempts` (default `20`): The number of unsuccessful emits until the message is ignored. It will remain in the database table. -- `chunkSize` (default `10`): The number of messages which are read from the database table in one go. Only applies for `parallel != false`. -- `storeLastError` (default `true`): Specifies if error information of the last failed emit is stored in the tasks table. -- `parallel` (default `true`): Specifies if messages are sent in parallel (faster but the order isn't guaranteed). -- `timeout` (default `"1h"`): The time after which a message with `status = "processing"` can be processed again. Only for `legacyLocking = false`. -- `legacyLocking` (default `true`): If set to `false`, database locks are only used to set the status of the message to `processing` to prevent long-kept database locks. This is recommended but incompatible for parallel usage with `@sap/cds^8` instances. +- `maxAttempts` (default `20`): The number of unsuccessful emits until the message is considered unprocessable. The message will remain in the database table! +- `parallel` (default `true`): Specifies if messages are sent in parallel (faster, but the order isn't guaranteed). +- `chunkSize` (default `10`): The number of messages that are read from the database table in one go. Only applies for `parallel !== false`. +- `storeLastError` (default `true`): Specifies whether error information of the last failed emit is stored in the tasks table. +- `legacyLocking` (default `true`): If set to `false`, database locks are only used to set the status of the message to `processing` to prevent long-kept database locks. Although this is the recommended approach, it is incompatible with task runners still on `@sap/cds^8`. +- `timeout` (default `"1h"`): The time after which a message with `status === "processing"` is considered to be abandoned and eligable to be processed again. Only for `legacyLocking === false`. ::: Once the transaction succeeds, the messages are read from the database table and dispatched. -If it was successful, the respective message is deleted from the database table. -If not, the system retries the message after exponentially increasing delays. -After a maximum number of attempts, the message is ignored for processing and remains in the database table which +If processing was successful, the respective message is deleted from the database table. +If processing failed, the system retries the message after exponentially increasing delays. +After a maximum number of attempts, the message is ignored for processing and remains in the database, which therefore also acts as a dead letter queue. See [Managing the Dead Letter Queue](#managing-the-dead-letter-queue), to learn about how to handle such messages. -There's only one active message processor per service, tenant, app instance, and message. -This ensures that no duplicate emits happen, except in the unlikely case of an app crash right after the emit and before the message is deleted. +There is only one active message processor per service, tenant, app instance, and message. +This ensures that no duplicate emits happen, except in the highly unlikely case of an app crash right after successful processing but before the message could be deleted. ::: tip Unrecoverable errors Some errors during the emit are identified as unrecoverable, for example in [SAP Event Mesh](../guides/messaging/event-mesh) if the used topic is forbidden. @@ -165,16 +164,13 @@ entity Messages { } ``` -In your CDS model, you can refer to the entity `cds.outbox.Messages` using the path `@sap/cds/srv/outbox`, -for example to expose it in a service. +In your CDS model, you can refer to the entity `cds.outbox.Messages` using the path `@sap/cds/srv/outbox`, for example to expose it in a service (cf. [Managing the Dead Letter Queue](#managing-the-dead-letter-queue)). #### Known Limitations - If the app crashes, another emit for the respective tenant and service is necessary to restart the message processing. It can be triggered manually using the `flush` method. -- The service that handles the queued event must not use user roles and attributes as they are not stored. However, the user ID is stored to re-create the correct context. - -### Disable Persistent Queue +- The service that handles the queued event must not rely on user roles and attributes, as they are not stored with the message. In other words, asynchroneous task are always processed in a priviledged mode. However, the user ID is stored to re-create the correct context. ### Managing the Dead Letter Queue @@ -220,7 +216,9 @@ Finally, entries in the dead letter queue can either be _revived_ by resetting t <<< ./assets/dead-letter-queue-2.js#snippet{10-12,14-16} [srv/outbox-dead-letter-queue-service.js] ::: -### Additional APIs +### Additional APIs + +TODO: are these not the same for the in-memory queue? To manually trigger the message processing, for example if your server is restarted, you can use the `flush` method. @@ -291,7 +289,7 @@ To disable deferred emitting for a particular service, you can set the `outboxed ## Troubleshooting -### Delete Entries in the Tasks Table +### Delete Entries in the Messages Table To manually delete entries in the table `cds.outbox.Messages`, you can either expose it in a service, see [Managing the Dead Letter Queue](#managing-the-dead-letter-queue), or programmatically modify it using the `cds.outbox.Messages` @@ -302,11 +300,11 @@ const db = await cds.connect.to('db') await DELETE.from('cds.outbox.Messages') ``` -### Tasks Table Not Found +### Messages Table Not Found -If the tasks table is not found on the database, this can be caused by insufficient configuration data in _package.json_. +If the messages table is not found on the database, this can be caused by insufficient configuration data in _package.json_. -In case you have overwritten `requires.db.model` there, make sure to add the queue model path `@sap/cds/srv/outbox`: +In case you have overwritten `requires.db.model` there, make sure to add the outbox model path `@sap/cds/srv/outbox`: ```jsonc "requires": { From 9224c9cd6868f441f8b3a3f1596f4c8fabc7e452 Mon Sep 17 00:00:00 2001 From: D050513 Date: Mon, 26 May 2025 22:47:46 +0200 Subject: [PATCH 2/7] synonym --- node.js/queue.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/node.js/queue.md b/node.js/queue.md index 502c312570..7237adac3b 100644 --- a/node.js/queue.md +++ b/node.js/queue.md @@ -50,6 +50,8 @@ const qd_srv = cds.queued(srv, { maxAttempts: 5 }) Once you queued a service, you cannot override its configuration options again. ::: +For backwards compatibility, `cds.outboxed(srv)` works as a synonym. + ### cds. unqueued (srv) {.method} @@ -61,6 +63,8 @@ const srv = cds.unqueued(qd_srv) This is useful if your service is outboxed (that is, queued) per configuration. +For backwards compatibility, `cds.unboxed(srv)` works as a synonym. + ### Per Configuration From 7148bef18663669229b9fbb5ed2d67dc8fafa618 Mon Sep 17 00:00:00 2001 From: D050513 Date: Mon, 26 May 2025 22:56:36 +0200 Subject: [PATCH 3/7] move disable to immediate emit --- node.js/queue.md | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/node.js/queue.md b/node.js/queue.md index 7237adac3b..2d339a9fc3 100644 --- a/node.js/queue.md +++ b/node.js/queue.md @@ -89,17 +89,7 @@ For transactional safety, you're encouraged to use the [persistent queue](#persi ## Persistent Queue (Default) {#persistent-queue} -The persistent queue is enabled by default. - -You can disable it globally via: - -```json -{ - "requires": { - "queue": false - } -} -``` +The persistent queue is the default configuration. Using the persistent queue, the to-be-emitted message is stored in a database table within the current transaction, therefore transactional consistency is guaranteed. @@ -266,11 +256,14 @@ You can enable the in-memory queue globally with: } } ``` + Messages are emitted only after the current transaction is successfully committed. Until then, messages are only kept in memory. This is similar to the following code if done manually: + ```js cds.context.on('succeeded', () => this.emit(msg)) ``` + ::: warning No retry mechanism The message is lost if the emit fails. There's no retry mechanism. ::: @@ -278,7 +271,17 @@ The message is lost if the emit fails. There's no retry mechanism. ## Immediate Emit -To disable deferred emitting for a particular service, you can set the `outboxed` option of your service to `false`: +Queueing can be disabled globally via: + +```json +{ + "requires": { + "queue": false + } +} +``` + +To disable deferred emitting for a particular service only, you can set the `outboxed` option of that service to `false`: ```json { From 708c7ca1d1d8582d20c73e13815382ca4993b828 Mon Sep 17 00:00:00 2001 From: "Dr. David A. Kunz" Date: Tue, 27 May 2025 14:21:01 +0200 Subject: [PATCH 4/7] added .schedule --- node.js/queue.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/node.js/queue.md b/node.js/queue.md index 2d339a9fc3..ec7340cbb3 100644 --- a/node.js/queue.md +++ b/node.js/queue.md @@ -212,7 +212,14 @@ Finally, entries in the dead letter queue can either be _revived_ by resetting t ### Additional APIs -TODO: are these not the same for the in-memory queue? +You can use the `schedule` method as a shortcut for `cds.queued(srv).send()`, with optional scheduling options `after` and `every`: + +```js +await srv.schedule('someEvent', { some: 'message' }) +await srv.schedule('someEvent', { some: 'message' }).after('1h') // after one hour +await srv.schedule('someEvent', { some: 'message' }).every('1h') // every hour after each processing +``` + To manually trigger the message processing, for example if your server is restarted, you can use the `flush` method. From 9b0931903e3c3e5c58594f55cdcc4cf848057880 Mon Sep 17 00:00:00 2001 From: D050513 Date: Tue, 27 May 2025 22:39:41 +0200 Subject: [PATCH 5/7] tsx --- node.js/queue.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/node.js/queue.md b/node.js/queue.md index ec7340cbb3..e479586db8 100644 --- a/node.js/queue.md +++ b/node.js/queue.md @@ -24,6 +24,10 @@ Every CAP service can be _queued_, meaning that event dispatching becomes _async ### cds. queued (srv) {.method} +```tsx +function cds.queued ( srv: Service, options? ) => QueuedService +``` + Programmatically, you can get the queued service as follows: ```js @@ -55,6 +59,10 @@ For backwards compatibility, `cds.outboxed(srv)` works as a synonym. ### cds. unqueued (srv) {.method} +```tsx +function cds.unqueued ( srv: QueuedService ) => Service +``` + Use this on a queued service to get back to the original service: ```js From 6f357f40cf75790205a6cbd136329e94f7877172 Mon Sep 17 00:00:00 2001 From: D050513 Date: Tue, 27 May 2025 22:50:35 +0200 Subject: [PATCH 6/7] move disable and use config --- node.js/queue.md | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/node.js/queue.md b/node.js/queue.md index e479586db8..60af78c913 100644 --- a/node.js/queue.md +++ b/node.js/queue.md @@ -9,6 +9,7 @@ status: released [[toc]] + ## Overview The _task queue_ feature allows you to defer event processing. @@ -16,7 +17,11 @@ The _task queue_ feature allows you to defer event processing. A common use case is the outbox pattern, where remote operations are deferred until the main transaction has been successfully committed. This prevents accidental execution of remote calls in case the transaction is rolled back. -Every CAP service can be _queued_, meaning that event dispatching becomes _asynchronous_. +Every non-database service can be _queued_, meaning that event dispatching becomes _asynchronous_. + +::: tip +The _task queue_ feature can be disabled globally via cds.requires.queue = false. +::: ## Queueing a Service @@ -95,13 +100,14 @@ You can configure the outbox behavior by specifying the `outboxed` option in you For transactional safety, you're encouraged to use the [persistent queue](#persistent-queue), which is enabled by default. + ## Persistent Queue (Default) {#persistent-queue} The persistent queue is the default configuration. Using the persistent queue, the to-be-emitted message is stored in a database table within the current transaction, therefore transactional consistency is guaranteed. -::: details You can use the following configuration options (listed with their respective default value): +::: details You can use the following configuration options: ```json { @@ -169,11 +175,12 @@ entity Messages { In your CDS model, you can refer to the entity `cds.outbox.Messages` using the path `@sap/cds/srv/outbox`, for example to expose it in a service (cf. [Managing the Dead Letter Queue](#managing-the-dead-letter-queue)). -#### Known Limitations +### Known Limitations - If the app crashes, another emit for the respective tenant and service is necessary to restart the message processing. It can be triggered manually using the `flush` method. - The service that handles the queued event must not rely on user roles and attributes, as they are not stored with the message. In other words, asynchroneous task are always processed in a priviledged mode. However, the user ID is stored to re-create the correct context. + ### Managing the Dead Letter Queue You can manage the dead letter queue by implementing a service that exposes a read-only projection on entity `cds.outbox.Messages` as well as bound actions to either revive or delete the respective message. @@ -218,6 +225,7 @@ Finally, entries in the dead letter queue can either be _revived_ by resetting t <<< ./assets/dead-letter-queue-2.js#snippet{10-12,14-16} [srv/outbox-dead-letter-queue-service.js] ::: + ### Additional APIs You can use the `schedule` method as a shortcut for `cds.queued(srv).send()`, with optional scheduling options `after` and `every`: @@ -258,6 +266,8 @@ srv.after('someEvent/#failed', (data, req) => { Event handlers have to be registered for these specific events. The `*` wildcard handler is not called for these. ::: + + ## In-Memory Queue You can enable the in-memory queue globally with: @@ -284,17 +294,8 @@ The message is lost if the emit fails. There's no retry mechanism. ::: -## Immediate Emit - -Queueing can be disabled globally via: -```json -{ - "requires": { - "queue": false - } -} -``` +## Immediate Emit To disable deferred emitting for a particular service only, you can set the `outboxed` option of that service to `false`: @@ -309,8 +310,11 @@ To disable deferred emitting for a particular service only, you can set the `out } ``` + + ## Troubleshooting + ### Delete Entries in the Messages Table To manually delete entries in the table `cds.outbox.Messages`, you can either @@ -322,6 +326,7 @@ const db = await cds.connect.to('db') await DELETE.from('cds.outbox.Messages') ``` + ### Messages Table Not Found If the messages table is not found on the database, this can be caused by insufficient configuration data in _package.json_. From 8e0c8d9aa1cae971e94731eec8e2e24e0cdfcffd Mon Sep 17 00:00:00 2001 From: D050513 Date: Tue, 27 May 2025 22:51:31 +0200 Subject: [PATCH 7/7] "Every non-database CAP service" --- node.js/queue.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node.js/queue.md b/node.js/queue.md index 60af78c913..05de8daa8e 100644 --- a/node.js/queue.md +++ b/node.js/queue.md @@ -17,7 +17,7 @@ The _task queue_ feature allows you to defer event processing. A common use case is the outbox pattern, where remote operations are deferred until the main transaction has been successfully committed. This prevents accidental execution of remote calls in case the transaction is rolled back. -Every non-database service can be _queued_, meaning that event dispatching becomes _asynchronous_. +Every non-database CAP service can be _queued_, meaning that event dispatching becomes _asynchronous_. ::: tip The _task queue_ feature can be disabled globally via cds.requires.queue = false.