Skip to content

Commit 9004bb0

Browse files
authored
feat(core): expose payload-offloading flag on handlerSpy (#464)
1 parent 14e9eb8 commit 9004bb0

8 files changed

Lines changed: 67 additions & 28 deletions

File tree

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,23 @@ const result = await myConsumer.handlerSpy.waitForMessageWithId('1')
676676
expect(result.processingResult).toEqual({ status: 'consumed' })
677677
```
678678

679+
When [payload offloading](#payload-offloading) is configured, the publisher's processing result
680+
exposes whether the message was offloaded to the external payload store. This lets you assert the
681+
offloading path was taken without mocking the store:
682+
683+
```ts
684+
// Large message — payload was offloaded
685+
const offloaded = await myPublisher.handlerSpy.waitForMessageWithId('1', 'published')
686+
expect(offloaded.processingResult).toEqual({ status: 'published', offloaded: true })
687+
688+
// Small message — sent inline, `offloaded` is omitted
689+
const inline = await myPublisher.handlerSpy.waitForMessageWithId('2', 'published')
690+
expect(inline.processingResult.offloaded).toBeUndefined()
691+
```
692+
693+
The `offloaded` field is only present (set to `true`) when offloading actually happened, so existing
694+
exact-match assertions such as `toEqual({ status: 'published' })` keep passing for inline messages.
695+
679696
### Counters
680697

681698
For load-testing scenarios where retaining all messages in the buffer may be too memory-intensive, handler spies provide lightweight counters that track how many messages were processed with each status:

packages/core/lib/types/MessageQueueTypes.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,16 @@ export type MessageProcessingResult =
1717
status: 'retryLater'
1818
}
1919
| {
20-
status: 'consumed' | 'published'
20+
status: 'consumed'
2121
skippedAsDuplicate?: boolean
2222
}
23+
| {
24+
status: 'published'
25+
skippedAsDuplicate?: boolean
26+
// Only set (to `true`) when the message payload was offloaded to the external
27+
// payload store; omitted entirely for inline messages.
28+
offloaded?: boolean
29+
}
2330
| {
2431
status: 'error'
2532
errorReason: 'invalidMessage' | 'handlerError' | 'retryLaterExceeded'

packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
type AsyncPublisher,
55
type BarrierResult,
66
DeduplicationRequesterEnum,
7+
isOffloadedPayloadPointerPayload,
78
type MessageInvalidFormatError,
89
type MessageSchemaContainer,
910
type MessageValidationError,
@@ -99,7 +100,9 @@ export abstract class AbstractPubSubPublisher<MessagePayloadType extends object>
99100
await this.sendMessage(maybeOffloadedPayloadMessage, options)
100101
this.handleMessageProcessed({
101102
message: parsedMessage,
102-
processingResult: { status: 'published' },
103+
processingResult: isOffloadedPayloadPointerPayload(maybeOffloadedPayloadMessage)
104+
? { status: 'published', offloaded: true }
105+
: { status: 'published' },
103106
messageProcessingStartTimestamp,
104107
queueName: this.topicName,
105108
})

packages/sns/lib/sns/AbstractSnsPublisher.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
type AsyncPublisher,
77
type BarrierResult,
88
DeduplicationRequesterEnum,
9+
isOffloadedPayloadPointerPayload,
910
type MessageInvalidFormatError,
1011
type MessageSchemaContainer,
1112
type MessageValidationError,
@@ -169,7 +170,9 @@ export abstract class AbstractSnsPublisher<MessagePayloadType extends object>
169170

170171
this.handleMessageProcessed({
171172
message: parsedMessage,
172-
processingResult: { status: 'published' },
173+
processingResult: isOffloadedPayloadPointerPayload(payload)
174+
? { status: 'published', offloaded: true }
175+
: { status: 'published' },
173176
messageProcessingStartTimestamp,
174177
queueName: topicName,
175178
})

packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,10 @@ describe('SnsPermissionPublisher - single-store payload offloading', () => {
118118

119119
await publisher.publish(message)
120120

121-
await expect(
122-
publisher.handlerSpy.waitForMessageWithId('1', 'published'),
123-
).resolves.toBeDefined()
121+
// The handler spy exposes that the message took the offloading path, so tests can
122+
// assert offloading happened without inspecting the store directly.
123+
const spyResult = await publisher.handlerSpy.waitForMessageWithId('1', 'published')
124+
expect(spyResult.processingResult).toEqual({ status: 'published', offloaded: true })
124125
await waitAndRetry(() => receivedSnsMessages.length > 0)
125126

126127
// Check that the published message's body is a pointer to the offloaded payload.

packages/sqs/lib/sqs/AbstractSqsPublisher.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
type AsyncPublisher,
77
type BarrierResult,
88
DeduplicationRequesterEnum,
9+
isOffloadedPayloadPointerPayload,
910
type MessageInvalidFormatError,
1011
type MessageSchemaContainer,
1112
type MessageValidationError,
@@ -160,7 +161,9 @@ export abstract class AbstractSqsPublisher<MessagePayloadType extends object>
160161
await this.sendMessage(payload, resolvedOptions, preBuiltBody)
161162
this.handleMessageProcessed({
162163
message: parsedMessage,
163-
processingResult: { status: 'published' },
164+
processingResult: isOffloadedPayloadPointerPayload(payload)
165+
? { status: 'published', offloaded: true }
166+
: { status: 'published' },
164167
messageProcessingStartTimestamp,
165168
queueName: this.queue.name,
166169
})

packages/sqs/test/publishers/SqsPermissionPublisher.payloadOffloading.spec.ts

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,10 @@ describe('SqsPermissionPublisher - payload offloading', () => {
9898

9999
await publisher.publish(message)
100100

101-
await expect(
102-
publisher.handlerSpy.waitForMessageWithId('1', 'published'),
103-
).resolves.toBeDefined()
101+
// The handler spy exposes that the message took the offloading path, so tests can
102+
// assert offloading happened without inspecting the store directly.
103+
const spyResult = await publisher.handlerSpy.waitForMessageWithId('1', 'published')
104+
expect(spyResult.processingResult).toEqual({ status: 'published', offloaded: true })
104105
await waitAndRetry(() => receivedSqsMessages.length > 0)
105106

106107
// Check that the published message's body is a pointer to the offloaded payload.
@@ -140,5 +141,27 @@ describe('SqsPermissionPublisher - payload offloading', () => {
140141
getObjectContent(s3, s3BucketName, offloadedPayloadPointer),
141142
).resolves.toBeDefined()
142143
})
144+
145+
it('does not flag small messages as offloaded', async () => {
146+
const message = {
147+
id: '2',
148+
messageType: 'add',
149+
} satisfies PERMISSIONS_ADD_MESSAGE_TYPE
150+
expect(JSON.stringify(message).length).toBeLessThan(largeMessageSizeThreshold)
151+
152+
await publisher.publish(message)
153+
154+
const spyResult = await publisher.handlerSpy.waitForMessageWithId('2', 'published')
155+
// `offloaded` is omitted entirely for inline messages, so the exact-match holds.
156+
expect(spyResult.processingResult).toEqual({ status: 'published' })
157+
await waitAndRetry(() => receivedSqsMessages.length > 0)
158+
159+
// Small messages are sent inline, not as an offloaded-payload pointer.
160+
expect(receivedSqsMessages.length).toBe(1)
161+
const parsedReceivedMessageBody = JSON.parse(receivedSqsMessages[0]!.Body!)
162+
expect(parsedReceivedMessageBody.payloadRef).toBeUndefined()
163+
expect(parsedReceivedMessageBody.offloadedPayloadPointer).toBeUndefined()
164+
expect(parsedReceivedMessageBody.id).toBe('2')
165+
})
143166
})
144167
})

pnpm-lock.yaml

Lines changed: 0 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)