3232
3333#include "zerobus.h"
3434
35+ /* Ensure the URL has an http:// or https:// scheme prefix. Returns a new sds string. */
3536static flb_sds_t ensure_url_scheme (const char * url )
3637{
3738 size_t url_len ;
@@ -55,6 +56,7 @@ static flb_sds_t ensure_url_scheme(const char *url)
5556 return out ;
5657}
5758
59+ /* Format a flb_time as an RFC 3339 timestamp with nanosecond precision into buf. */
5860static int format_timestamp_rfc3339nano (struct flb_time * tm ,
5961 char * buf , size_t size )
6062{
@@ -71,6 +73,7 @@ static int format_timestamp_rfc3339nano(struct flb_time *tm,
7173 (unsigned long ) tm -> tm .tv_nsec );
7274}
7375
76+ /* Return 1 if the given key matches any entry in the log_keys list, 0 otherwise. */
7477static int key_in_log_keys (const char * key , int key_len ,
7578 struct mk_list * log_keys )
7679{
@@ -87,6 +90,7 @@ static int key_in_log_keys(const char *key, int key_len,
8790 return 0 ;
8891}
8992
93+ /* Return true if msgpack object k is a string equal to name. */
9094static inline int str_key_equals (const msgpack_object * k ,
9195 const char * name , int name_len )
9296{
@@ -241,6 +245,7 @@ static flb_sds_t record_to_json(struct flb_out_zerobus *ctx,
241245 return json ;
242246}
243247
248+ /* Initialize the ZeroBus output plugin: validate config, create SDK and stream. */
244249static int cb_zerobus_init (struct flb_output_instance * ins ,
245250 struct flb_config * config ,
246251 void * data )
@@ -358,6 +363,7 @@ static int cb_zerobus_init(struct flb_output_instance *ins,
358363 return -1 ;
359364}
360365
366+ /* Flush callback: convert log events to JSON and ingest them via ZeroBus. */
361367static void cb_zerobus_flush (struct flb_event_chunk * event_chunk ,
362368 struct flb_output_flush * out_flush ,
363369 struct flb_input_instance * i_ins ,
@@ -421,8 +427,16 @@ static void cb_zerobus_flush(struct flb_event_chunk *event_chunk,
421427 flb_sds_t * tmp = flb_realloc (json_records ,
422428 sizeof (flb_sds_t ) * new_cap );
423429 if (!tmp ) {
430+ flb_plg_error (ctx -> ins ,
431+ "realloc failed, retrying entire batch" );
424432 flb_sds_destroy (json );
425- break ;
433+ for (i = 0 ; i < num_records ; i ++ ) {
434+ flb_sds_destroy (json_records [i ]);
435+ }
436+ flb_free (json_records );
437+ msgpack_sbuffer_destroy (& sbuf );
438+ flb_log_event_decoder_destroy (& log_decoder );
439+ FLB_OUTPUT_RETURN (FLB_RETRY );
426440 }
427441 json_records = tmp ;
428442 capacity = new_cap ;
@@ -480,6 +494,7 @@ static void cb_zerobus_flush(struct flb_event_chunk *event_chunk,
480494 FLB_OUTPUT_RETURN (ret );
481495}
482496
497+ /* Cleanup callback: close the stream, free SDK resources and plugin context. */
483498static int cb_zerobus_exit (void * data , struct flb_config * config )
484499{
485500 struct flb_out_zerobus * ctx = data ;
0 commit comments