Skip to content

Commit 4d463b5

Browse files
authored
fix(core): preserve type through offloading + reject removed messageTypeField (#429)
1 parent 7ad4e5a commit 4d463b5

6 files changed

Lines changed: 273 additions & 7 deletions

File tree

packages/core/lib/queues/AbstractQueueService.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -710,13 +710,21 @@ export abstract class AbstractQueueService<
710710
[this.messageDeduplicationOptionsField]: message[this.messageDeduplicationOptionsField],
711711
}
712712

713-
// Preserve message type field if using messageTypePath resolver (supports nested paths)
714-
if (this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver)) {
715-
const messageTypePath = this.messageTypeResolver.messageTypePath
716-
const typeValue = getProperty(message, messageTypePath)
717-
if (typeValue !== undefined) {
718-
setProperty(result, messageTypePath, typeValue)
719-
}
713+
// Preserve the message type field through offloading. We default to the conventional
714+
// top-level `type` path so that routing/identity fields are handled consistently with
715+
// `messageIdField`/`messageTimestampField`/etc., which have defaulted names ('id',
716+
// 'timestamp', ...) and are always copied across when present. Without this fallback,
717+
// `messageTypeResolver` modes that don't specify a body path (no resolver, `literal`,
718+
// or `resolver`) silently strip `type` from the offloaded SNS body, which then breaks
719+
// any downstream subscription whose FilterPolicy filters on `type`
720+
// (FilterPolicyScope: 'MessageBody').
721+
const typePath =
722+
this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver)
723+
? this.messageTypeResolver.messageTypePath
724+
: 'type'
725+
const typeValue = getProperty(message, typePath)
726+
if (typeValue !== undefined) {
727+
setProperty(result, typePath, typeValue)
720728
}
721729

722730
return result

packages/core/lib/types/queueOptionsTypes.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,21 @@ export type CommonQueueOptions = {
114114
* }
115115
*/
116116
messageTypeResolver?: MessageTypeResolverConfig
117+
/**
118+
* @deprecated Removed in core 25.x. Use `messageTypeResolver: { messageTypePath: '<field>' }`
119+
* (or `{ literal: '<type>' }` / `{ resolver: fn }`) instead. See UPGRADING.md.
120+
*
121+
* Typed as a removal-marker string literal so callers passing the legacy option get a
122+
* compile-time error whose diagnostic shows the migration hint at the call site, even
123+
* across long `Omit`/`&`/generic chains where excess-property check no longer fires
124+
* (the realistic regression: `messageTypeField: '<name>'` flowing through e.g. the
125+
* `SnsPublisherManager.newPublisherOptions` chain).
126+
*
127+
* Note: passing `messageTypeField: undefined` still type-checks (without
128+
* `exactOptionalPropertyTypes`), but `undefined` is operationally identical to omitting
129+
* the field, so this guard catches the realistic regression (`messageTypeField: '<name>'`).
130+
*/
131+
messageTypeField?: '__REMOVED_USE_messageTypeResolver_INSTEAD__'
117132
messageIdField?: string
118133
messageTimestampField?: string
119134
messageDeduplicationIdField?: string
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/**
2+
* Regression tests for `AbstractQueueService.offloadMessagePayloadIfNeeded`.
3+
*
4+
* Identity fields (`messageIdField`, `messageTimestampField`, `messageDeduplicationIdField`,
5+
* `messageDeduplicationOptionsField`) all have defaulted names ('id', 'timestamp', ...) and
6+
* are unconditionally copied to the offloaded payload when present on the source message.
7+
* The `type` field — which downstream subscriptions rely on for routing/filtering, e.g. SNS
8+
* `FilterPolicyScope: 'MessageBody'` — must be handled the same way: present-on-source ⇒
9+
* present-on-offloaded, regardless of which `messageTypeResolver` mode (or absence) is
10+
* configured. Otherwise large messages get silently dropped by SNS subscription filters.
11+
*/
12+
import type { CommonLogger, Either, ErrorReporter } from '@lokalise/node-core'
13+
import { describe, expect, it } from 'vitest'
14+
import type { ZodSchema } from 'zod/v4'
15+
import type { MessageInvalidFormatError, MessageValidationError } from '../../lib/errors/Errors.ts'
16+
import type { OffloadedPayloadPointerPayload } from '../../lib/payload-store/offloadedPayloadMessageSchemas.ts'
17+
import type { PayloadStore } from '../../lib/payload-store/payloadStoreTypes.ts'
18+
import {
19+
AbstractQueueService,
20+
type ResolvedMessage,
21+
} from '../../lib/queues/AbstractQueueService.ts'
22+
import type { BarrierResult } from '../../lib/queues/HandlerContainer.ts'
23+
import type { MessageTypeResolverConfig } from '../../lib/queues/MessageTypeResolver.ts'
24+
import type { QueueDependencies } from '../../lib/types/queueOptionsTypes.ts'
25+
26+
type TestMessage = { type?: string; id: string; timestamp: string; payload: unknown }
27+
28+
class TestQueueService extends AbstractQueueService<
29+
TestMessage,
30+
TestMessage,
31+
QueueDependencies,
32+
Record<string, never>
33+
> {
34+
protected resolveSchema(): Either<Error, ZodSchema<TestMessage>> {
35+
throw new Error('not used in this test')
36+
}
37+
protected resolveMessage(): Either<
38+
MessageInvalidFormatError | MessageValidationError,
39+
ResolvedMessage
40+
> {
41+
throw new Error('not used in this test')
42+
}
43+
protected resolveNextFunction(): () => void {
44+
throw new Error('not used in this test')
45+
}
46+
protected processPrehandlers(): Promise<undefined> {
47+
throw new Error('not used in this test')
48+
}
49+
protected preHandlerBarrier<BarrierOutput>(): Promise<BarrierResult<BarrierOutput>> {
50+
throw new Error('not used in this test')
51+
}
52+
processMessage(): Promise<Either<'retryLater', 'success'>> {
53+
throw new Error('not used in this test')
54+
}
55+
public close(): Promise<unknown> {
56+
return Promise.resolve()
57+
}
58+
59+
// Expose protected method for direct testing.
60+
public callOffload(message: TestMessage, sizeFn: () => number) {
61+
return this.offloadMessagePayloadIfNeeded(message, sizeFn)
62+
}
63+
}
64+
65+
const noopLogger: CommonLogger = {
66+
level: 'silent',
67+
fatal: () => undefined,
68+
error: () => undefined,
69+
warn: () => undefined,
70+
info: () => undefined,
71+
debug: () => undefined,
72+
trace: () => undefined,
73+
silent: () => undefined,
74+
child: () => noopLogger,
75+
} as unknown as CommonLogger
76+
77+
const noopReporter: ErrorReporter = { report: () => undefined }
78+
79+
const recordingStore = (storedKey: string): PayloadStore => ({
80+
storePayload: () => Promise.resolve(storedKey),
81+
retrievePayload: () => Promise.resolve(null),
82+
})
83+
84+
const buildService = (messageTypeResolver?: MessageTypeResolverConfig) =>
85+
new TestQueueService(
86+
{ errorReporter: noopReporter, logger: noopLogger },
87+
{
88+
messageTypeResolver,
89+
// any threshold; we'll always force size > threshold via sizeFn
90+
payloadStoreConfig: {
91+
messageSizeThreshold: 1,
92+
store: recordingStore('payload-id-1'),
93+
storeName: 's3',
94+
},
95+
},
96+
)
97+
98+
const baseMessage: TestMessage = {
99+
id: 'msg-1',
100+
timestamp: '2026-01-01T00:00:00.000Z',
101+
type: 'order.created',
102+
payload: { large: 'data' },
103+
}
104+
105+
describe('AbstractQueueService.offloadMessagePayloadIfNeeded — `type` preservation', () => {
106+
it('preserves `type` when no messageTypeResolver is configured', async () => {
107+
const svc = buildService(undefined)
108+
const result = (await svc.callOffload(
109+
baseMessage,
110+
() => 9999,
111+
)) as OffloadedPayloadPointerPayload
112+
expect((result as unknown as TestMessage).type).toBe('order.created')
113+
expect(result.payloadRef?.id).toBe('payload-id-1')
114+
expect(result.id).toBe('msg-1')
115+
expect(result.timestamp).toBe('2026-01-01T00:00:00.000Z')
116+
})
117+
118+
it('preserves `type` when messageTypeResolver is `literal` mode', async () => {
119+
const svc = buildService({ literal: 'order.created' })
120+
const result = (await svc.callOffload(
121+
baseMessage,
122+
() => 9999,
123+
)) as OffloadedPayloadPointerPayload
124+
expect((result as unknown as TestMessage).type).toBe('order.created')
125+
})
126+
127+
it('preserves `type` when messageTypeResolver is custom `resolver` mode', async () => {
128+
const svc = buildService({ resolver: () => 'order.created' })
129+
const result = (await svc.callOffload(
130+
baseMessage,
131+
() => 9999,
132+
)) as OffloadedPayloadPointerPayload
133+
expect((result as unknown as TestMessage).type).toBe('order.created')
134+
})
135+
136+
it('preserves `type` at the configured path when messageTypeResolver is `messageTypePath`', async () => {
137+
const svc = buildService({ messageTypePath: 'type' })
138+
const result = (await svc.callOffload(
139+
baseMessage,
140+
() => 9999,
141+
)) as OffloadedPayloadPointerPayload
142+
expect((result as unknown as TestMessage).type).toBe('order.created')
143+
})
144+
145+
it('preserves `type` at a non-default nested path when configured via `messageTypePath`', async () => {
146+
const svc = buildService({ messageTypePath: 'metadata.eventName' })
147+
const message = {
148+
id: 'msg-2',
149+
timestamp: '2026-01-01T00:00:00.000Z',
150+
payload: 'p',
151+
metadata: { eventName: 'shipment.dispatched' },
152+
} as unknown as TestMessage
153+
const result = (await svc.callOffload(message, () => 9999)) as OffloadedPayloadPointerPayload
154+
expect((result as unknown as { metadata: { eventName: string } }).metadata.eventName).toBe(
155+
'shipment.dispatched',
156+
)
157+
})
158+
159+
it('does not invent a `type` when the source message has none', async () => {
160+
const svc = buildService(undefined)
161+
const message: TestMessage = {
162+
id: 'msg-3',
163+
timestamp: '2026-01-01T00:00:00.000Z',
164+
payload: 'p',
165+
}
166+
const result = (await svc.callOffload(message, () => 9999)) as OffloadedPayloadPointerPayload
167+
expect((result as unknown as TestMessage).type).toBeUndefined()
168+
})
169+
})
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Type-level tests for `CommonQueueOptions`. These guard against silent removals of legacy
3+
* options. The lib historically exposed `messageTypeField`, removed in core 25.x in favor
4+
* of `messageTypeResolver`. Callers that still pass the legacy option must get a compile-time
5+
* error rather than silently broken runtime behavior (for example: payload offloading drops
6+
* the message `type` field, then SNS subscriptions with `FilterPolicyScope: 'MessageBody'`
7+
* filter on `type` fail to deliver to SQS).
8+
*/
9+
import { describe, expectTypeOf, it } from 'vitest'
10+
import type { CommonQueueOptions } from '../../lib/types/queueOptionsTypes.ts'
11+
12+
describe('CommonQueueOptions — legacy `messageTypeField`', () => {
13+
it('rejects passing the removed `messageTypeField` option', () => {
14+
expectTypeOf<{ messageTypeField: 'type' }>().not.toMatchTypeOf<CommonQueueOptions>()
15+
})
16+
17+
it('accepts the supported `messageTypeResolver` shape', () => {
18+
expectTypeOf<{
19+
messageTypeResolver: { messageTypePath: 'type' }
20+
}>().toMatchTypeOf<CommonQueueOptions>()
21+
expectTypeOf<{
22+
messageTypeResolver: { literal: 'order.created' }
23+
}>().toMatchTypeOf<CommonQueueOptions>()
24+
expectTypeOf<{
25+
messageTypeResolver: { resolver: () => 'order.created' }
26+
}>().toMatchTypeOf<CommonQueueOptions>()
27+
})
28+
})
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Type-level guard for `SnsPublisherManager`. Reproduces the call shape that file-storage-service
3+
* was using when payload offloading silently broke after the core 24.x → 25.x bump:
4+
*
5+
* ```ts
6+
* new SnsPublisherManager(deps, {
7+
* ...
8+
* newPublisherOptions: {
9+
* messageTypeField: 'type', // <- removed in core 25.x; ignored at runtime, dropped `type` from offloaded SNS body
10+
* ...
11+
* },
12+
* })
13+
* ```
14+
*
15+
* This must now fail at compile time.
16+
*/
17+
import { describe, it } from 'vitest'
18+
import type { SNSPublisherOptions } from '../lib/sns/AbstractSnsPublisher.ts'
19+
20+
type AnyMessage = { type: string }
21+
22+
type NewPublisherOptions = Omit<
23+
SNSPublisherOptions<AnyMessage>,
24+
'messageSchemas' | 'creationConfig' | 'locatorConfig'
25+
>
26+
27+
describe('SnsPublisherManager `newPublisherOptions` — legacy `messageTypeField`', () => {
28+
it('rejects the removed `messageTypeField` option', () => {
29+
const _bad: NewPublisherOptions = {
30+
// @ts-expect-error - `messageTypeField` was removed in core 25.x; use `messageTypeResolver` instead
31+
messageTypeField: 'type',
32+
messageIdField: 'id',
33+
}
34+
})
35+
36+
it('accepts `messageTypeResolver`', () => {
37+
const _ok: NewPublisherOptions = {
38+
messageTypeResolver: { messageTypePath: 'type' },
39+
messageIdField: 'id',
40+
}
41+
})
42+
})

packages/sns/vitest.config.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ export default defineConfig({
99
pool: 'threads',
1010
maxWorkers: 1,
1111
setupFiles: ['test/utils/vitest.setup.ts'],
12+
typecheck: {
13+
enabled: true,
14+
include: ['**/*.types.spec.ts'],
15+
},
1216
coverage: {
1317
provider: 'v8',
1418
include: ['lib/**/*.ts'],

0 commit comments

Comments
 (0)