Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions include/fluent-bit/flb_input_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,21 @@ int flb_input_chunk_append_raw(struct flb_input_instance *in,
size_t records,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size);
int flb_input_chunk_append_raw_local(struct flb_input_instance *in,
int event_type,
size_t records,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size);
int flb_input_chunk_ring_buffer_enqueue(struct flb_input_instance *in,
int event_type,
size_t records,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size);
int flb_input_chunk_ring_buffer_enqueue_log_routing(struct flb_input_instance *in,
int event_type,
size_t records,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size);

const void *flb_input_chunk_flush(struct flb_input_chunk *ic, size_t *size);
int flb_input_chunk_release_lock(struct flb_input_chunk *ic);
Expand Down
7 changes: 7 additions & 0 deletions include/fluent-bit/flb_input_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,11 @@ int flb_input_log_append_skip_processor_stages(struct flb_input_instance *ins,
size_t tag_len,
const void *buf,
size_t buf_size);

int flb_input_log_append_processed(struct flb_input_instance *ins,
size_t records,
const char *tag,
size_t tag_len,
const void *buf,
size_t buf_size);
#endif
49 changes: 44 additions & 5 deletions plugins/in_opentelemetry/opentelemetry_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,13 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
uint8_t *in_buf,
size_t in_size)
{
int ret;
int ret = 0;
int len;
int resource_logs_index;
int scope_log_index;
int log_record_index;
char *logs_body_key;
int scope_has_schema_url;
struct flb_mp_map_header mh;
struct flb_mp_map_header mh_tmp;
struct flb_time tm;
Expand Down Expand Up @@ -356,6 +357,11 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
}

resource_logs = input_logs->resource_logs;
if (input_logs->n_resource_logs == 0) {
ret = 0;
goto binary_payload_to_msgpack_end;
}

if (resource_logs == NULL) {
flb_plg_warn(ctx->ins, "no resource logs found");
ret = -1;
Expand All @@ -364,6 +370,11 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,

for (resource_logs_index = 0; resource_logs_index < input_logs->n_resource_logs; resource_logs_index++) {
resource_log = resource_logs[resource_logs_index];
if (resource_log == NULL) {
flb_plg_warn(ctx->ins, "null resource logs entry found");
ret = -1;
goto binary_payload_to_msgpack_end;
}

resource = resource_log->resource;
scope_logs = resource_log->scope_logs;
Expand All @@ -376,8 +387,18 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,

for (scope_log_index = 0; scope_log_index < resource_log->n_scope_logs; scope_log_index++) {
scope_log = scope_logs[scope_log_index];
if (scope_log == NULL) {
flb_plg_warn(ctx->ins, "null scope logs entry found");
ret = -1;
goto binary_payload_to_msgpack_end;
}

log_records = scope_log->log_records;

if (scope_log->n_log_records == 0) {
continue;
}

if (log_records == NULL) {
flb_plg_warn(ctx->ins, "no log records found");
ret = -1;
Expand Down Expand Up @@ -452,11 +473,18 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,

/* Scope */
scope = scope_log->scope;
scope_has_schema_url = FLB_FALSE;

if (scope && (scope->name || scope->version || scope->n_attributes > 0)) {
if (scope_log->schema_url && strlen(scope_log->schema_url) > 0) {
scope_has_schema_url = FLB_TRUE;
}

if (scope && (scope->name || scope->version ||
scope->n_attributes > 0 || scope->dropped_attributes_count > 0 ||
scope_has_schema_url == FLB_TRUE)) {
flb_mp_map_header_init(&mh_tmp, mp_pck);

if (scope_log->schema_url && strlen(scope_log->schema_url) > 0) {
if (scope_has_schema_url == FLB_TRUE) {
flb_mp_map_header_append(&mh_tmp);
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "schema_url", 10);
Expand Down Expand Up @@ -508,8 +536,19 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
flb_mp_map_header_end(&mh_tmp);
}
else {
/* set an empty scope */
msgpack_pack_map(mp_pck, 0);
flb_mp_map_header_init(&mh_tmp, mp_pck);

if (scope_has_schema_url == FLB_TRUE) {
flb_mp_map_header_append(&mh_tmp);
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "schema_url", 10);

len = strlen(scope_log->schema_url);
msgpack_pack_str(mp_pck, len);
msgpack_pack_str_body(mp_pck, scope_log->schema_url, len);
}

flb_mp_map_header_end(&mh_tmp);
}

flb_mp_map_header_end(&mh);
Expand Down
6 changes: 5 additions & 1 deletion plugins/out_opentelemetry/opentelemetry_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,11 @@ void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx)
if (ctx->oauth2_ctx) {
flb_oauth2_destroy(ctx->oauth2_ctx);
}
flb_oauth2_config_destroy(&ctx->oauth2_config);
/*
* ctx->oauth2_config strings are populated through the output config map,
* so flb_config_map_destroy() owns their release. The OAuth2 runtime
* context clones the strings it needs.
*/

flb_free(ctx->proxy_host);
flb_free(ctx);
Expand Down
Loading
Loading