Skip to content

Commit 6abab55

Browse files
authored
feat: Kafka improving memory handling (#393)
* Improve KafkaMessageBatchStream memory handling by using transform and callback approach * Using new batch processing class * Coverage fix * Trying to fix backpressure issues due to timeout flush * Name improvement * Adding test to verify proper timeout flush handling * Lint fix * Improving tests
1 parent a40c3cb commit 6abab55

3 files changed

Lines changed: 210 additions & 128 deletions

File tree

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,12 @@ export abstract class AbstractKafkaConsumer<
149149
if (!this.consumerStream && !this.messageBatchStream) return false
150150
try {
151151
return this.consumer.isConnected()
152+
/* v8 ignore start */
152153
} catch (_) {
153154
// this should not happen, but if so it means the consumer is not healthy
154-
/* v8 ignore next */
155155
return false
156156
}
157+
/* v8 ignore stop */
157158
}
158159

159160
/**
@@ -165,11 +166,12 @@ export abstract class AbstractKafkaConsumer<
165166
if (!this.consumerStream && !this.messageBatchStream) return false
166167
try {
167168
return this.consumer.isActive()
169+
/* v8 ignore start */
168170
} catch (_) {
169171
// this should not happen, but if so it means the consumer is not healthy
170-
/* v8 ignore next */
171172
return false
172173
}
174+
/* v8 ignore stop */
173175
}
174176

175177
async init(): Promise<void> {
@@ -188,14 +190,19 @@ export abstract class AbstractKafkaConsumer<
188190
})
189191

190192
this.consumerStream = await this.consumer.consume({ ...consumeOptions, topics })
193+
this.consumerStream.on('error', (error) => this.handlerError(error))
194+
191195
if (this.options.batchProcessingEnabled && this.options.batchProcessingOptions) {
192196
this.messageBatchStream = new KafkaMessageBatchStream<
193197
DeserializedMessage<SupportedMessageValues<TopicsConfig>>
194-
>({
195-
batchSize: this.options.batchProcessingOptions.batchSize,
196-
timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds,
197-
})
198+
>(
199+
(batch) =>
200+
this.consume(batch.topic, batch.messages).catch((error) => this.handlerError(error)),
201+
this.options.batchProcessingOptions,
202+
)
198203
this.consumerStream.pipe(this.messageBatchStream)
204+
} else {
205+
this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error))
199206
}
200207
} catch (error) {
201208
throw new InternalError({
@@ -204,14 +211,6 @@ export abstract class AbstractKafkaConsumer<
204211
cause: error,
205212
})
206213
}
207-
208-
if (this.options.batchProcessingEnabled && this.messageBatchStream) {
209-
this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error))
210-
} else {
211-
this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error))
212-
}
213-
214-
this.consumerStream.on('error', (error) => this.handlerError(error))
215214
}
216215

217216
private async handleSyncStream(
@@ -224,16 +223,6 @@ export abstract class AbstractKafkaConsumer<
224223
)
225224
}
226225
}
227-
private async handleSyncStreamBatch(
228-
stream: KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>,
229-
): Promise<void> {
230-
for await (const messageBatch of stream) {
231-
await this.consume(
232-
messageBatch.topic,
233-
messageBatch.messages as DeserializedMessage<SupportedMessageValues<TopicsConfig>>,
234-
)
235-
}
236-
}
237226

238227
async close(): Promise<void> {
239228
if (!this.consumerStream && !this.messageBatchStream) {
@@ -291,7 +280,6 @@ export abstract class AbstractKafkaConsumer<
291280
const firstMessage = validMessages[0]!
292281
const requestContext = this.getRequestContext(firstMessage)
293282

294-
/* v8 ignore next */
295283
const transactionId = randomUUID()
296284
this.transactionObservabilityManager?.start(this.buildTransactionName(topic), transactionId)
297285

packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts

Lines changed: 130 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { setTimeout } from 'node:timers/promises'
2+
import { waitAndRetry } from '@lokalise/universal-ts-utils/node'
13
import { KafkaMessageBatchStream, type MessageBatch } from './KafkaMessageBatchStream.ts'
24

35
describe('KafkaMessageBatchStream', () => {
@@ -12,22 +14,27 @@ describe('KafkaMessageBatchStream', () => {
1214
}))
1315

1416
// When
15-
const batchStream = new KafkaMessageBatchStream<any>({
16-
batchSize: 3,
17-
timeoutMilliseconds: 10000,
18-
}) // Setting big timeout to check batch size only
19-
2017
const receivedBatches: MessageBatch<any>[] = []
2118

22-
const dataFetchingPromise = new Promise((resolve) => {
23-
batchStream.on('data', (batch) => {
19+
let resolvePromise: () => void
20+
const dataFetchingPromise = new Promise<void>((resolve) => {
21+
resolvePromise = resolve
22+
})
23+
24+
const batchStream = new KafkaMessageBatchStream<any>(
25+
(batch) => {
2426
receivedBatches.push(batch)
25-
// We expect 3 batches and last message waiting in the stream
27+
// We expect 3 batches and the last message waiting in the stream
2628
if (receivedBatches.length >= 3) {
27-
resolve(null)
29+
resolvePromise()
2830
}
29-
})
30-
})
31+
return Promise.resolve()
32+
},
33+
{
34+
batchSize: 3,
35+
timeoutMilliseconds: 10000,
36+
},
37+
) // Setting big timeout to check batch size only
3138

3239
for (const message of messages) {
3340
batchStream.write(message)
@@ -54,24 +61,25 @@ describe('KafkaMessageBatchStream', () => {
5461
}))
5562

5663
// When
57-
const batchStream = new KafkaMessageBatchStream<any>({
58-
batchSize: 1000,
59-
timeoutMilliseconds: 500,
60-
}) // Setting big batch size to check timeout only
61-
6264
const receivedBatches: MessageBatch<any>[] = []
63-
batchStream.on('data', (batch) => {
64-
receivedBatches.push(batch)
65-
})
65+
66+
const batchStream = new KafkaMessageBatchStream<any>(
67+
(batch) => {
68+
receivedBatches.push(batch)
69+
return Promise.resolve()
70+
},
71+
{
72+
batchSize: 1000,
73+
timeoutMilliseconds: 100,
74+
},
75+
) // Setting big batch size to check timeout only
6676

6777
for (const message of messages) {
6878
batchStream.write(message)
6979
}
7080

71-
// Sleep 1 seconds to let the timeout trigger
72-
await new Promise((resolve) => {
73-
setTimeout(resolve, 1000)
74-
})
81+
// Sleep to let the timeout trigger
82+
await setTimeout(150)
7583

7684
// Then
7785
expect(receivedBatches).toEqual([{ topic, partition: 0, messages }])
@@ -104,16 +112,16 @@ describe('KafkaMessageBatchStream', () => {
104112
]
105113

106114
// When
107-
const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>({
108-
batchSize: 2,
109-
timeoutMilliseconds: 10000,
110-
}) // Setting big timeout to check batch size only
111-
112115
const receivedBatchesByTopicPartition: Record<string, any[][]> = {}
113-
114116
let receivedMessagesCounter = 0
115-
const dataFetchingPromise = new Promise((resolve) => {
116-
batchStream.on('data', (batch) => {
117+
118+
let resolvePromise: () => void
119+
const dataFetchingPromise = new Promise<void>((resolve) => {
120+
resolvePromise = resolve
121+
})
122+
123+
const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>(
124+
(batch) => {
117125
const key = `${batch.topic}:${batch.partition}`
118126
if (!receivedBatchesByTopicPartition[key]) {
119127
receivedBatchesByTopicPartition[key] = []
@@ -123,10 +131,16 @@ describe('KafkaMessageBatchStream', () => {
123131
// We expect 5 batches and last message waiting in the stream
124132
receivedMessagesCounter++
125133
if (receivedMessagesCounter >= 5) {
126-
resolve(null)
134+
resolvePromise()
127135
}
128-
})
129-
})
136+
137+
return Promise.resolve()
138+
},
139+
{
140+
batchSize: 2,
141+
timeoutMilliseconds: 10000,
142+
},
143+
) // Setting big timeout to check batch size only
130144

131145
for (const message of messages) {
132146
batchStream.write(message)
@@ -177,25 +191,31 @@ describe('KafkaMessageBatchStream', () => {
177191
]
178192

179193
// When
180-
const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>({
181-
batchSize: 2,
182-
timeoutMilliseconds: 10000,
183-
}) // Setting big timeout to check batch size only
184-
185194
const receivedBatches: any[] = []
186-
187195
let receivedBatchesCounter = 0
188-
const dataFetchingPromise = new Promise((resolve) => {
189-
batchStream.on('data', (batch) => {
196+
197+
let resolvePromise: () => void
198+
const dataFetchingPromise = new Promise<void>((resolve) => {
199+
resolvePromise = resolve
200+
})
201+
202+
const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>(
203+
(batch) => {
190204
receivedBatches.push(batch)
191205

192206
// We expect 4 batches (2 per partition)
193207
receivedBatchesCounter++
194208
if (receivedBatchesCounter >= 4) {
195-
resolve(null)
209+
resolvePromise()
196210
}
197-
})
198-
})
211+
212+
return Promise.resolve()
213+
},
214+
{
215+
batchSize: 2,
216+
timeoutMilliseconds: 10000,
217+
},
218+
) // Setting big timeout to check batch size only
199219

200220
for (const message of messages) {
201221
batchStream.write(message)
@@ -211,4 +231,69 @@ describe('KafkaMessageBatchStream', () => {
211231
{ topic, partition: 1, messages: [messages[5], messages[7]] },
212232
])
213233
})
234+
235+
it('should handle backpressure correctly when timeout flush is slow', async () => {
236+
// Given
237+
const topic = 'test-topic'
238+
const messages = Array.from({ length: 6 }, (_, i) => ({
239+
id: i + 1,
240+
content: `Message ${i + 1}`,
241+
topic,
242+
partition: 0,
243+
}))
244+
245+
const batchStartTimes: number[] = [] // Track start times of batch processing
246+
const batchEndTimes: number[] = [] // Track end times of batch processing
247+
const batchMessageCounts: number[] = [] // Track number of messages per batch
248+
let maxConcurrentBatches = 0 // Track max concurrent batches
249+
250+
let batchesProcessing = 0
251+
const batchStream = new KafkaMessageBatchStream<any>(
252+
async (batch) => {
253+
batchStartTimes.push(Date.now())
254+
batchMessageCounts.push(batch.messages.length)
255+
256+
batchesProcessing++
257+
maxConcurrentBatches = Math.max(maxConcurrentBatches, batchesProcessing)
258+
259+
// Simulate batch processing (50ms per batch)
260+
await setTimeout(50)
261+
262+
batchEndTimes.push(Date.now())
263+
batchesProcessing--
264+
},
265+
{
266+
batchSize: 1000, // Large batch size to never trigger size-based flushing
267+
timeoutMilliseconds: 10, // Short timeout to trigger flush after each message
268+
},
269+
)
270+
271+
// When: Write messages with 20ms delay between them
272+
// Since processing (50ms) is slower than message arrival + timeout, backpressure causes accumulation
273+
for (const message of messages) {
274+
batchStream.write(message)
275+
await setTimeout(20)
276+
}
277+
278+
// Then
279+
// Wait until all 3 batches have been processed
280+
await waitAndRetry(() => batchMessageCounts.length >= 3, 500, 20)
281+
282+
// Backpressure causes messages to accumulate while previous batch processes:
283+
// - Batch 1: Message 1 (flushed at 10ms timeout)
284+
// - Batch 2: Messages 2-4 (accumulated during Batch 1 processing, including Message 4 arriving at ~60ms)
285+
// - Batch 3: Messages 5-6 (accumulated during Batch 2 processing)
286+
expect(batchMessageCounts).toEqual([1, 3, 2])
287+
288+
// Verify that batches never processed in parallel (backpressure working)
289+
expect(maxConcurrentBatches).toBe(1) // Should never process more than 1 batch at a time
290+
291+
// Verify that batches were processed sequentially (each starts after previous ends)
292+
for (let i = 1; i < batchStartTimes.length; i++) {
293+
const previousEndTime = batchEndTimes[i - 1]
294+
const currentStartTime = batchStartTimes[i]
295+
// The current batch must start after the previous batch finished
296+
expect(currentStartTime).toBeGreaterThanOrEqual(previousEndTime ?? 0)
297+
}
298+
})
214299
})

0 commit comments

Comments
 (0)