|
1 | | -import { once } from 'node:events' |
| 1 | +import { randomUUID } from 'node:crypto' |
2 | 2 | import { waitAndRetry } from '@lokalise/universal-ts-utils/node' |
3 | | -import { KafkaConsumer, type Message, Producer, features, librdkafkaVersion } from 'node-rdkafka' |
| 3 | +import { |
| 4 | + Consumer, |
| 5 | + type Message, |
| 6 | + Producer, |
| 7 | + stringDeserializers, |
| 8 | + stringSerializers, |
| 9 | +} from '@platformatic/kafka' |
| 10 | +import { ProduceAcks } from '@platformatic/kafka' |
| 11 | +import { afterAll } from 'vitest' |
| 12 | +import { type TestContext, registerDependencies } from '../test/testContext.ts' |
4 | 13 |
|
5 | 14 | // TODO: to be removed once we have proper tests |
6 | 15 | describe('Test', () => { |
7 | | - it('should use node-rdkafka', () => { |
8 | | - expect(features).toBeDefined() |
9 | | - expect(librdkafkaVersion).toBeDefined() |
| 16 | + let testContext: TestContext |
| 17 | + |
| 18 | + beforeAll(async () => { |
| 19 | + testContext = await registerDependencies() |
| 20 | + }) |
| 21 | + |
| 22 | + afterAll(async () => { |
| 23 | + await testContext.dispose() |
10 | 24 | }) |
11 | 25 |
|
12 | 26 | it('should send and receive a message', { timeout: 10000 }, async () => { |
13 | 27 | // Given |
14 | | - const brokers = 'localhost:9092' |
| 28 | + const clientId = randomUUID() |
15 | 29 | // Use a fresh, unique topic per run to avoid stale state |
16 | 30 | const topic = `test-topic-${Date.now()}` |
17 | 31 | const messageValue = 'My test message' |
18 | 32 |
|
19 | | - const receivedMessages: Message[] = [] |
| 33 | + const receivedMessages: Message<string, string, string, string>[] = [] |
20 | 34 |
|
21 | | - // Create a producer |
| 35 | + // Create producer |
22 | 36 | const producer = new Producer({ |
23 | | - 'metadata.broker.list': brokers, |
24 | | - 'allow.auto.create.topics': true, |
| 37 | + clientId, |
| 38 | + bootstrapBrokers: testContext.cradle.kafkaConfig.brokers, |
| 39 | + serializers: stringSerializers, |
| 40 | + autocreateTopics: true, |
25 | 41 | }) |
26 | | - producer.connect() |
27 | | - await once(producer, 'ready') |
28 | 42 |
|
29 | | - // Create a consumer with a unique group and disable auto-commit for fresh offsets |
30 | | - const consumer = new KafkaConsumer( |
31 | | - { |
32 | | - 'group.id': 'test-group', |
33 | | - 'metadata.broker.list': brokers, |
34 | | - 'allow.auto.create.topics': true, |
35 | | - 'enable.auto.commit': false, |
36 | | - }, |
37 | | - { 'auto.offset.reset': 'earliest' }, |
38 | | - ) |
39 | | - consumer.connect() |
| 43 | + // Create consumer |
| 44 | + const consumer = new Consumer({ |
| 45 | + clientId, |
| 46 | + groupId: randomUUID(), |
| 47 | + bootstrapBrokers: testContext.cradle.kafkaConfig.brokers, |
| 48 | + deserializers: stringDeserializers, |
| 49 | + autocreateTopics: true, |
| 50 | + }) |
40 | 51 |
|
41 | | - await new Promise<void>((resolve, reject) => |
42 | | - consumer |
43 | | - .on('ready', () => { |
44 | | - consumer.subscribe([topic]) |
45 | | - consumer.consume() |
46 | | - resolve() |
47 | | - }) |
48 | | - .on('event.error', (err) => reject(err)) |
49 | | - .on('data', (data) => { |
50 | | - receivedMessages.push(data) |
51 | | - }), |
52 | | - ) |
| 52 | + const stream = await consumer.consume({ topics: [topic] }) |
| 53 | + stream.on('data', (message) => { |
| 54 | + receivedMessages.push(message) |
| 55 | + stream.close() |
| 56 | + }) |
53 | 57 |
|
54 | 58 | // When |
55 | | - producer.produce(topic, null, Buffer.from(messageValue)) |
56 | | - producer.flush() |
| 59 | + await producer.send({ |
| 60 | + messages: [{ topic, value: messageValue }], |
| 61 | + acks: ProduceAcks.NO_RESPONSE, |
| 62 | + }) |
57 | 63 |
|
58 | 64 | // Then |
59 | 65 | await waitAndRetry(() => receivedMessages.length > 0, 10, 800) |
60 | 66 | expect(receivedMessages).toHaveLength(1) |
61 | 67 | expect(receivedMessages[0]?.value?.toString()).toBe(messageValue) |
62 | 68 |
|
63 | 69 | // Cleanup |
64 | | - producer.disconnect() |
65 | | - consumer.disconnect() |
| 70 | + producer.close() |
| 71 | + consumer.close() |
66 | 72 | }) |
67 | 73 | }) |
0 commit comments