Skip to content

Commit 8122f14

Browse files
t-bonkrenejeglinskyBraunMatthias
authored
Provided a better solution for reading dead outbox entries (#1940)
Co-authored-by: René Jeglinsky <rene.jeglinsky@sap.com> Co-authored-by: BraunMatthias <59841349+BraunMatthias@users.noreply.github.com>
1 parent 0c8b08c commit 8122f14

1 file changed

Lines changed: 41 additions & 21 deletions

File tree

java/outbox.md

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ To enable the persistence for the outbox, you need to add the service `outbox` o
4949
}
5050
```
5151

52-
::: warning
52+
::: warning
5353
Be aware that you need to migrate the database schemas of all tenants after you've enhanced your model with an outbox version from `@sap/cds` version 6.0.0 or later.
5454
:::
5555

@@ -173,7 +173,7 @@ To avoid this, you can apply a manual workaround as follows:
173173
1. Customize the outbox configuration and isolating them via distinct namespaces for each service.
174174
2. Adapt the Audit Log outbox configuration.
175175
3. Adapt the messaging outbox configuration per service.
176-
176+
177177
These steps are described in the following sections.
178178

179179
#### Deactivate Default Outboxes
@@ -228,7 +228,7 @@ cds:
228228
::: tip Important Note
229229
It is crucial to **deactivate** the default outboxes, and ensure **unique outbox namespaces** in order to achieve proper isolation between services in a shared DB scenario.
230230
:::
231-
231+
232232
233233
## Outboxing CAP Service Events
234234
@@ -424,29 +424,50 @@ It is crucial to make the service `OutboxDeadLetterQueueService` accessible for
424424

425425
:::
426426

427-
### Filter for Dead Entries
428-
429-
This filtering can't be done on the database since the maximum number of attempts is only available from the CDS properties.
427+
### Reading Dead Entries
430428

431-
To ensure that only dead outbox entries are returned when reading `DeadOutboxMessages`, the following code provides the handler for the `DeadLetterQueueService` and the `@After-READ` handler that filters for the dead outbox entries:
429+
Filtering the dead entries is done by adding an appropriate `where`-clause to all `READ`-queries which matches all outbox message entries that have been retried for the maximum number of times. The following code provides an example handler implementation defining this behavior for the `DeadLetterQueueService`:
432430

433431
```java
434432
@Component
435433
@ServiceName(OutboxDeadLetterQueueService_.CDS_NAME)
436434
public class DeadOutboxMessagesHandler implements EventHandler {
437435
438-
@After(entity = DeadOutboxMessages_.CDS_NAME)
439-
public void filterDeadEntries(CdsReadEventContext context) {
440-
CdsProperties.Outbox outboxConfigs = context.getCdsRuntime().getEnvironment().getCdsProperties().getOutbox();
441-
List<DeadOutboxMessages> deadEntries = context
442-
.getResult()
443-
.listOf(DeadOutboxMessages.class)
444-
.stream()
445-
.filter(entry -> entry.getAttempts() >= outboxConfigs.getService(entry.getTarget()).getMaxAttempts())
446-
.toList();
447-
448-
context.setResult(deadEntries);
449-
}
436+
private final PersistenceService db;
437+
438+
public DeadOutboxMessagesHandler(@Qualifier(PersistenceService.DEFAULT_NAME) PersistenceService db) {
439+
this.db = db;
440+
}
441+
442+
@Before(entity = DeadOutboxMessages_.CDS_NAME)
443+
public void addDeadEntryFilter(CdsReadEventContext context) {
444+
Optional<Predicate> outboxFilters = this.createOutboxFilters(context.getCdsRuntime());
445+
446+
if (outboxFilters.isPresent()) {
447+
CqnSelect modifiedCqn =
448+
copy(
449+
context.getCqn(),
450+
new Modifier() {
451+
@Override
452+
public CqnPredicate where(Predicate where) {
453+
return outboxFilters.get().and(where);
454+
}
455+
});
456+
context.setCqn(modifiedCqn);
457+
}
458+
}
459+
460+
private Optional<Predicate> createOutboxFilters(CdsRuntime runtime) {
461+
CdsProperties.Outbox outboxConfigs = runtime.getEnvironment().getCdsProperties().getOutbox();
462+
463+
return runtime.getServiceCatalog().getServices(OutboxService.class)
464+
.map(service -> {
465+
OutboxServiceConfig config = outboxConfigs.getService(service.getName());
466+
return CQL.get(Messages.TARGET).eq(service.getName())
467+
.and(CQL.get(Messages.ATTEMPTS).ge(config.getMaxAttempts()));
468+
})
469+
.reduce(Predicate::or);
470+
}
450471
}
451472
```
452473

@@ -488,8 +509,7 @@ The injected `PersistenceService` instance is used to perform the operations on
488509
[Learn more about CQL statement inspection.](./working-with-cql/query-introspection#cqnanalyzer){.learn-more}
489510

490511
::: tip Use paging logic
491-
Avoid to read all entries of the `cds.outbox.Messages` or `OutboxDeadLetterQueueService.DeadOutboxMessages` table at once, as the size of an entry is unpredictable
492-
and depends on the size of the payload. Prefer paging logic instead.
512+
Avoid reading all outbox entries at once in case entries which have large request payloads are present. Prefer `READ`-queries with paging instead.
493513
:::
494514

495515
## Observability using Open Telemetry

0 commit comments

Comments
 (0)