Skip to content

Commit 16d46a7

Browse files
jinyongchoiedsiper
authored andcommitted
out_kafka: fix SIGSEGV and memory leak when rd_kafka_new() fails
Move mk_list_init(&ctx->topics) to early initialization to prevent SIGSEGV crash when rd_kafka_new() fails and flb_out_kafka_destroy() is called with an uninitialized topics list. Also fix memory leak of rd_kafka_conf_t by calling rd_kafka_conf_destroy() on failure path and in flb_out_kafka_destroy() for other error paths. Set ctx->conf to NULL after successful rd_kafka_new() since ownership is transferred to the rd_kafka_t handle. Signed-off-by: jinyong.choi <inimax801@gmail.com>
1 parent 07585fe commit 16d46a7

1 file changed

Lines changed: 9 additions & 1 deletion

File tree

plugins/out_kafka/kafka_config.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
4949
}
5050
ctx->ins = ins;
5151
ctx->blocked = FLB_FALSE;
52+
mk_list_init(&ctx->topics);
5253

5354
ret = flb_output_config_map_set(ins, (void*) ctx);
5455
if (ret == -1) {
@@ -239,9 +240,13 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
239240
if (!ctx->kafka.rk) {
240241
flb_plg_error(ctx->ins, "failed to create producer: %s",
241242
errstr);
243+
rd_kafka_conf_destroy(ctx->conf);
244+
ctx->conf = NULL;
242245
flb_out_kafka_destroy(ctx);
243246
return NULL;
244247
}
248+
/* rd_kafka_new() succeeded, conf ownership transferred to rk */
249+
ctx->conf = NULL;
245250

246251
#ifdef FLB_HAVE_AVRO_ENCODER
247252
/* Config AVRO */
@@ -256,7 +261,6 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
256261
#endif
257262

258263
/* Config: Topic */
259-
mk_list_init(&ctx->topics);
260264
tmp = flb_output_get_property("topics", ins);
261265
if (!tmp) {
262266
flb_kafka_topic_create(FLB_KAFKA_TOPIC, ctx);
@@ -304,6 +308,10 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx)
304308
rd_kafka_destroy(ctx->kafka.rk);
305309
}
306310

311+
if (ctx->conf) {
312+
rd_kafka_conf_destroy(ctx->conf);
313+
}
314+
307315
if (ctx->opaque) {
308316
flb_kafka_opaque_destroy(ctx->opaque);
309317
}

0 commit comments

Comments
 (0)