Skip to content

Commit 2f8da15

Browse files
committed
feat: enhance Kafka configuration with dynamic topics prefix support
1 parent b76ff9f commit 2f8da15

8 files changed

Lines changed: 13 additions & 7 deletions

File tree

apps/chaingraph-execution-api/.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ EXECUTION_MODE=distributed
1010

1111
# Kafka (required for distributed mode)
1212
KAFKA_BROKERS=localhost:9092
13+
KAFKA_TOPICS_PREFIX=dev.
1314

1415
# Logging
1516
# debug, info, warn, error

apps/chaingraph-execution-api/src/config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export const config = {
2727
// Kafka configuration (needed for distributed mode)
2828
kafka: {
2929
brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
30-
topicsPrefix: process.env.KAFKA_TOPICS_PREFIX || 'chaingraph',
30+
topicsPrefix: process.env.KAFKA_TOPICS_PREFIX || '',
3131
},
3232

3333
// Logging

apps/chaingraph-execution-worker/.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ DATABASE_URL_EXECUTIONS=postgres://postgres@0.0.0.0:5435/postgres?sslmode=disabl
1919

2020
# Kafka (required for distributed mode)
2121
KAFKA_BROKERS=localhost:9092
22+
KAFKA_TOPICS_PREFIX=dev.
2223

2324
# Health Monitoring
2425
# Port for health endpoint

apps/chaingraph-execution-worker/src/config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ export const config = {
3232
// Kafka configuration (needed for distributed mode)
3333
kafka: {
3434
brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
35+
topicsPrefix: process.env.KAFKA_TOPICS_PREFIX || '',
3536
},
3637

3738
// Health and monitoring

apps/chaingraph-execution-worker/src/worker.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export async function startWorker(): Promise<void> {
2525
process.env.DATABASE_URL = config.databaseUrl
2626
process.env.DATABASE_URL_EXECUTIONS = config.databaseUrlExecutions
2727
process.env.KAFKA_BROKERS = config.kafka.brokers.join(',')
28+
process.env.KAFKA_TOPICS_PREFIX = config.kafka.topicsPrefix
2829
process.env.WORKER_ID = workerId
2930

3031
// Create services (EventBus, TaskQueue, ExecutionStore)

packages/chaingraph-executor/server/utils/config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ export const config = {
2828
kafka: {
2929
brokers: (process.env.KAFKA_BROKERS || 'localhost:9092,localhost:9093,localhost:9094').split(','),
3030
clientId: process.env.KAFKA_CLIENT_ID || 'chaingraph-executor',
31+
topicsPrefix: process.env.KAFKA_TOPICS_PREFIX || '',
3132
groupId: {
3233
worker: process.env.KAFKA_GROUP_ID_WORKER || 'chaingraph-execution-workers',
3334
stream: process.env.KAFKA_GROUP_ID_STREAM || 'chaingraph-event-stream',

packages/chaingraph-executor/server/workers/ExecutionWorker.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,9 +373,7 @@ export class ExecutionWorker {
373373
heartbeatInterval: 3000,
374374
// Ultra-low latency optimizations
375375
maxWaitTimeInMs: 1, // 1ms for ultra-low latency
376-
fetchMinBytes: 1, // Don't wait for data accumulation
377-
fetchBatchSize: 1024 * 1024, // 1MB batch size
378-
maxBytesPerPartition: 1024 * 1024, // 1MB per partition
376+
allowAutoTopicCreation: false,
379377
})
380378

381379
await this.commandConsumer.connect()

packages/chaingraph-executor/types/messages.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import type {
1212
ExecutionOptions,
1313
IntegrationContext,
1414
} from '@badaitech/chaingraph-types'
15+
import process from 'node:process'
1516

1617
export enum ExecutionCommandType {
1718
CREATE = 'CREATE',
@@ -79,13 +80,15 @@ export interface ExecutionEventMessage {
7980
workerId: string // Which worker produced this
8081
}
8182

83+
const topicsPrefix = process.env.KAFKA_TOPICS_PREFIX || ''
84+
8285
/**
8386
* Kafka topic names
8487
*/
8588
export const KafkaTopics = {
86-
COMMANDS: '2.chaingraph.execution.commands',
87-
EVENTS: '2.chaingraph.execution.events',
88-
TASKS: '2.chaingraph.execution.tasks',
89+
COMMANDS: `${topicsPrefix}chaingraph.execution.commands`,
90+
EVENTS: `${topicsPrefix}chaingraph.execution.events`,
91+
TASKS: `${topicsPrefix}chaingraph.execution.tasks`,
8992
} as const
9093

9194
export type KafkaTopicName = typeof KafkaTopics[keyof typeof KafkaTopics]

0 commit comments

Comments
 (0)