Skip to content

Commit 7eb1043

Browse files
authored
Implement counting (#417)
1 parent d013bc9 commit 7eb1043

4 files changed

Lines changed: 166 additions & 0 deletions

File tree

README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,22 @@ const result = await myConsumer.handlerSpy.waitForMessageWithId('1')
651651
expect(result.processingResult).toEqual({ status: 'consumed' })
652652
```
653653

654+
### Counters
655+
656+
For load-testing scenarios where retaining all messages in the buffer may be too memory-intensive, handler spies provide lightweight counters that track how many messages were processed with each status:
657+
658+
```ts
659+
const counts = myConsumer.handlerSpy.counts
660+
// {
661+
// consumed: 150,
662+
// published: 0,
663+
// retryLater: 3,
664+
// error: 1,
665+
// }
666+
```
667+
668+
Counters are incremented for every processed message regardless of `bufferSize`, so you get accurate statistics even when older messages have been evicted from the buffer. Calling `clear()` resets both the buffer and the counters.
669+
654670
## Message Logging
655671

656672
When `logMessages` is enabled, processed messages are logged at the `debug` level with structured metadata. For privacy reasons, the full message payload is **not logged by default** to avoid exposing sensitive data.

packages/core/lib/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ export {
6969
type HandlerSpyParams,
7070
type PublicHandlerSpy,
7171
resolveHandlerSpy,
72+
type SpyResultCounts,
7273
type SpyResultInput,
7374
TYPE_NOT_RESOLVED,
7475
} from './queues/HandlerSpy.ts'

packages/core/lib/queues/HandlerSpy.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ export type HandlerSpyParams = {
3535
messageIdField?: string
3636
}
3737

38+
export type SpyResultCounts = {
39+
consumed: number
40+
published: number
41+
retryLater: number
42+
error: number
43+
}
44+
3845
export type SpyResultInput<MessagePayloadSchemas extends object> = {
3946
message: MessagePayloadSchemas | null
4047
processingResult: MessageProcessingResult
@@ -87,12 +94,14 @@ export class HandlerSpy<MessagePayloadSchemas extends object> {
8794
private readonly messageBuffer: Fifo<SpyResultInput<any>>
8895
private readonly messageIdField: keyof MessagePayloadSchemas
8996
private readonly spyPromises: SpyPromiseMetadata<MessagePayloadSchemas>[]
97+
private _counts: SpyResultCounts
9098

9199
constructor(params: HandlerSpyParams = {}) {
92100
this.messageBuffer = new Fifo(params.bufferSize ?? 100)
93101
// @ts-expect-error
94102
this.messageIdField = params.messageIdField ?? 'id'
95103
this.spyPromises = []
104+
this._counts = { consumed: 0, published: 0, retryLater: 0, error: 0 }
96105
}
97106

98107
private messageMatchesFilter<T extends object>(
@@ -159,8 +168,13 @@ export class HandlerSpy<MessagePayloadSchemas extends object> {
159168
return spyPromise
160169
}
161170

171+
get counts(): SpyResultCounts {
172+
return { ...this._counts }
173+
}
174+
162175
clear() {
163176
this.messageBuffer.clear()
177+
this._counts = { consumed: 0, published: 0, retryLater: 0, error: 0 }
164178
}
165179

166180
getAllReceivedMessages(): SpyResultOutput<MessagePayloadSchemas>[] {
@@ -199,6 +213,8 @@ export class HandlerSpy<MessagePayloadSchemas extends object> {
199213
},
200214
} as SpyResultOutput<MessagePayloadSchemas>)
201215

216+
this._counts[processingResult.processingResult.status]++
217+
202218
const cacheId = `${resolvedMessageId}-${Date.now()}-${(Math.random() + 1)
203219
.toString(36)
204220
.substring(7)}`

packages/core/test/queues/HandlerSpy.spec.ts

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,139 @@ describe('HandlerSpy', () => {
429429
})
430430
})
431431

432+
describe('counts', () => {
433+
it('Starts with all zeroes', () => {
434+
const spy = new HandlerSpy<Message>()
435+
436+
expect(spy.counts).toEqual({
437+
consumed: 0,
438+
published: 0,
439+
retryLater: 0,
440+
error: 0,
441+
})
442+
})
443+
444+
it('Increments consumed counter', () => {
445+
const spy = new HandlerSpy<Message>()
446+
447+
spy.addProcessedMessage(
448+
{ processingResult: { status: 'consumed' }, message: TEST_MESSAGE },
449+
undefined,
450+
'test.type',
451+
)
452+
453+
expect(spy.counts.consumed).toBe(1)
454+
expect(spy.counts.error).toBe(0)
455+
})
456+
457+
it('Increments error counter', () => {
458+
const spy = new HandlerSpy<Message>()
459+
460+
spy.addProcessedMessage(
461+
{ processingResult: { status: 'error', errorReason: 'invalidMessage' }, message: null },
462+
'abc',
463+
TYPE_NOT_RESOLVED,
464+
)
465+
466+
expect(spy.counts.error).toBe(1)
467+
expect(spy.counts.consumed).toBe(0)
468+
})
469+
470+
it('Increments retryLater counter', () => {
471+
const spy = new HandlerSpy<Message>()
472+
473+
spy.addProcessedMessage(
474+
{ processingResult: { status: 'retryLater' }, message: TEST_MESSAGE },
475+
undefined,
476+
'test.type',
477+
)
478+
479+
expect(spy.counts.retryLater).toBe(1)
480+
})
481+
482+
it('Increments published counter', () => {
483+
const spy = new HandlerSpy<Message>()
484+
485+
spy.addProcessedMessage(
486+
{ processingResult: { status: 'published' }, message: TEST_MESSAGE },
487+
undefined,
488+
'test.type',
489+
)
490+
491+
expect(spy.counts.published).toBe(1)
492+
})
493+
494+
it('Tracks multiple statuses correctly', () => {
495+
const spy = new HandlerSpy<Message>()
496+
497+
spy.addProcessedMessage(
498+
{ processingResult: { status: 'consumed' }, message: TEST_MESSAGE },
499+
undefined,
500+
'test.type',
501+
)
502+
spy.addProcessedMessage(
503+
{ processingResult: { status: 'consumed' }, message: TEST_MESSAGE_2 },
504+
undefined,
505+
'test.type',
506+
)
507+
spy.addProcessedMessage(
508+
{ processingResult: { status: 'error', errorReason: 'handlerError' }, message: null },
509+
'err1',
510+
TYPE_NOT_RESOLVED,
511+
)
512+
spy.addProcessedMessage(
513+
{ processingResult: { status: 'retryLater' }, message: TEST_MESSAGE },
514+
undefined,
515+
'test.type',
516+
)
517+
518+
expect(spy.counts).toEqual({
519+
consumed: 2,
520+
published: 0,
521+
retryLater: 1,
522+
error: 1,
523+
})
524+
})
525+
526+
it('Resets counts on clear', () => {
527+
const spy = new HandlerSpy<Message>()
528+
529+
spy.addProcessedMessage(
530+
{ processingResult: { status: 'consumed' }, message: TEST_MESSAGE },
531+
undefined,
532+
'test.type',
533+
)
534+
spy.addProcessedMessage(
535+
{ processingResult: { status: 'error', errorReason: 'invalidMessage' }, message: null },
536+
'abc',
537+
TYPE_NOT_RESOLVED,
538+
)
539+
540+
spy.clear()
541+
542+
expect(spy.counts).toEqual({
543+
consumed: 0,
544+
published: 0,
545+
retryLater: 0,
546+
error: 0,
547+
})
548+
})
549+
550+
it('Returns a copy, not a reference', () => {
551+
const spy = new HandlerSpy<Message>()
552+
553+
const countsBefore = spy.counts
554+
spy.addProcessedMessage(
555+
{ processingResult: { status: 'consumed' }, message: TEST_MESSAGE },
556+
undefined,
557+
'test.type',
558+
)
559+
560+
expect(countsBefore.consumed).toBe(0)
561+
expect(spy.counts.consumed).toBe(1)
562+
})
563+
})
564+
432565
describe('isHandlerSpy', () => {
433566
it('HandlerSpy returns true', () => {
434567
const spy = new HandlerSpy<Message>()

0 commit comments

Comments
 (0)