|
22 | 22 | #include <fluent-bit/flb_slist.h> |
23 | 23 | #include <fluent-bit/flb_time.h> |
24 | 24 | #include <fluent-bit/flb_pack.h> |
| 25 | +#include <fluent-bit/flb_opentelemetry.h> |
25 | 26 | #include <fluent-bit/flb_config_map.h> |
26 | 27 | #include <fluent-bit/flb_aws_util.h> |
27 | 28 | #include <fluent-bit/aws/flb_aws_compress.h> |
@@ -84,6 +85,10 @@ static void remove_from_queue(struct upload_queue *entry); |
84 | 85 |
|
85 | 86 | static int blob_initialize_authorization_endpoint_upstream(struct flb_s3 *context); |
86 | 87 |
|
| 88 | +static flb_sds_t s3_format_event_chunk(struct flb_s3 *ctx, |
| 89 | + struct flb_event_chunk *event_chunk, |
| 90 | + struct flb_config *config); |
| 91 | + |
87 | 92 | static struct flb_aws_header *get_content_encoding_header(int compression_type) |
88 | 93 | { |
89 | 94 | static struct flb_aws_header gzip_header = { |
@@ -649,6 +654,7 @@ static int cb_s3_init(struct flb_output_instance *ins, |
649 | 654 |
|
650 | 655 | ctx->retry_time = 0; |
651 | 656 | ctx->upload_queue_success = FLB_FALSE; |
| 657 | + ctx->out_format = FLB_PACK_JSON_FORMAT_LINES; |
652 | 658 |
|
653 | 659 | /* |
654 | 660 | * The engine default retry_limit (1) is too low for S3's internal |
@@ -684,6 +690,23 @@ static int cb_s3_init(struct flb_output_instance *ins, |
684 | 690 | "S3 has its own buffer files located in the store_dir."); |
685 | 691 | } |
686 | 692 |
|
| 693 | + /* Format key */ |
| 694 | + tmp = flb_output_get_property("format", ins); |
| 695 | + if (tmp) { |
| 696 | + ret = flb_pack_to_json_format_type(tmp); |
| 697 | + if (ret == -1) { |
| 698 | + flb_plg_error(ctx->ins, "invalid format '%s'", tmp); |
| 699 | + return -1; |
| 700 | + } |
| 701 | + |
| 702 | + if (ret != FLB_PACK_JSON_FORMAT_LINES && |
| 703 | + ret != FLB_PACK_JSON_FORMAT_OTLP) { |
| 704 | + flb_plg_error(ctx->ins, "unsupported format '%s'", tmp); |
| 705 | + return -1; |
| 706 | + } |
| 707 | + ctx->out_format = ret; |
| 708 | + } |
| 709 | + |
687 | 710 | /* Date key */ |
688 | 711 | ctx->date_key = ctx->json_date_key; |
689 | 712 | tmp = flb_output_get_property("json_date_key", ins); |
@@ -3765,6 +3788,58 @@ static int s3_timer_create(struct flb_s3 *ctx) |
3765 | 3788 | return 0; |
3766 | 3789 | } |
3767 | 3790 |
|
| 3791 | +static flb_sds_t s3_format_event_chunk(struct flb_s3 *ctx, |
| 3792 | + struct flb_event_chunk *event_chunk, |
| 3793 | + struct flb_config *config) |
| 3794 | +{ |
| 3795 | + int result; |
| 3796 | + static const char *default_logs_body_keys[] = {"log", "message"}; |
| 3797 | + struct flb_opentelemetry_otlp_logs_options options; |
| 3798 | + |
| 3799 | + if (ctx->out_format == FLB_PACK_JSON_FORMAT_OTLP) { |
| 3800 | + if (event_chunk->type == FLB_EVENT_TYPE_LOGS) { |
| 3801 | + memset(&options, 0, sizeof(options)); |
| 3802 | + options.logs_require_otel_metadata = FLB_FALSE; |
| 3803 | + options.logs_body_keys = default_logs_body_keys; |
| 3804 | + options.logs_body_key_count = 2; |
| 3805 | + options.logs_body_key_attributes = FLB_FALSE; |
| 3806 | + |
| 3807 | + return flb_opentelemetry_logs_to_otlp_json(event_chunk->data, |
| 3808 | + event_chunk->size, |
| 3809 | + &options, |
| 3810 | + &result); |
| 3811 | + } |
| 3812 | +#ifdef FLB_HAVE_METRICS |
| 3813 | + else if (event_chunk->type == FLB_EVENT_TYPE_METRICS) { |
| 3814 | + return flb_opentelemetry_metrics_msgpack_to_otlp_json(event_chunk->data, |
| 3815 | + event_chunk->size, |
| 3816 | + &result); |
| 3817 | + } |
| 3818 | +#endif |
| 3819 | + else if (event_chunk->type == FLB_EVENT_TYPE_TRACES) { |
| 3820 | + return flb_opentelemetry_traces_msgpack_to_otlp_json(event_chunk->data, |
| 3821 | + event_chunk->size, |
| 3822 | + &result); |
| 3823 | + } |
| 3824 | + |
| 3825 | + return NULL; |
| 3826 | + } |
| 3827 | + |
| 3828 | + if (ctx->log_key) { |
| 3829 | + return flb_pack_msgpack_extract_log_key(ctx, |
| 3830 | + event_chunk->data, |
| 3831 | + event_chunk->size, |
| 3832 | + config); |
| 3833 | + } |
| 3834 | + |
| 3835 | + return flb_pack_msgpack_to_json_format(event_chunk->data, |
| 3836 | + event_chunk->size, |
| 3837 | + FLB_PACK_JSON_FORMAT_LINES, |
| 3838 | + ctx->json_date_format, |
| 3839 | + ctx->date_key, |
| 3840 | + config->json_escape_unicode); |
| 3841 | +} |
| 3842 | + |
3768 | 3843 | static void cb_s3_flush(struct flb_event_chunk *event_chunk, |
3769 | 3844 | struct flb_output_flush *out_flush, |
3770 | 3845 | struct flb_input_instance *i_ins, |
@@ -3800,20 +3875,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk, |
3800 | 3875 | flush_init(ctx); |
3801 | 3876 |
|
3802 | 3877 | /* Process chunk */ |
3803 | | - if (ctx->log_key) { |
3804 | | - chunk = flb_pack_msgpack_extract_log_key(ctx, |
3805 | | - event_chunk->data, |
3806 | | - event_chunk->size, |
3807 | | - config); |
3808 | | - } |
3809 | | - else { |
3810 | | - chunk = flb_pack_msgpack_to_json_format(event_chunk->data, |
3811 | | - event_chunk->size, |
3812 | | - FLB_PACK_JSON_FORMAT_LINES, |
3813 | | - ctx->json_date_format, |
3814 | | - ctx->date_key, |
3815 | | - config->json_escape_unicode); |
3816 | | - } |
| 3878 | + chunk = s3_format_event_chunk(ctx, event_chunk, config); |
3817 | 3879 | if (chunk == NULL) { |
3818 | 3880 | flb_plg_error(ctx->ins, "Could not marshal msgpack to output string"); |
3819 | 3881 | FLB_OUTPUT_RETURN(FLB_ERROR); |
@@ -3999,6 +4061,11 @@ static int cb_s3_exit(void *data, struct flb_config *config) |
3999 | 4061 |
|
4000 | 4062 | /* Configuration properties map */ |
4001 | 4063 | static struct flb_config_map config_map[] = { |
| 4064 | + { |
| 4065 | + FLB_CONFIG_MAP_STR, "format", "json_lines", |
| 4066 | + 0, FLB_FALSE, 0, |
| 4067 | + "Set record output format. Supported values are json_lines and otlp_json." |
| 4068 | + }, |
4002 | 4069 | { |
4003 | 4070 | FLB_CONFIG_MAP_STR, "json_date_format", NULL, |
4004 | 4071 | 0, FLB_FALSE, 0, |
|
0 commit comments