11# Kafka
2-
3- This library provides utilities for implementing Kafka consumers and publishers.
2+
3+ This library provides utilities for implementing Kafka consumers and publishers using the [ @ platformatic/kafka ] ( https://github.com/platformatic/kafka ) client library .
44While following the same patterns as other message broker implementations,
55Kafka's unique characteristics require some specific adaptations in the publisher and consumer definitions.
66
77> ** _ NOTE:_ ** Check [ README.md] ( ../../README.md ) for transport-agnostic library documentation.
88
9+ ## Installation
10+
11+ ``` bash
12+ npm install @message-queue-toolkit/kafka @platformatic/kafka
13+ ```
14+
915## Publishers
1016
1117Use ` AbstractKafkaPublisher ` as a base class for publisher implementation.
1218
19+ ``` typescript
20+ import { AbstractKafkaPublisher } from ' @message-queue-toolkit/kafka'
21+ import type { KafkaDependencies , TopicConfig } from ' @message-queue-toolkit/kafka'
22+ import { z } from ' zod'
23+
24+ const MY_MESSAGE_SCHEMA = z .object ({
25+ id: z .string (),
26+ type: z .literal (' my.event' ),
27+ payload: z .object ({
28+ userId: z .string (),
29+ }),
30+ })
31+
32+ const MY_TOPICS_CONFIG = [
33+ { topic: ' my-topic' , schema: MY_MESSAGE_SCHEMA },
34+ ] as const satisfies TopicConfig []
35+
36+ export class MyPublisher extends AbstractKafkaPublisher <typeof MY_TOPICS_CONFIG > {
37+ constructor (dependencies : KafkaDependencies ) {
38+ super (dependencies , {
39+ kafka: {
40+ bootstrapBrokers: [' localhost:9092' ],
41+ clientId: ' my-app' ,
42+ },
43+ topicsConfig: MY_TOPICS_CONFIG ,
44+ autocreateTopics: true ,
45+ })
46+ }
47+ }
48+
49+ // Usage
50+ const publisher = new MyPublisher (dependencies )
51+ await publisher .init ()
52+ await publisher .publish (' my-topic' , {
53+ id: ' 123' ,
54+ type: ' my.event' ,
55+ payload: { userId: ' user-1' },
56+ })
57+ await publisher .close ()
58+ ```
59+
1360See [ test publisher] ( test/publisher/PermissionPublisher.ts ) for an example of implementation.
1461
1562## Consumers
1663
1764Use ` AbstractKafkaConsumer ` as a base class for consumer implementation.
1865
66+ ``` typescript
67+ import {
68+ AbstractKafkaConsumer ,
69+ KafkaHandlerConfig ,
70+ KafkaHandlerRoutingBuilder
71+ } from ' @message-queue-toolkit/kafka'
72+ import type { KafkaConsumerDependencies , TopicConfig } from ' @message-queue-toolkit/kafka'
73+
74+ type MyExecutionContext = {
75+ userService: UserService
76+ }
77+
78+ export class MyConsumer extends AbstractKafkaConsumer <
79+ typeof MY_TOPICS_CONFIG ,
80+ MyExecutionContext ,
81+ false
82+ > {
83+ constructor (
84+ dependencies : KafkaConsumerDependencies ,
85+ executionContext : MyExecutionContext ,
86+ ) {
87+ super (
88+ dependencies ,
89+ {
90+ kafka: {
91+ bootstrapBrokers: [' localhost:9092' ],
92+ clientId: ' my-app' ,
93+ },
94+ groupId: ' my-consumer-group' ,
95+ batchProcessingEnabled: false ,
96+ handlers: new KafkaHandlerRoutingBuilder <
97+ typeof MY_TOPICS_CONFIG ,
98+ MyExecutionContext ,
99+ false
100+ >()
101+ .addConfig(
102+ ' my-topic' ,
103+ new KafkaHandlerConfig(MY_MESSAGE_SCHEMA , async (message , context ) => {
104+ // Handle message
105+ console .log (' Received:' , message .value )
106+ await context .userService .processEvent (message .value .payload .userId )
107+ }),
108+ )
109+ .build(),
110+ },
111+ executionContext ,
112+ )
113+ }
114+ }
115+
116+ // Usage
117+ const consumer = new MyConsumer (dependencies , { userService })
118+ await consumer .init ()
119+ // Consumer is now running and processing messages
120+ // ...
121+ await consumer .close ()
122+ ```
123+
19124See [ test consumer] ( test/consumer/PermissionConsumer.ts ) for an example of implementation.
20125
21126## Batch Processing
@@ -24,6 +129,52 @@ Kafka supports batch processing for improved throughput. To enable it, set `batc
24129
25130When batch processing is enabled, message handlers receive an array of messages instead of a single message.
26131
132+ ``` typescript
133+ export class MyBatchConsumer extends AbstractKafkaConsumer <
134+ typeof MY_TOPICS_CONFIG ,
135+ MyExecutionContext ,
136+ true // Enable batch processing
137+ > {
138+ constructor (
139+ dependencies : KafkaConsumerDependencies ,
140+ executionContext : MyExecutionContext ,
141+ ) {
142+ super (
143+ dependencies ,
144+ {
145+ kafka: {
146+ bootstrapBrokers: [' localhost:9092' ],
147+ clientId: ' my-app' ,
148+ },
149+ groupId: ' my-batch-consumer-group' ,
150+ batchProcessingEnabled: true ,
151+ batchProcessingOptions: {
152+ batchSize: 100 ,
153+ timeoutMilliseconds: 5000 ,
154+ },
155+ handlers: new KafkaHandlerRoutingBuilder <
156+ typeof MY_TOPICS_CONFIG ,
157+ MyExecutionContext ,
158+ true
159+ >()
160+ .addConfig(
161+ ' my-topic' ,
162+ new KafkaHandlerConfig(MY_MESSAGE_SCHEMA , async (messages , context ) => {
163+ // Handle batch of messages
164+ console .log (` Processing batch of ${messages .length } messages ` )
165+ for (const message of messages ) {
166+ await context .userService .processEvent (message .value .payload .userId )
167+ }
168+ }),
169+ )
170+ .build(),
171+ },
172+ executionContext ,
173+ )
174+ }
175+ }
176+ ```
177+
27178### Configuration Options
28179
29180- ` batchSize ` - Maximum number of messages per batch
@@ -38,3 +189,86 @@ Messages are buffered per topic-partition combination. Batches are processed whe
38189After successful batch processing, the offset of the last message in the batch is committed.
39190
40191See [ test batch consumer] ( test/consumer/PermissionBatchConsumer.ts ) for an example of implementation.
192+
193+ ## Configuration
194+
195+ ### KafkaConfig
196+
197+ ``` typescript
198+ type KafkaConfig = {
199+ bootstrapBrokers: string [] // List of Kafka broker addresses
200+ clientId: string // Client identifier
201+ ssl? : boolean | TLSConfig // SSL configuration
202+ sasl? : SASLOptions // SASL authentication
203+ connectTimeout? : number // Connection timeout in ms
204+ }
205+ ` ` `
206+
207+ ### Publisher Options
208+
209+ - ` kafka ` - Kafka connection configuration
210+ - ` topicsConfig ` - Array of topic configurations with schemas
211+ - ` autocreateTopics ` - Whether to auto-create topics (default: false)
212+
213+ ### Consumer Options
214+
215+ - ` kafka ` - Kafka connection configuration
216+ - ` groupId ` - Consumer group ID (required)
217+ - ` handlers ` - Handler routing configuration built with ` KafkaHandlerRoutingBuilder `
218+ - ` batchProcessingEnabled ` - Enable batch processing (default: false)
219+ - ` batchProcessingOptions ` - Batch configuration (required if batch processing enabled)
220+ - ` autocreateTopics ` - Whether to auto-create topics (default: false)
221+ - ` fromBeginning ` - Start consuming from beginning of topic (default: false)
222+ - ` sessionTimeout ` - Session timeout in ms
223+ - ` rebalanceTimeout ` - Rebalance timeout in ms
224+ - ` heartbeatInterval ` - Heartbeat interval in ms
225+
226+ ## Error Handling and Retries
227+
228+ The consumer implements an in-memory retry mechanism with exponential backoff:
229+ - Failed messages are retried up to 3 times
230+ - Backoff delay: 2^(retry-1) seconds between retries
231+ - After all retries are exhausted, the message is logged as an error
232+
233+ ## Message Format
234+
235+ Messages are deserialized and passed to handlers with the following structure:
236+
237+ ` ` ` typescript
238+ type DeserializedMessage <MessageValue > = {
239+ topic: string
240+ partition: number
241+ key: string | null
242+ value: MessageValue
243+ headers: Record <string , string | undefined >
244+ offset: string
245+ timestamp: string
246+ }
247+ ` ` `
248+
249+ ## Handler Routing
250+
251+ The ` KafkaHandlerRoutingBuilder ` provides a type-safe way to configure message handlers:
252+
253+ ` ` ` typescript
254+ const handlers = new KafkaHandlerRoutingBuilder <TopicsConfig , ExecutionContext , BatchEnabled >()
255+ .addConfig (' topic-1' , new KafkaHandlerConfig (SCHEMA_1 , handler1 ))
256+ .addConfig (' topic-2' , new KafkaHandlerConfig (SCHEMA_2 , handler2 ))
257+ .build ()
258+ ```
259+
260+ Each handler config requires:
261+ - A Zod schema for message validation
262+ - A handler function that receives the validated message(s) and execution context
263+
264+ ## Testing
265+
266+ Use the ` handlerSpy ` for testing message processing:
267+
268+ ``` typescript
269+ // Wait for a specific message to be processed
270+ const result = await consumer .handlerSpy .waitForMessageWithId (' message-123' , ' consumed' )
271+
272+ // Check if a message was processed without waiting
273+ const check = consumer .handlerSpy .checkForMessage ({ type: ' my.event' })
274+ ```
0 commit comments