Skip to content

Commit df6dcf9

Browse files
Igor Savinclaude
andcommitted
fix(core): preserve type through offloading regardless of resolver mode
`offloadMessagePayloadIfNeeded` previously only re-attached the message `type` field on the offloaded payload when `messageTypeResolver` was a `messageTypePath` config. With no resolver, or with `literal` / `resolver` modes, the field was silently stripped. This was inconsistent with the rest of the offloader, which copies identity fields (`messageIdField`, `messageTimestampField`, `messageDeduplicationIdField`, `messageDeduplicationOptionsField`) unconditionally based on their defaulted field names ('id', 'timestamp', ...), and with `MessageSchemaContainer.resolveSchema`, which falls back to a single default schema when no resolver is configured. Symptoms: large messages get offloaded to S3 and the SNS publish succeeds, but any subscription with `FilterPolicyScope: 'MessageBody'` filtering on `type` silently drops them — the body the filter sees no longer contains a `type` field. Production-confirmed against file-storage-service after its 24.x → 25.x bump (lokalise/file-storage-service#1027). Default the type-preservation path to the conventional top-level `type` when the resolver doesn't provide a body path. The literal/resolver modes still drive routing/dispatch as before; this is purely about keeping the field available for downstream filters. Add direct unit tests for the offload behavior across all four resolver configurations. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 15ab7f9 commit df6dcf9

2 files changed

Lines changed: 175 additions & 7 deletions

File tree

packages/core/lib/queues/AbstractQueueService.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -710,13 +710,21 @@ export abstract class AbstractQueueService<
710710
[this.messageDeduplicationOptionsField]: message[this.messageDeduplicationOptionsField],
711711
}
712712

713-
// Preserve message type field if using messageTypePath resolver (supports nested paths)
714-
if (this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver)) {
715-
const messageTypePath = this.messageTypeResolver.messageTypePath
716-
const typeValue = getProperty(message, messageTypePath)
717-
if (typeValue !== undefined) {
718-
setProperty(result, messageTypePath, typeValue)
719-
}
713+
// Preserve the message type field through offloading. We default to the conventional
714+
// top-level `type` path so that routing/identity fields are handled consistently with
715+
// `messageIdField`/`messageTimestampField`/etc., which have defaulted names ('id',
716+
// 'timestamp', ...) and are always copied across when present. Without this fallback,
717+
// `messageTypeResolver` modes that don't specify a body path (no resolver, `literal`,
718+
// or `resolver`) silently strip `type` from the offloaded SNS body, which then breaks
719+
// any downstream subscription whose FilterPolicy filters on `type`
720+
// (FilterPolicyScope: 'MessageBody').
721+
const typePath =
722+
this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver)
723+
? this.messageTypeResolver.messageTypePath
724+
: 'type'
725+
const typeValue = getProperty(message, typePath)
726+
if (typeValue !== undefined) {
727+
setProperty(result, typePath, typeValue)
720728
}
721729

722730
return result
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/**
2+
* Regression tests for `AbstractQueueService.offloadMessagePayloadIfNeeded`.
3+
*
4+
* Identity fields (`messageIdField`, `messageTimestampField`, `messageDeduplicationIdField`,
5+
* `messageDeduplicationOptionsField`) all have defaulted names ('id', 'timestamp', ...) and
6+
* are unconditionally copied to the offloaded payload when present on the source message.
7+
* The `type` field — which downstream subscriptions rely on for routing/filtering, e.g. SNS
8+
* `FilterPolicyScope: 'MessageBody'` — must be handled the same way: present-on-source ⇒
9+
* present-on-offloaded, regardless of which `messageTypeResolver` mode (or absence) is
10+
* configured. Otherwise large messages get silently dropped by SNS subscription filters.
11+
*/
12+
import type { CommonLogger, ErrorReporter } from '@lokalise/node-core'
13+
import type { Either } from '@lokalise/node-core'
14+
import { describe, expect, it } from 'vitest'
15+
import type { ZodSchema } from 'zod/v4'
16+
import type { MessageInvalidFormatError, MessageValidationError } from '../../lib/errors/Errors.ts'
17+
import type { OffloadedPayloadPointerPayload } from '../../lib/payload-store/offloadedPayloadMessageSchemas.ts'
18+
import type { PayloadStore } from '../../lib/payload-store/payloadStoreTypes.ts'
19+
import {
20+
AbstractQueueService,
21+
type ResolvedMessage,
22+
} from '../../lib/queues/AbstractQueueService.ts'
23+
import type { MessageTypeResolverConfig } from '../../lib/queues/MessageTypeResolver.ts'
24+
import type { QueueDependencies } from '../../lib/types/queueOptionsTypes.ts'
25+
26+
type TestMessage = { type?: string; id: string; timestamp: string; payload: unknown }
27+
28+
class TestQueueService extends AbstractQueueService<
29+
TestMessage,
30+
TestMessage,
31+
QueueDependencies,
32+
Record<string, never>
33+
> {
34+
protected resolveSchema(): Either<Error, ZodSchema<TestMessage>> {
35+
throw new Error('not used in this test')
36+
}
37+
protected resolveMessage(): Either<
38+
MessageInvalidFormatError | MessageValidationError,
39+
ResolvedMessage
40+
> {
41+
throw new Error('not used in this test')
42+
}
43+
protected resolveNextFunction(): () => void {
44+
throw new Error('not used in this test')
45+
}
46+
protected processPrehandlers(): Promise<undefined> {
47+
throw new Error('not used in this test')
48+
}
49+
protected preHandlerBarrier<BarrierOutput>(): Promise<{
50+
isPassing: boolean
51+
output?: BarrierOutput
52+
}> {
53+
throw new Error('not used in this test')
54+
}
55+
processMessage(): Promise<Either<'retryLater', 'success'>> {
56+
throw new Error('not used in this test')
57+
}
58+
public close(): Promise<unknown> {
59+
return Promise.resolve()
60+
}
61+
62+
// Expose protected method for direct testing.
63+
public callOffload(message: TestMessage, sizeFn: () => number) {
64+
return this.offloadMessagePayloadIfNeeded(message, sizeFn)
65+
}
66+
}
67+
68+
const noopLogger: CommonLogger = {
69+
level: 'silent',
70+
fatal: () => undefined,
71+
error: () => undefined,
72+
warn: () => undefined,
73+
info: () => undefined,
74+
debug: () => undefined,
75+
trace: () => undefined,
76+
silent: () => undefined,
77+
child: () => noopLogger,
78+
} as unknown as CommonLogger
79+
80+
const noopReporter: ErrorReporter = { report: () => undefined }
81+
82+
const recordingStore = (storedKey: string): PayloadStore => ({
83+
storePayload: () => Promise.resolve(storedKey),
84+
retrievePayload: () => Promise.resolve(null),
85+
})
86+
87+
const buildService = (messageTypeResolver?: MessageTypeResolverConfig) =>
88+
new TestQueueService(
89+
{ errorReporter: noopReporter, logger: noopLogger },
90+
{
91+
messageTypeResolver,
92+
// any threshold; we'll always force size > threshold via sizeFn
93+
payloadStoreConfig: {
94+
messageSizeThreshold: 1,
95+
store: recordingStore('payload-id-1'),
96+
storeName: 's3',
97+
},
98+
},
99+
)
100+
101+
const baseMessage: TestMessage = {
102+
id: 'msg-1',
103+
timestamp: '2026-01-01T00:00:00.000Z',
104+
type: 'order.created',
105+
payload: { large: 'data' },
106+
}
107+
108+
describe('AbstractQueueService.offloadMessagePayloadIfNeeded — `type` preservation', () => {
109+
it('preserves `type` when no messageTypeResolver is configured', async () => {
110+
const svc = buildService(undefined)
111+
const result = (await svc.callOffload(baseMessage, () => 9999)) as OffloadedPayloadPointerPayload
112+
expect((result as unknown as TestMessage).type).toBe('order.created')
113+
expect(result.payloadRef?.id).toBe('payload-id-1')
114+
expect(result.id).toBe('msg-1')
115+
expect(result.timestamp).toBe('2026-01-01T00:00:00.000Z')
116+
})
117+
118+
it('preserves `type` when messageTypeResolver is `literal` mode', async () => {
119+
const svc = buildService({ literal: 'order.created' })
120+
const result = (await svc.callOffload(baseMessage, () => 9999)) as OffloadedPayloadPointerPayload
121+
expect((result as unknown as TestMessage).type).toBe('order.created')
122+
})
123+
124+
it('preserves `type` when messageTypeResolver is custom `resolver` mode', async () => {
125+
const svc = buildService({ resolver: () => 'order.created' })
126+
const result = (await svc.callOffload(baseMessage, () => 9999)) as OffloadedPayloadPointerPayload
127+
expect((result as unknown as TestMessage).type).toBe('order.created')
128+
})
129+
130+
it('preserves `type` at the configured path when messageTypeResolver is `messageTypePath`', async () => {
131+
const svc = buildService({ messageTypePath: 'type' })
132+
const result = (await svc.callOffload(baseMessage, () => 9999)) as OffloadedPayloadPointerPayload
133+
expect((result as unknown as TestMessage).type).toBe('order.created')
134+
})
135+
136+
it('preserves `type` at a non-default nested path when configured via `messageTypePath`', async () => {
137+
const svc = buildService({ messageTypePath: 'metadata.eventName' })
138+
const message = {
139+
id: 'msg-2',
140+
timestamp: '2026-01-01T00:00:00.000Z',
141+
payload: 'p',
142+
metadata: { eventName: 'shipment.dispatched' },
143+
} as unknown as TestMessage
144+
const result = (await svc.callOffload(message, () => 9999)) as OffloadedPayloadPointerPayload
145+
expect((result as unknown as { metadata: { eventName: string } }).metadata.eventName).toBe(
146+
'shipment.dispatched',
147+
)
148+
})
149+
150+
it('does not invent a `type` when the source message has none', async () => {
151+
const svc = buildService(undefined)
152+
const message: TestMessage = {
153+
id: 'msg-3',
154+
timestamp: '2026-01-01T00:00:00.000Z',
155+
payload: 'p',
156+
}
157+
const result = (await svc.callOffload(message, () => 9999)) as OffloadedPayloadPointerPayload
158+
expect((result as unknown as TestMessage).type).toBeUndefined()
159+
})
160+
})

0 commit comments

Comments
 (0)