Skip to content

Commit 2702e74

Browse files
authored
feat: add zstd message compression codec for SNS and SQS (#442)
1 parent f596e2a commit 2702e74

43 files changed

Lines changed: 2774 additions & 147 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/ensure-labels.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ jobs:
2424
- name: Check one of required labels are set
2525
uses: docker://agilepathway/pull-request-label-checker:v1.6.65
2626
with:
27-
one_of: major,minor,patch,skip-release
27+
one_of: major,minor,patch,skip-release,release-same-version
2828
repo_token: ${{ secrets.GITHUB_TOKEN }}

.github/workflows/publish.yml

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ jobs:
2323
has_changes: ${{ steps.finalize.outputs.has_changes }}
2424
bump: ${{ steps.finalize.outputs.bump }}
2525
should_publish: ${{ steps.finalize.outputs.should_publish }}
26+
same_version: ${{ steps.finalize.outputs.same_version }}
2627
steps:
2728
- name: Set default outputs
2829
id: defaults
@@ -31,6 +32,7 @@ jobs:
3132
echo "has_changes=false" >> $GITHUB_OUTPUT
3233
echo "bump=patch" >> $GITHUB_OUTPUT
3334
echo "should_publish=false" >> $GITHUB_OUTPUT
35+
echo "same_version=false" >> $GITHUB_OUTPUT
3436
3537
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
3638
with:
@@ -79,11 +81,11 @@ jobs:
7981
echo "PR labels: $LABELS"
8082
echo "labels=$LABELS" >> $GITHUB_OUTPUT
8183
82-
# Check if PR has version bump labels
83-
if echo "$LABELS" | grep -qE '\b(patch|minor|major)\b'; then
84+
# Check if PR has a release label (a version-bump label or release-same-version)
85+
if echo "$LABELS" | grep -qE '\b(patch|minor|major|release-same-version)\b'; then
8486
echo "should_publish=true" >> $GITHUB_OUTPUT
8587
else
86-
echo "No version bump label found (patch/minor/major)"
88+
echo "No release label found (patch/minor/major/release-same-version)"
8789
echo "should_publish=false" >> $GITHUB_OUTPUT
8890
fi
8991
@@ -93,13 +95,23 @@ jobs:
9395
env:
9496
LABELS: ${{ steps.pr-info.outputs.labels }}
9597
run: |
96-
if echo "$LABELS" | grep -qE '\bmajor\b'; then
98+
# release-same-version takes precedence: publish the versions already in
99+
# package.json as-is, with no bump. Use it when versions were set explicitly
100+
# in the PR (e.g. a deliberate, hand-committed semver-major bump).
101+
if echo "$LABELS" | grep -qE '\brelease-same-version\b'; then
102+
echo "same_version=true" >> $GITHUB_OUTPUT
103+
echo "bump=none" >> $GITHUB_OUTPUT
104+
echo "Release mode: publish current package.json versions without bumping"
105+
elif echo "$LABELS" | grep -qE '\bmajor\b'; then
106+
echo "same_version=false" >> $GITHUB_OUTPUT
97107
echo "bump=major" >> $GITHUB_OUTPUT
98108
echo "Version bump: major"
99109
elif echo "$LABELS" | grep -qE '\bminor\b'; then
110+
echo "same_version=false" >> $GITHUB_OUTPUT
100111
echo "bump=minor" >> $GITHUB_OUTPUT
101112
echo "Version bump: minor"
102113
else
114+
echo "same_version=false" >> $GITHUB_OUTPUT
103115
echo "bump=patch" >> $GITHUB_OUTPUT
104116
echo "Version bump: patch"
105117
fi
@@ -240,6 +252,8 @@ jobs:
240252
DEFAULT_BUMP: ${{ steps.defaults.outputs.bump }}
241253
PR_SHOULD_PUBLISH: ${{ steps.pr-info.outputs.should_publish }}
242254
DEFAULT_SHOULD_PUBLISH: ${{ steps.defaults.outputs.should_publish }}
255+
VERSION_SAME: ${{ steps.version.outputs.same_version }}
256+
DEFAULT_SAME_VERSION: ${{ steps.defaults.outputs.same_version }}
243257
run: |
244258
# Use build-matrix outputs if available, otherwise defaults
245259
if [ -n "$BUILD_MATRIX" ]; then
@@ -266,6 +280,12 @@ jobs:
266280
echo "should_publish=$DEFAULT_SHOULD_PUBLISH" >> $GITHUB_OUTPUT
267281
fi
268282
283+
if [ -n "$VERSION_SAME" ]; then
284+
echo "same_version=$VERSION_SAME" >> $GITHUB_OUTPUT
285+
else
286+
echo "same_version=$DEFAULT_SAME_VERSION" >> $GITHUB_OUTPUT
287+
fi
288+
269289
# Single job that bumps, publishes, and pushes tags/commits at the end
270290
# This avoids the "checkout wrong commit" problem of multi-job workflows
271291
release:
@@ -305,8 +325,10 @@ jobs:
305325
- name: Install dependencies
306326
run: pnpm install --frozen-lockfile --ignore-scripts
307327

328+
# Skipped for release-same-version: the versions in package.json are published as-is.
308329
- name: Bump versions for changed packages
309330
id: bump
331+
if: needs.detect-changes.outputs.same_version != 'true'
310332
env:
311333
MATRIX: ${{ needs.detect-changes.outputs.matrix }}
312334
BUMP: ${{ needs.detect-changes.outputs.bump }}

biome.json

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,17 @@
1515
"noUnusedPrivateClassMembers": "off"
1616
}
1717
}
18-
}
18+
},
19+
"overrides": [
20+
{
21+
"includes": ["**/bench/**"],
22+
"linter": {
23+
"rules": {
24+
"suspicious": {
25+
"noConsole": "off"
26+
}
27+
}
28+
}
29+
}
30+
]
1931
}

packages/amqp/lib/AbstractAmqpPublisher.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ export abstract class AbstractAmqpPublisher<
5151
dependencies: AMQPDependencies,
5252
options: AMQPPublisherOptions<MessagePayloadType, CreationConfig, LocatorConfig>,
5353
) {
54+
if (options.codec) {
55+
throw new Error(
56+
'codec is not supported by AbstractAmqpPublisher. Remove the codec option or use an SQS/SNS publisher.',
57+
)
58+
}
59+
5460
super(dependencies, options)
5561

5662
this.messageSchemaContainer = this.resolvePublisherMessageSchemaContainer(options)

packages/amqp/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/amqp",
3-
"version": "24.0.0",
3+
"version": "24.1.0",
44
"engines": {
55
"node": ">=18"
66
},
@@ -43,7 +43,7 @@
4343
"@biomejs/biome": "^2.3.8",
4444
"@lokalise/biome-config": "^3.1.0",
4545
"@lokalise/tsconfig": "^3.0.0",
46-
"@message-queue-toolkit/core": "*",
46+
"@message-queue-toolkit/core": "workspace:*",
4747
"@types/amqplib": "0.10.8",
4848
"@types/node": "^25.5.0",
4949
"@vitest/coverage-v8": "^4.0.18",

packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { asClass, asFunction, Lifetime } from 'awilix'
66
import { asMockFunction } from 'awilix-manager'
77
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest'
88
import { ZodError } from 'zod/v4'
9+
import { AbstractAmqpQueuePublisher } from '../../lib/AbstractAmqpQueuePublisher.ts'
910
import { deserializeAmqpMessage } from '../../lib/amqpMessageDeserializer.ts'
1011
import { AmqpPermissionConsumer } from '../consumers/AmqpPermissionConsumer.ts'
1112
import type {
@@ -25,6 +26,17 @@ import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext.ts'
2526
import { AmqpPermissionPublisher } from './AmqpPermissionPublisher.ts'
2627

2728
describe('PermissionPublisher', () => {
29+
describe('constructor', () => {
30+
it('throws when codec option is set (codec is not supported by AMQP publishers)', () => {
31+
// AmqpPermissionPublisher strips unknown options before calling super(), so we test
32+
// the guard via a minimal pass-through subclass that mirrors real user code.
33+
class TestPublisher extends AbstractAmqpQueuePublisher<{ messageType: string }> {}
34+
expect(() => new TestPublisher({} as any, { codec: 'zstd' } as any)).toThrow(
35+
'codec is not supported by AbstractAmqpPublisher',
36+
)
37+
})
38+
})
39+
2840
describe('logging', () => {
2941
let logger: FakeLogger
3042
let diContainer: AwilixContainer<Dependencies>

packages/core/README.md

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,58 @@ class MyPayloadStore implements PayloadStore {
641641
}
642642
```
643643

644+
#### Message compression (codec)
645+
646+
Publishers can compress outgoing messages by setting `codec` in their options. The codec implementation (zstd via the Node.js built-in `zlib` module) ships inside `@message-queue-toolkit/core` — there is no extra package to install. Requires **Node.js >=22.15.0** on the 22.x line, or **>=23.8.0** on newer lines. Only the SQS and SNS adapters support compression.
647+
648+
**Built-in zstd:**
649+
650+
```typescript
651+
import { MessageCodecEnum } from '@message-queue-toolkit/core'
652+
653+
new MyPublisher(deps, {
654+
codec: MessageCodecEnum.ZSTD,
655+
// Optional: skip compression for messages below this byte threshold (default: 512).
656+
// Small messages often expand when compressed; set to 0 to always compress.
657+
skipCompressionBelow: 512,
658+
})
659+
```
660+
661+
**Custom codec** (bring your own compression library):
662+
663+
```typescript
664+
import type { MessageCodecHandler } from '@message-queue-toolkit/core'
665+
666+
class MyLz4Handler implements MessageCodecHandler {
667+
async compress(data: Buffer): Promise<Buffer> { /* ... */ }
668+
async decompress(data: Buffer): Promise<Buffer> { /* ... */ }
669+
createCompressStream(): Transform { /* return a Transform stream */ }
670+
}
671+
672+
const codec = { name: 'lz4', handler: new MyLz4Handler() }
673+
new MyPublisher(deps, { codec })
674+
new MyConsumer(deps, { codecs: [codec] }) // register custom codec on the consumer
675+
```
676+
677+
Compressed messages are wrapped in a self-describing envelope `{ __mqtCodec: '<name>', __mqtData: '<base64>', ...preserved fields }`. The message's identity/routing fields (`id`, `timestamp`, `type`, deduplication fields) are copied alongside `__mqtData` as plaintext — the same fields an offloaded-payload pointer preserves — so broker-side filtering (e.g. SNS body-scoped `FilterPolicy`) keeps working on them. Built-in codecs (e.g. zstd) are auto-detected on every consumer — no consumer option needed. For custom codecs, pass `codecs: [{ name, handler }]` to register them on the consumer.
678+
679+
> **Roll out consumers before publishers:** auto-detection only works on a consumer running a library version that supports the codec (and, for custom codecs, with that codec registered). Upgrade all consumers of a queue first, then enable `codec` on publishers.
680+
681+
#### Interaction with codec (compression)
682+
683+
When both `codec` and `payloadStoreConfig` are set on a publisher, compression and offloading work together with a single compression pass:
684+
685+
1. The message is compressed **once** at publish time.
686+
2. The **codec envelope wire size** (base64-encoded compressed bytes + JSON framing) is compared against `messageSizeThreshold`.
687+
3. If the envelope size exceeds the threshold, the raw compressed bytes are stored in the payload store. The codec name is written to `payloadRef.codec` so the consumer knows how to decompress after retrieval.
688+
4. If the envelope size fits within the threshold, the message is sent inline as a self-describing codec envelope — S3 is never touched.
689+
690+
`skipCompressionBelow` is honored here too: a message whose serialized JSON is below the threshold skips compression entirely and is offloaded (or sent inline) as plain JSON.
691+
692+
> **Note:** for large payloads the compress-and-offload path streams the message through a temporary file under `os.tmpdir()` to avoid buffering the whole payload in memory. The temp file is always removed in a `finally` block. Environments with a read-only or unavailable temp directory (rare; AWS Lambda's `/tmp` is writable) cannot use the codec + payload-store combination.
693+
694+
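Because `messageSizeThreshold` is checked against the size of the compressed envelope rather than the original message, compression can prevent offloading entirely for messages that are large before compression but small after.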
695+
644696
## API Reference
645697

646698
### Types
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
import type { Transform } from 'node:stream'
2+
import { promisify } from 'node:util'
3+
import zlib from 'node:zlib'
4+
import type { MessageCodecHandler, MessageCodecRegistration } from './messageCodec.ts'
5+
import { MessageCodecEnum } from './messageCodec.ts'
6+
7+
const ZSTD_UNSUPPORTED_MSG =
8+
'zlib.zstdCompress and zlib.zstdDecompress are not available in this Node.js version. ' +
9+
'Message compression requires Node.js >=22.15.0 or >=23.8.0.'
10+
11+
/**
12+
* Default upper bound on the decompressed size of a single message, in bytes (100 MiB).
13+
*
14+
* Protects consumers from decompression-bomb inputs: a tiny compressed envelope can
15+
* otherwise expand to gigabytes of highly-repetitive data and exhaust process memory.
16+
* 100 MiB is far above any realistic queue message (SQS/SNS cap bodies at 256 KiB, and
17+
* even offloaded payloads are typically single-digit MiB) while still bounding the blast
18+
* radius of a malicious or corrupt frame. Override via the {@link ZstdCodecHandler}
19+
* constructor if you legitimately handle larger messages.
20+
*/
21+
export const DEFAULT_MAX_DECOMPRESSED_BYTES = 100 * 1024 * 1024
22+
23+
// Feature-detected once at module load — undefined on Node versions that lack zstd support.
24+
// Because the lookup is guarded, importing core never throws on older Node; only an
25+
// actual compress/decompress call fails, and only when zstd is genuinely used.
26+
const zstdCompress =
27+
typeof zlib.zstdCompress === 'function' ? promisify(zlib.zstdCompress) : undefined
28+
const zstdDecompress =
29+
typeof zlib.zstdDecompress === 'function' ? promisify(zlib.zstdDecompress) : undefined
30+
31+
/**
 * Built-in zstd implementation of {@link MessageCodecHandler}, backed by the zstd
 * functions of `node:zlib`.
 *
 * On a Node.js version without zstd support the failure is reported through each
 * method's own channel: `compress` / `decompress` return a rejected promise, and the
 * synchronous `createCompressStream` throws. All three use the same error message.
 */
export class ZstdCodecHandler implements MessageCodecHandler {
  private readonly maxDecompressedBytes: number

  /**
   * @param maxDecompressedBytes upper bound on a single decompressed message, in bytes.
   *   Defaults to {@link DEFAULT_MAX_DECOMPRESSED_BYTES}. It is passed to zlib as
   *   `maxOutputLength`, so decompression of an input that would exceed it is rejected.
   */
  constructor(maxDecompressedBytes: number = DEFAULT_MAX_DECOMPRESSED_BYTES) {
    this.maxDecompressedBytes = maxDecompressedBytes
  }

  /**
   * Compresses `data` with zstd.
   *
   * @returns a promise for the compressed bytes; rejects when zstd is unavailable.
   */
  compress(data: Buffer): Promise<Buffer> {
    // Reject instead of throwing: the method is Promise-returning, so callers that
    // attach `.catch()` (rather than `await` inside try/catch) must still see the error.
    if (!zstdCompress) return Promise.reject(new Error(ZSTD_UNSUPPORTED_MSG))
    return zstdCompress(data)
  }

  /**
   * Decompresses a zstd frame.
   *
   * @returns a promise for the decompressed bytes; rejects when zstd is unavailable
   *   or when the output would exceed the configured maximum size.
   */
  decompress(data: Buffer): Promise<Buffer> {
    if (!zstdDecompress) return Promise.reject(new Error(ZSTD_UNSUPPORTED_MSG))
    // maxOutputLength caps the decompressed size, guarding against decompression bombs.
    return zstdDecompress(data, { maxOutputLength: this.maxDecompressedBytes })
  }

  /**
   * Creates a streaming zstd compressor.
   *
   * @throws Error when this Node.js version has no `zlib.createZstdCompress`.
   */
  createCompressStream(): Transform {
    if (typeof zlib.createZstdCompress !== 'function') throw new Error(ZSTD_UNSUPPORTED_MSG)
    return zlib.createZstdCompress()
  }
}
60+
61+
const ZSTD_HANDLER = new ZstdCodecHandler()
62+
63+
/**
64+
* Allowed characters for a custom codec name: ASCII letters, digits, hyphens, underscores.
65+
* This keeps the name JSON-safe without escaping and makes it a recognisable identifier.
66+
*/
67+
const SAFE_CODEC_NAME_RE = /^[A-Za-z0-9_-]+$/
68+
69+
/**
 * Yields the identifier written into the `__mqtCodec` field of an envelope.
 *
 * A string-form registration is returned unchanged. For an object-form (custom)
 * registration the `name` is checked against `SAFE_CODEC_NAME_RE` first.
 *
 * @throws Error when a custom registration's name contains anything other than
 *   ASCII letters, digits, hyphens, or underscores.
 */
export function getCodecName(codec: MessageCodecRegistration): string {
  // Built-in codecs are plain strings and need no validation.
  if (typeof codec !== 'object') return codec

  const { name } = codec
  if (SAFE_CODEC_NAME_RE.test(name)) return name

  throw new Error(
    `Invalid codec name "${name}": only ASCII letters, digits, hyphens, and underscores are allowed`,
  )
}
85+
86+
/**
 * Looks up the {@link MessageCodecHandler} that implements a codec registration.
 *
 * - Object form (`{ name, handler }`): the supplied handler is returned as-is.
 * - String form (`MessageCodec`): the built-in handler for that codec is returned.
 *
 * @throws Error when a string-form codec has no built-in handler.
 */
export function resolveCodecHandler(codec: MessageCodecRegistration): MessageCodecHandler {
  if (typeof codec !== 'object') {
    if (codec !== MessageCodecEnum.ZSTD) throw new Error(`Unsupported codec: ${codec}`)
    return ZSTD_HANDLER
  }
  return codec.handler
}
97+
98+
/**
 * Wraps an already-compressed buffer in a codec envelope string.
 * Use this when you have pre-compressed bytes and want to avoid compressing twice.
 *
 * `preservedFields`, when provided and non-empty, are emitted as plaintext siblings of
 * the codec fields (`{ ...preserved, __mqtCodec, __mqtData }`). The codec fields are
 * written last, so a colliding preserved key can never override them.
 *
 * Without `preservedFields` the fast path builds the envelope by string concatenation
 * instead of stringifying an intermediate object. The base64 payload needs no escaping;
 * `codecName` is escaped with `JSON.stringify`, so the result is valid JSON even if a
 * caller passes a name that was not validated beforehand. For names made only of ASCII
 * letters, digits, hyphens, and underscores the output is byte-identical to plain
 * interpolation.
 *
 * @param compressed the compressed payload bytes; encoded as base64 into `__mqtData`.
 * @param codecName the codec identifier written to `__mqtCodec`.
 * @param preservedFields optional plaintext fields to emit alongside the codec fields.
 * @returns the JSON envelope string.
 */
export function buildCodecEnvelope(
  compressed: Buffer,
  codecName: string,
  preservedFields?: Record<string, unknown>,
): string {
  const data = compressed.toString('base64')
  if (!preservedFields || Object.keys(preservedFields).length === 0) {
    // Only the (short) codec name is stringified; the base64 alphabet is JSON-safe.
    return `{"__mqtCodec":${JSON.stringify(codecName)},"__mqtData":"${data}"}`
  }
  // Preserved fields present: a single JSON.stringify handles all value escaping.
  // Codec fields are listed last so they always win over any colliding preserved key.
  return JSON.stringify({ ...preservedFields, __mqtCodec: codecName, __mqtData: data })
}

0 commit comments

Comments
 (0)