diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 14972f7b..8913c667 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -172,7 +172,12 @@ export abstract class AbstractKafkaConsumer< }) try { - const { handlers: _, ...consumeOptions } = this.options // Handlers cannot be passed to consume method + // `kafka` is excluded so that connection-level options (including function-valued ones such + // as SASL/OAUTHBEARER token providers used for AWS MSK IAM) are not forwarded into + // `consume()`. `MessagesStream` runs `structuredClone` on the consume options and would + // throw `DataCloneError` if any function reached it. See: + // https://github.com/platformatic/kafka/blob/main/src/clients/consumer/messages-stream.ts + const { handlers: _, kafka: __, ...consumeOptions } = this.options // Handlers cannot be passed to consume method // https://github.com/platformatic/kafka/blob/main/docs/consumer.md#my-consumer-is-not-receiving-any-message-when-the-application-restarts await this.consumer.joinGroup({ diff --git a/packages/kafka/test/consumer/PermissionConsumer.spec.ts b/packages/kafka/test/consumer/PermissionConsumer.spec.ts index 8e3489b4..74111bcc 100644 --- a/packages/kafka/test/consumer/PermissionConsumer.spec.ts +++ b/packages/kafka/test/consumer/PermissionConsumer.spec.ts @@ -1,6 +1,6 @@ import { randomUUID } from 'node:crypto' import { waitAndRetry } from '@lokalise/universal-ts-utils/node' -import { Producer, stringSerializers } from '@platformatic/kafka' +import { Consumer, Producer, stringSerializers } from '@platformatic/kafka' import { afterAll, expect, type MockInstance } from 'vitest' import z from 'zod/v4' import { KafkaHandlerConfig, type RequestContext } from '../../lib/index.ts' @@ -112,6 +112,51 @@ describe('PermissionConsumer', () => { // When - Then await expect(consumer.init()).resolves.not.toThrow() }) + + // Regression: @platformatic/kafka's MessagesStream runs `structuredClone` + // on the consume options after destructuring its known fields. If the + // connection-level `kafka` config is forwarded, anything function-valued + // reachable from it (e.g. a SASL/OAUTHBEARER token provider used for AWS + // MSK IAM) crashes the clone with `DOMException [DataCloneError]` and + // prevents the consumer from starting. The toolkit must therefore strip + // `kafka` from consume options. + describe('does not forward kafka connection config to Consumer.consume()', () => { + it('strips `kafka` from consume options', async () => { + // Given + const consumeSpy = vi.spyOn(Consumer.prototype, 'consume') + consumer = new PermissionConsumer(testContext.cradle) + + // When + await consumer.init() + + // Then + expect(consumeSpy).toHaveBeenCalledOnce() + const consumeArgs = consumeSpy.mock.calls[0]![0] as unknown as Record + expect(consumeArgs).not.toHaveProperty('kafka') + }) + + it('keeps `kafka` out of consume options even when it carries a function-valued field (e.g. SASL/OAUTHBEARER token provider)', async () => { + // Given - simulate AWS MSK IAM, where kafka.sasl is `{ mechanism: 'OAUTHBEARER', token: async () => '...' }`. + // We can't actually run SASL against the plaintext test broker, so we + // attach a function under `kafka` after construction. What matters is + // that no part of the kafka config — function-valued or otherwise — + // ends up in the args passed to `Consumer.consume()`. + const consumeSpy = vi.spyOn(Consumer.prototype, 'consume') + consumer = new PermissionConsumer(testContext.cradle) + ;( + consumer as unknown as { options: { kafka: Record } } + ).options.kafka.tokenProvider = async () => 'signed-token' + + // When + await consumer.init() + + // Then + expect(consumeSpy).toHaveBeenCalledOnce() + const consumeArgs = consumeSpy.mock.calls[0]![0] as unknown as Record + expect(consumeArgs).not.toHaveProperty('kafka') + expect(consumeArgs).not.toHaveProperty('tokenProvider') + }) + }) }) describe('isConnected', () => {