Skip to content

Commit 2827e7e

Browse files
irfanh94Irfan Hodzic
andauthored
Strip kafka config from Consumer.consume() (#432)
Co-authored-by: Irfan Hodzic <irfan.hodzic@Irfans-MacBook-Pro-2.local>
1 parent 8f1c31b commit 2827e7e

2 files changed

Lines changed: 52 additions & 2 deletions

File tree

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,12 @@ export abstract class AbstractKafkaConsumer<
172172
})
173173

174174
try {
175-
const { handlers: _, ...consumeOptions } = this.options // Handlers cannot be passed to consume method
175+
// `kafka` is excluded so that connection-level options (including function-valued ones such
176+
// as SASL/OAUTHBEARER token providers used for AWS MSK IAM) are not forwarded into
177+
// `consume()`. `MessagesStream` runs `structuredClone` on the consume options and would
178+
// throw `DataCloneError` if any function reached it. See:
179+
// https://github.com/platformatic/kafka/blob/main/src/clients/consumer/messages-stream.ts
180+
const { handlers: _, kafka: __, ...consumeOptions } = this.options // Handlers cannot be passed to consume method
176181

177182
// https://github.com/platformatic/kafka/blob/main/docs/consumer.md#my-consumer-is-not-receiving-any-message-when-the-application-restarts
178183
await this.consumer.joinGroup({

packages/kafka/test/consumer/PermissionConsumer.spec.ts

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { randomUUID } from 'node:crypto'
22
import { waitAndRetry } from '@lokalise/universal-ts-utils/node'
3-
import { Producer, stringSerializers } from '@platformatic/kafka'
3+
import { Consumer, Producer, stringSerializers } from '@platformatic/kafka'
44
import { afterAll, expect, type MockInstance } from 'vitest'
55
import z from 'zod/v4'
66
import { KafkaHandlerConfig, type RequestContext } from '../../lib/index.ts'
@@ -112,6 +112,51 @@ describe('PermissionConsumer', () => {
112112
// When - Then
113113
await expect(consumer.init()).resolves.not.toThrow()
114114
})
115+
116+
// Regression: @platformatic/kafka's MessagesStream runs `structuredClone`
117+
// on the consume options after destructuring its known fields. If the
118+
// connection-level `kafka` config is forwarded, anything function-valued
119+
// reachable from it (e.g. a SASL/OAUTHBEARER token provider used for AWS
120+
// MSK IAM) crashes the clone with `DOMException [DataCloneError]` and
121+
// prevents the consumer from starting. The toolkit must therefore strip
122+
// `kafka` from consume options.
123+
describe('does not forward kafka connection config to Consumer.consume()', () => {
124+
it('strips `kafka` from consume options', async () => {
125+
// Given
126+
const consumeSpy = vi.spyOn(Consumer.prototype, 'consume')
127+
consumer = new PermissionConsumer(testContext.cradle)
128+
129+
// When
130+
await consumer.init()
131+
132+
// Then
133+
expect(consumeSpy).toHaveBeenCalledOnce()
134+
const consumeArgs = consumeSpy.mock.calls[0]![0] as unknown as Record<string, unknown>
135+
expect(consumeArgs).not.toHaveProperty('kafka')
136+
})
137+
138+
it('keeps `kafka` out of consume options even when it carries a function-valued field (e.g. SASL/OAUTHBEARER token provider)', async () => {
139+
// Given - simulate AWS MSK IAM, where kafka.sasl is `{ mechanism: 'OAUTHBEARER', token: async () => '...' }`.
140+
// We can't actually run SASL against the plaintext test broker, so we
141+
// attach a function under `kafka` after construction. What matters is
142+
// that no part of the kafka config — function-valued or otherwise —
143+
// ends up in the args passed to `Consumer.consume()`.
144+
const consumeSpy = vi.spyOn(Consumer.prototype, 'consume')
145+
consumer = new PermissionConsumer(testContext.cradle)
146+
;(
147+
consumer as unknown as { options: { kafka: Record<string, unknown> } }
148+
).options.kafka.tokenProvider = async () => 'signed-token'
149+
150+
// When
151+
await consumer.init()
152+
153+
// Then
154+
expect(consumeSpy).toHaveBeenCalledOnce()
155+
const consumeArgs = consumeSpy.mock.calls[0]![0] as unknown as Record<string, unknown>
156+
expect(consumeArgs).not.toHaveProperty('kafka')
157+
expect(consumeArgs).not.toHaveProperty('tokenProvider')
158+
})
159+
})
115160
})
116161

117162
describe('isConnected', () => {

0 commit comments

Comments
 (0)