File tree Expand file tree Collapse file tree
packages/kafka/load-tests Expand file tree Collapse file tree Original file line number Diff line number Diff line change 2424 "load:direct:batch:medium" : " node src/run-direct-batch.ts --rate 1000 --duration 60" ,
2525 "load:direct:batch:heavy" : " node src/run-direct-batch.ts --rate 5000 --duration 120"
2626 },
27+ "engines" : {
28+ "node" : " >=22.18.0"
29+ },
2730 "dependencies" : {
2831 "@message-queue-toolkit/kafka" : " file:../../kafka" ,
2932 "@message-queue-toolkit/core" : " file:../../core" ,
3639 "devDependencies" : {
3740 "@types/pg" : " ^8.16.0" ,
3841 "typescript" : " ^5.9.3"
39- },
40- "overrides" : {
41- "zod" : " $zod"
4242 }
4343}
Original file line number Diff line number Diff line change @@ -33,14 +33,27 @@ CREATE TABLE IF NOT EXISTS orders (
3333
3434SET CLUSTER SETTING kv.rangefeed.enabled = true;
3535
36+ SQL
37+
38+ # Check for existing running changefeeds before creating a new one
39+ EXISTING=$( /cockroach/cockroach sql --insecure --host=" $CRDB_HOST " --port=" $CRDB_PORT " \
40+ --format=csv -e " SELECT count(*) FROM [SHOW CHANGEFEED JOBS] WHERE status = 'running'" 2> /dev/null | tail -1)
41+
42+ if [ " ${EXISTING:- 0} " -gt 0 ]; then
43+ echo " Changefeed already exists, skipping creation."
44+ else
45+ /cockroach/cockroach sql --insecure --host=" $CRDB_HOST " --port=" $CRDB_PORT " << 'SQL '
46+ USE loadtest;
47+
3648CREATE CHANGEFEED FOR events, orders
3749 INTO 'kafka://kafka:9093'
3850 WITH format = json,
3951 updated,
4052 resolved = '5s',
4153 diff,
4254 kafka_sink_config = '{"Flush":{"MaxMessages":100,"Frequency":"500ms"}}';
43-
4455SQL
56+ echo " Changefeed created successfully."
57+ fi
4558
46- echo " Database, tables, and changefeed created successfully."
59+ echo " Database and tables initialized successfully."
You can’t perform that action at this time.
0 commit comments