Skip to content

Commit 7928c53

Browse files
committed
out_s3: Handle otlp JSON format
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
1 parent 230eb3c commit 7928c53

2 files changed

Lines changed: 86 additions & 14 deletions

File tree

plugins/out_s3/s3.c

Lines changed: 85 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <fluent-bit/flb_slist.h>
2323
#include <fluent-bit/flb_time.h>
2424
#include <fluent-bit/flb_pack.h>
25+
#include <fluent-bit/flb_opentelemetry.h>
2526
#include <fluent-bit/flb_config_map.h>
2627
#include <fluent-bit/flb_aws_util.h>
2728
#include <fluent-bit/aws/flb_aws_compress.h>
@@ -84,6 +85,10 @@ static void remove_from_queue(struct upload_queue *entry);
8485

8586
static int blob_initialize_authorization_endpoint_upstream(struct flb_s3 *context);
8687

88+
static flb_sds_t s3_format_event_chunk(struct flb_s3 *ctx,
89+
struct flb_event_chunk *event_chunk,
90+
struct flb_config *config);
91+
8792
static struct flb_aws_header *get_content_encoding_header(int compression_type)
8893
{
8994
static struct flb_aws_header gzip_header = {
@@ -649,6 +654,7 @@ static int cb_s3_init(struct flb_output_instance *ins,
649654

650655
ctx->retry_time = 0;
651656
ctx->upload_queue_success = FLB_FALSE;
657+
ctx->out_format = FLB_PACK_JSON_FORMAT_LINES;
652658

653659
/*
654660
* The engine default retry_limit (1) is too low for S3's internal
@@ -684,6 +690,27 @@ static int cb_s3_init(struct flb_output_instance *ins,
684690
"S3 has its own buffer files located in the store_dir.");
685691
}
686692

693+
/* Format key */
694+
tmp = flb_output_get_property("format", ins);
695+
if (tmp) {
696+
ret = flb_pack_to_json_format_type(tmp);
697+
if (ret == -1) {
698+
flb_plg_error(ctx->ins, "invalid format '%s'", tmp);
699+
return -1;
700+
}
701+
702+
if (ret != FLB_PACK_JSON_FORMAT_LINES &&
703+
ret != FLB_PACK_JSON_FORMAT_OTLP) {
704+
flb_plg_error(ctx->ins, "unsupported format '%s'", tmp);
705+
return -1;
706+
}
707+
ctx->out_format = ret;
708+
}
709+
710+
if (ctx->out_format == FLB_PACK_JSON_FORMAT_OTLP) {
711+
ins->event_type |= FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES;
712+
}
713+
687714
/* Date key */
688715
ctx->date_key = ctx->json_date_key;
689716
tmp = flb_output_get_property("json_date_key", ins);
@@ -3765,6 +3792,58 @@ static int s3_timer_create(struct flb_s3 *ctx)
37653792
return 0;
37663793
}
37673794

3795+
static flb_sds_t s3_format_event_chunk(struct flb_s3 *ctx,
3796+
struct flb_event_chunk *event_chunk,
3797+
struct flb_config *config)
3798+
{
3799+
int result;
3800+
static const char *default_logs_body_keys[] = {"log", "message"};
3801+
struct flb_opentelemetry_otlp_logs_options options;
3802+
3803+
if (ctx->out_format == FLB_PACK_JSON_FORMAT_OTLP) {
3804+
if (event_chunk->type == FLB_EVENT_TYPE_LOGS) {
3805+
memset(&options, 0, sizeof(options));
3806+
options.logs_require_otel_metadata = FLB_FALSE;
3807+
options.logs_body_keys = default_logs_body_keys;
3808+
options.logs_body_key_count = 2;
3809+
options.logs_body_key_attributes = FLB_FALSE;
3810+
3811+
return flb_opentelemetry_logs_to_otlp_json(event_chunk->data,
3812+
event_chunk->size,
3813+
&options,
3814+
&result);
3815+
}
3816+
#ifdef FLB_HAVE_METRICS
3817+
else if (event_chunk->type == FLB_EVENT_TYPE_METRICS) {
3818+
return flb_opentelemetry_metrics_msgpack_to_otlp_json(event_chunk->data,
3819+
event_chunk->size,
3820+
&result);
3821+
}
3822+
#endif
3823+
else if (event_chunk->type == FLB_EVENT_TYPE_TRACES) {
3824+
return flb_opentelemetry_traces_msgpack_to_otlp_json(event_chunk->data,
3825+
event_chunk->size,
3826+
&result);
3827+
}
3828+
3829+
return NULL;
3830+
}
3831+
3832+
if (ctx->log_key) {
3833+
return flb_pack_msgpack_extract_log_key(ctx,
3834+
event_chunk->data,
3835+
event_chunk->size,
3836+
config);
3837+
}
3838+
3839+
return flb_pack_msgpack_to_json_format(event_chunk->data,
3840+
event_chunk->size,
3841+
FLB_PACK_JSON_FORMAT_LINES,
3842+
ctx->json_date_format,
3843+
ctx->date_key,
3844+
config->json_escape_unicode);
3845+
}
3846+
37683847
static void cb_s3_flush(struct flb_event_chunk *event_chunk,
37693848
struct flb_output_flush *out_flush,
37703849
struct flb_input_instance *i_ins,
@@ -3800,20 +3879,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
38003879
flush_init(ctx);
38013880

38023881
/* Process chunk */
3803-
if (ctx->log_key) {
3804-
chunk = flb_pack_msgpack_extract_log_key(ctx,
3805-
event_chunk->data,
3806-
event_chunk->size,
3807-
config);
3808-
}
3809-
else {
3810-
chunk = flb_pack_msgpack_to_json_format(event_chunk->data,
3811-
event_chunk->size,
3812-
FLB_PACK_JSON_FORMAT_LINES,
3813-
ctx->json_date_format,
3814-
ctx->date_key,
3815-
config->json_escape_unicode);
3816-
}
3882+
chunk = s3_format_event_chunk(ctx, event_chunk, config);
38173883
if (chunk == NULL) {
38183884
flb_plg_error(ctx->ins, "Could not marshal msgpack to output string");
38193885
FLB_OUTPUT_RETURN(FLB_ERROR);
@@ -3999,6 +4065,11 @@ static int cb_s3_exit(void *data, struct flb_config *config)
39994065

40004066
/* Configuration properties map */
40014067
static struct flb_config_map config_map[] = {
4068+
{
4069+
FLB_CONFIG_MAP_STR, "format", "json_lines",
4070+
0, FLB_FALSE, 0,
4071+
"Set record output format. Supported values are json_lines and otlp_json."
4072+
},
40024073
{
40034074
FLB_CONFIG_MAP_STR, "json_date_format", NULL,
40044075
0, FLB_FALSE, 0,

plugins/out_s3/s3.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ struct flb_s3 {
153153
struct flb_tls *client_tls;
154154

155155
struct flb_aws_client *s3_client;
156+
int out_format;
156157
int json_date_format;
157158
flb_sds_t json_date_key;
158159
flb_sds_t date_key;

0 commit comments

Comments
 (0)