Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -710,13 +710,21 @@ export abstract class AbstractQueueService<
[this.messageDeduplicationOptionsField]: message[this.messageDeduplicationOptionsField],
}

// Preserve message type field if using messageTypePath resolver (supports nested paths)
if (this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver)) {
const messageTypePath = this.messageTypeResolver.messageTypePath
const typeValue = getProperty(message, messageTypePath)
if (typeValue !== undefined) {
setProperty(result, messageTypePath, typeValue)
}
// Preserve the message type field through offloading. We default to the conventional
// top-level `type` path so that routing/identity fields are handled consistently with
// `messageIdField`/`messageTimestampField`/etc., which have defaulted names ('id',
// 'timestamp', ...) and are always copied across when present. Without this fallback,
// `messageTypeResolver` modes that don't specify a body path (no resolver, `literal`,
// or `resolver`) silently strip `type` from the offloaded SNS body, which then breaks
// any downstream subscription whose FilterPolicy filters on `type`
// (FilterPolicyScope: 'MessageBody').
const typePath =
this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver)
? this.messageTypeResolver.messageTypePath
: 'type'
const typeValue = getProperty(message, typePath)
if (typeValue !== undefined) {
setProperty(result, typePath, typeValue)
}

return result
Expand Down
15 changes: 15 additions & 0 deletions packages/core/lib/types/queueOptionsTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,21 @@ export type CommonQueueOptions = {
* }
*/
messageTypeResolver?: MessageTypeResolverConfig
/**
* @deprecated Removed in core 25.x. Use `messageTypeResolver: { messageTypePath: '<field>' }`
* (or `{ literal: '<type>' }` / `{ resolver: fn }`) instead. See UPGRADING.md.
*
* Typed as a removal-marker string literal so callers passing the legacy option get a
* compile-time error whose diagnostic shows the migration hint, rather than a silent
* runtime drop. Leaving this set on a publisher with payload offloading causes the
* message `type` field to be stripped from the offloaded SNS body, which then silently
* fails any downstream subscription whose FilterPolicy filters on `type`.
*
* Note: passing `messageTypeField: undefined` still type-checks (without
* `exactOptionalPropertyTypes`), but `undefined` is operationally identical to omitting
* the field, so this guard catches the realistic regression (`messageTypeField: '<name>'`).
*/
messageTypeField?: '__REMOVED_USE_messageTypeResolver_INSTEAD__'
messageIdField?: string
messageTimestampField?: string
messageDeduplicationIdField?: string
Expand Down
160 changes: 160 additions & 0 deletions packages/core/test/queues/AbstractQueueService.offload.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/**
* Regression tests for `AbstractQueueService.offloadMessagePayloadIfNeeded`.
*
* Identity fields (`messageIdField`, `messageTimestampField`, `messageDeduplicationIdField`,
* `messageDeduplicationOptionsField`) all have defaulted names ('id', 'timestamp', ...) and
* are unconditionally copied to the offloaded payload when present on the source message.
* The `type` field — which downstream subscriptions rely on for routing/filtering, e.g. SNS
* `FilterPolicyScope: 'MessageBody'` — must be handled the same way: present-on-source ⇒
* present-on-offloaded, regardless of which `messageTypeResolver` mode (or absence) is
* configured. Otherwise large messages get silently dropped by SNS subscription filters.
*/
import type { CommonLogger, ErrorReporter } from '@lokalise/node-core'
import type { Either } from '@lokalise/node-core'
import { describe, expect, it } from 'vitest'
import type { ZodSchema } from 'zod/v4'
import type { MessageInvalidFormatError, MessageValidationError } from '../../lib/errors/Errors.ts'
import type { OffloadedPayloadPointerPayload } from '../../lib/payload-store/offloadedPayloadMessageSchemas.ts'
import type { PayloadStore } from '../../lib/payload-store/payloadStoreTypes.ts'
import {
AbstractQueueService,
type ResolvedMessage,
} from '../../lib/queues/AbstractQueueService.ts'
import type { MessageTypeResolverConfig } from '../../lib/queues/MessageTypeResolver.ts'
import type { QueueDependencies } from '../../lib/types/queueOptionsTypes.ts'

type TestMessage = { type?: string; id: string; timestamp: string; payload: unknown }

class TestQueueService extends AbstractQueueService<
TestMessage,
TestMessage,
QueueDependencies,
Record<string, never>
> {
protected resolveSchema(): Either<Error, ZodSchema<TestMessage>> {
throw new Error('not used in this test')
}
protected resolveMessage(): Either<
MessageInvalidFormatError | MessageValidationError,
ResolvedMessage
> {
throw new Error('not used in this test')
}
protected resolveNextFunction(): () => void {
throw new Error('not used in this test')
}
protected processPrehandlers(): Promise<undefined> {
throw new Error('not used in this test')
}
protected preHandlerBarrier<BarrierOutput>(): Promise<{
isPassing: boolean
output?: BarrierOutput
}> {
throw new Error('not used in this test')
}
processMessage(): Promise<Either<'retryLater', 'success'>> {
throw new Error('not used in this test')
}
public close(): Promise<unknown> {
return Promise.resolve()
}

// Expose protected method for direct testing.
public callOffload(message: TestMessage, sizeFn: () => number) {
return this.offloadMessagePayloadIfNeeded(message, sizeFn)
}
}

const noopLogger: CommonLogger = {
level: 'silent',
fatal: () => undefined,
error: () => undefined,
warn: () => undefined,
info: () => undefined,
debug: () => undefined,
trace: () => undefined,
silent: () => undefined,
child: () => noopLogger,
} as unknown as CommonLogger

const noopReporter: ErrorReporter = { report: () => undefined }

const recordingStore = (storedKey: string): PayloadStore => ({
storePayload: () => Promise.resolve(storedKey),
retrievePayload: () => Promise.resolve(null),
})

const buildService = (messageTypeResolver?: MessageTypeResolverConfig) =>
new TestQueueService(
{ errorReporter: noopReporter, logger: noopLogger },
{
messageTypeResolver,
// any threshold; we'll always force size > threshold via sizeFn
payloadStoreConfig: {
messageSizeThreshold: 1,
store: recordingStore('payload-id-1'),
storeName: 's3',
},
},
)

const baseMessage: TestMessage = {
id: 'msg-1',
timestamp: '2026-01-01T00:00:00.000Z',
type: 'order.created',
payload: { large: 'data' },
}

describe('AbstractQueueService.offloadMessagePayloadIfNeeded — `type` preservation', () => {
it('preserves `type` when no messageTypeResolver is configured', async () => {
const svc = buildService(undefined)
const result = (await svc.callOffload(baseMessage, () => 9999)) as OffloadedPayloadPointerPayload
expect((result as unknown as TestMessage).type).toBe('order.created')
expect(result.payloadRef?.id).toBe('payload-id-1')
expect(result.id).toBe('msg-1')
expect(result.timestamp).toBe('2026-01-01T00:00:00.000Z')
})

it('preserves `type` when messageTypeResolver is `literal` mode', async () => {
const svc = buildService({ literal: 'order.created' })
const result = (await svc.callOffload(baseMessage, () => 9999)) as OffloadedPayloadPointerPayload
expect((result as unknown as TestMessage).type).toBe('order.created')
})

it('preserves `type` when messageTypeResolver is custom `resolver` mode', async () => {
const svc = buildService({ resolver: () => 'order.created' })
const result = (await svc.callOffload(baseMessage, () => 9999)) as OffloadedPayloadPointerPayload
expect((result as unknown as TestMessage).type).toBe('order.created')
})

it('preserves `type` at the configured path when messageTypeResolver is `messageTypePath`', async () => {
const svc = buildService({ messageTypePath: 'type' })
const result = (await svc.callOffload(baseMessage, () => 9999)) as OffloadedPayloadPointerPayload
expect((result as unknown as TestMessage).type).toBe('order.created')
})

it('preserves `type` at a non-default nested path when configured via `messageTypePath`', async () => {
const svc = buildService({ messageTypePath: 'metadata.eventName' })
const message = {
id: 'msg-2',
timestamp: '2026-01-01T00:00:00.000Z',
payload: 'p',
metadata: { eventName: 'shipment.dispatched' },
} as unknown as TestMessage
const result = (await svc.callOffload(message, () => 9999)) as OffloadedPayloadPointerPayload
expect((result as unknown as { metadata: { eventName: string } }).metadata.eventName).toBe(
'shipment.dispatched',
)
})

it('does not invent a `type` when the source message has none', async () => {
const svc = buildService(undefined)
const message: TestMessage = {
id: 'msg-3',
timestamp: '2026-01-01T00:00:00.000Z',
payload: 'p',
}
const result = (await svc.callOffload(message, () => 9999)) as OffloadedPayloadPointerPayload
expect((result as unknown as TestMessage).type).toBeUndefined()
})
})
29 changes: 29 additions & 0 deletions packages/core/test/types/queueOptionsTypes.types.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Type-level tests for `CommonQueueOptions`. These guard against silent removals of legacy
* options. The lib historically exposed `messageTypeField`, removed in core 25.x in favor
* of `messageTypeResolver`. Callers that still pass the legacy option must get a compile-time
* error rather than silently broken runtime behavior (for example: payload offloading drops
* the message `type` field, then SNS subscriptions with `FilterPolicyScope: 'MessageBody'`
* filter on `type` fail to deliver to SQS).
*
* Run with: npx tsc --noEmit
*/
import { describe, it } from 'vitest'
import type { CommonQueueOptions } from '../../lib/types/queueOptionsTypes.ts'

describe('CommonQueueOptions — legacy `messageTypeField`', () => {
it('rejects passing the removed `messageTypeField` option', () => {
const _bad: CommonQueueOptions = {
// @ts-expect-error - `messageTypeField` was removed in core 25.x; use `messageTypeResolver` instead
messageTypeField: 'type',
}
})

it('accepts the supported `messageTypeResolver` shape', () => {
const _ok1: CommonQueueOptions = { messageTypeResolver: { messageTypePath: 'type' } }
const _ok2: CommonQueueOptions = { messageTypeResolver: { literal: 'order.created' } }
const _ok3: CommonQueueOptions = {
messageTypeResolver: { resolver: () => 'order.created' },
}
})
})
44 changes: 44 additions & 0 deletions packages/sns/test/SnsPublisherManager.types.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Type-level guard for `SnsPublisherManager`. Reproduces the call shape that file-storage-service
* was using when payload offloading silently broke after the core 24.x → 25.x bump:
*
* ```ts
* new SnsPublisherManager(deps, {
* ...
* newPublisherOptions: {
* messageTypeField: 'type', // <- removed in core 25.x; ignored at runtime, dropped `type` from offloaded SNS body
* ...
* },
* })
* ```
*
* This must now fail at compile time.
*
* Run with: npx tsc --noEmit
*/
import { describe, it } from 'vitest'
import type { SNSPublisherOptions } from '../lib/sns/AbstractSnsPublisher.ts'

type AnyMessage = { type: string }

type NewPublisherOptions = Omit<
SNSPublisherOptions<AnyMessage>,
'messageSchemas' | 'creationConfig' | 'locatorConfig'
>

describe('SnsPublisherManager `newPublisherOptions` — legacy `messageTypeField`', () => {
it('rejects the removed `messageTypeField` option', () => {
const _bad: NewPublisherOptions = {
// @ts-expect-error - `messageTypeField` was removed in core 25.x; use `messageTypeResolver` instead
messageTypeField: 'type',
messageIdField: 'id',
}
Comment on lines +30 to +35
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use the expect type of option from vitest?

})

it('accepts `messageTypeResolver`', () => {
const _ok: NewPublisherOptions = {
messageTypeResolver: { messageTypePath: 'type' },
messageIdField: 'id',
}
})
})
Loading