Skip to content

Commit 4b304a0

Browse files
author
Rituparna Khaund
committed
out_s3: add format=parquet option with page-level compression
The compression config option conflated byte-level compression (gzip, zstd, snappy) with format conversion (parquet), making it impossible to produce Parquet files with page-level compression. This adds 'format parquet' as a new option. When format is parquet, the compression option (snappy, zstd, gzip) controls the page-level codec inside the Parquet file. Default is uncompressed to preserve existing behavior. The old 'compression=parquet' syntax is preserved as a deprecated alias that emits a warning and maps to format=parquet with no page-level compression (identical to current behavior). Arrow support is untouched and continues to work via 'compression=arrow' as before. Signed-off-by: Rituparna Khaund <ritukhau@amazon.co.uk>
1 parent 69b3dd9 commit 4b304a0

2 files changed

Lines changed: 153 additions & 42 deletions

File tree

plugins/out_s3/s3.c

Lines changed: 147 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
#include "s3.h"
4444
#include "s3_store.h"
45+
#include <fluent-bit/aws/flb_aws_compress.h>
4546

4647
#define DEFAULT_S3_PORT 443
4748
#define DEFAULT_S3_INSECURE_PORT 80
@@ -89,6 +90,41 @@ static flb_sds_t s3_format_event_chunk(struct flb_s3 *ctx,
8990
struct flb_event_chunk *event_chunk,
9091
struct flb_config *config);
9192

93+
static int enable_parquet_format(struct flb_s3 *ctx)
94+
{
95+
#ifdef FLB_HAVE_ARROW_PARQUET
96+
ctx->s3_format = FLB_S3_FORMAT_PARQUET;
97+
ctx->use_put_object = FLB_TRUE;
98+
return 0;
99+
#else
100+
flb_plg_error(ctx->ins,
101+
"parquet format requires parquet-glib at compile time");
102+
return -1;
103+
#endif
104+
}
105+
106+
static int parse_output_format(const char *format)
107+
{
108+
if (strcasecmp(format, "parquet") == 0) {
109+
return FLB_S3_FORMAT_PARQUET;
110+
}
111+
return flb_pack_to_json_format_type(format);
112+
}
113+
114+
static int map_to_parquet_codec(int compression_type)
115+
{
116+
switch (compression_type) {
117+
case FLB_AWS_COMPRESS_SNAPPY:
118+
return FLB_PARQUET_COMPRESSION_SNAPPY;
119+
case FLB_AWS_COMPRESS_ZSTD:
120+
return FLB_PARQUET_COMPRESSION_ZSTD;
121+
case FLB_AWS_COMPRESS_GZIP:
122+
return FLB_PARQUET_COMPRESSION_GZIP;
123+
default:
124+
return FLB_PARQUET_COMPRESSION_NONE;
125+
}
126+
}
127+
92128
static struct flb_aws_header *get_content_encoding_header(int compression_type)
93129
{
94130
static struct flb_aws_header gzip_header = {
@@ -655,6 +691,8 @@ static int cb_s3_init(struct flb_output_instance *ins,
655691
ctx->retry_time = 0;
656692
ctx->upload_queue_success = FLB_FALSE;
657693
ctx->out_format = FLB_PACK_JSON_FORMAT_LINES;
694+
ctx->s3_format = FLB_S3_FORMAT_JSON_LINES;
695+
ctx->parquet_compression = FLB_PARQUET_COMPRESSION_NONE;
658696

659697
/*
660698
* The engine default retry_limit (1) is too low for S3's internal
@@ -693,30 +731,37 @@ static int cb_s3_init(struct flb_output_instance *ins,
693731
/* Format key */
694732
tmp = flb_output_get_property("format", ins);
695733
if (tmp) {
696-
ret = flb_pack_to_json_format_type(tmp);
734+
ret = parse_output_format(tmp);
697735
if (ret == -1) {
698736
flb_plg_error(ctx->ins, "invalid format '%s'", tmp);
699737
return -1;
700738
}
701739

702-
if (ret == FLB_PACK_JSON_FORMAT_JSON) {
703-
flb_plg_warn(ctx->ins,
704-
"'json' format is implicitly interpreted as 'json_lines' before."
705-
"Now interpreted as 'json_lines' explicitly now");
706-
ret = FLB_PACK_JSON_FORMAT_LINES;
740+
if (ret == FLB_S3_FORMAT_PARQUET) {
741+
if (enable_parquet_format(ctx) == -1) {
742+
return -1;
743+
}
707744
}
708-
else if (ret != FLB_PACK_JSON_FORMAT_LINES &&
709-
ret != FLB_PACK_JSON_FORMAT_OTLP) {
710-
flb_plg_error(ctx->ins, "unsupported format '%s'", tmp);
711-
return -1;
745+
else if (ret == FLB_PACK_JSON_FORMAT_JSON) {
746+
flb_plg_warn(ctx->ins,
747+
"'json' format is implicitly interpreted as "
748+
"'json_lines'. Now interpreted as 'json_lines' "
749+
"explicitly");
750+
ctx->out_format = FLB_PACK_JSON_FORMAT_LINES;
712751
}
713-
ctx->out_format = ret;
752+
else if (ret == FLB_PACK_JSON_FORMAT_LINES ||
753+
ret == FLB_PACK_JSON_FORMAT_OTLP) {
754+
ctx->out_format = ret;
714755

715-
if (ctx->out_format == FLB_PACK_JSON_FORMAT_OTLP &&
716-
ctx->log_key != NULL) {
717-
flb_plg_error(ctx->ins,
718-
"'log_key' is not supported when format is "
719-
"otlp_json or otlp_json_pretty");
756+
if (ret == FLB_PACK_JSON_FORMAT_OTLP && ctx->log_key != NULL) {
757+
flb_plg_error(ctx->ins,
758+
"'log_key' is not supported when format is "
759+
"otlp_json or otlp_json_pretty");
760+
return -1;
761+
}
762+
}
763+
else {
764+
flb_plg_error(ctx->ins, "unsupported format '%s'", tmp);
720765
return -1;
721766
}
722767
}
@@ -805,19 +850,35 @@ static int cb_s3_init(struct flb_output_instance *ins,
805850

806851
tmp = flb_output_get_property("compression", ins);
807852
if (tmp) {
808-
ret = flb_aws_compression_get_type(tmp);
809-
if (ret == -1) {
810-
flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
811-
return -1;
853+
if (strcasecmp(tmp, "parquet") == 0) {
854+
flb_plg_warn(ctx->ins,
855+
"'compression=parquet' is deprecated. "
856+
"Use 'format parquet' with 'compression' set to "
857+
"the desired page-level codec (snappy, zstd, gzip)");
858+
if (enable_parquet_format(ctx) == -1) {
859+
return -1;
860+
}
812861
}
813-
if (ctx->use_put_object == FLB_FALSE &&
814-
(ret == FLB_AWS_COMPRESS_ARROW ||
815-
ret == FLB_AWS_COMPRESS_PARQUET)) {
816-
flb_plg_error(ctx->ins,
817-
"use_put_object must be enabled when Apache Arrow or Parquet is enabled");
818-
return -1;
862+
else {
863+
ret = flb_aws_compression_get_type(tmp);
864+
if (ret == -1) {
865+
flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
866+
return -1;
867+
}
868+
if (ctx->use_put_object == FLB_FALSE &&
869+
ret == FLB_AWS_COMPRESS_ARROW) {
870+
flb_plg_error(ctx->ins,
871+
"use_put_object must be enabled when "
872+
"Apache Arrow is enabled");
873+
return -1;
874+
}
875+
ctx->compression = ret;
876+
877+
if (ctx->s3_format == FLB_S3_FORMAT_PARQUET) {
878+
ctx->parquet_compression = map_to_parquet_codec(ret);
879+
ctx->compression = FLB_AWS_COMPRESS_NONE;
880+
}
819881
}
820-
ctx->compression = ret;
821882
}
822883

823884
tmp = flb_output_get_property("content_type", ins);
@@ -1252,9 +1313,28 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
12521313
file_first_log_time = chunk->first_log_time;
12531314
}
12541315

1316+
#ifdef FLB_HAVE_ARROW_PARQUET
1317+
if (ctx->s3_format == FLB_S3_FORMAT_PARQUET) {
1318+
ret = out_s3_compress_parquet(body, body_size, &payload_buf,
1319+
&payload_size,
1320+
ctx->parquet_compression);
1321+
if (ret == -1) {
1322+
flb_plg_error(ctx->ins, "Failed to convert data to Parquet");
1323+
if (chunk != NULL) {
1324+
s3_store_file_unlock(chunk);
1325+
chunk->failures += 1;
1326+
}
1327+
return FLB_RETRY;
1328+
}
1329+
preCompress_size = body_size;
1330+
body = (void *) payload_buf;
1331+
body_size = payload_size;
1332+
}
1333+
else
1334+
#endif
12551335
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
1256-
/* Map payload */
1257-
ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size);
1336+
ret = flb_aws_compression_compress(ctx->compression, body, body_size,
1337+
&payload_buf, &payload_size);
12581338
if (ret == -1) {
12591339
flb_plg_error(ctx->ins, "Failed to compress data");
12601340
if (chunk != NULL) {
@@ -1478,15 +1558,40 @@ static int put_all_chunks(struct flb_s3 *ctx)
14781558
return -1;
14791559
}
14801560

1561+
#ifdef FLB_HAVE_ARROW_PARQUET
1562+
if (ctx->s3_format == FLB_S3_FORMAT_PARQUET) {
1563+
ret = out_s3_compress_parquet(buffer, buffer_size,
1564+
&payload_buf, &payload_size,
1565+
ctx->parquet_compression);
1566+
if (ret == -1) {
1567+
flb_plg_error(ctx->ins,
1568+
"Failed to convert to Parquet, uploading "
1569+
"raw data to prevent data loss");
1570+
}
1571+
else {
1572+
flb_free(buffer);
1573+
buffer = (void *) payload_buf;
1574+
buffer_size = payload_size;
1575+
}
1576+
}
1577+
else
1578+
#endif
14811579
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
1482-
/* Map payload */
1483-
ret = flb_aws_compression_compress(ctx->compression, buffer, buffer_size, &payload_buf, &payload_size);
1580+
ret = flb_aws_compression_compress(ctx->compression,
1581+
buffer, buffer_size,
1582+
&payload_buf,
1583+
&payload_size);
14841584
if (ret == -1) {
1485-
flb_plg_error(ctx->ins, "Failed to compress data, uploading uncompressed data instead to prevent data loss");
1486-
} else {
1487-
flb_plg_info(ctx->ins, "Pre-compression chunk size is %zu, After compression, chunk is %zu bytes", buffer_size, payload_size);
1585+
flb_plg_error(ctx->ins,
1586+
"Failed to compress data, uploading "
1587+
"uncompressed data to prevent data loss");
1588+
}
1589+
else {
1590+
flb_plg_info(ctx->ins,
1591+
"Pre-compression chunk size is %zu, "
1592+
"After compression, chunk is %zu bytes",
1593+
buffer_size, payload_size);
14881594
flb_free(buffer);
1489-
14901595
buffer = (void *) payload_buf;
14911596
buffer_size = payload_size;
14921597
}
@@ -4076,7 +4181,9 @@ static struct flb_config_map config_map[] = {
40764181
{
40774182
FLB_CONFIG_MAP_STR, "format", "json_lines",
40784183
0, FLB_FALSE, 0,
4079-
"Set record output format. Supported values are json_lines, and otlp_json."
4184+
"Set output format. Supported values: json_lines, otlp_json, parquet. "
4185+
"When format is parquet, the 'compression' option controls the page-level "
4186+
"codec inside the Parquet file (snappy, zstd, gzip). Default: uncompressed."
40804187
},
40814188
{
40824189
FLB_CONFIG_MAP_STR, "json_date_format", NULL,
@@ -4147,12 +4254,10 @@ static struct flb_config_map config_map[] = {
41474254
{
41484255
FLB_CONFIG_MAP_STR, "compression", NULL,
41494256
0, FLB_FALSE, 0,
4150-
"Compression type for S3 objects. Supported values: 'gzip', 'zstd', 'snappy'. "
4151-
"'arrow' and 'parquet' are also available if Apache Arrow was enabled at compile time. "
4152-
"Defaults to no compression. "
4153-
"If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'. "
4154-
"If 'zstd' is selected, the Content-Encoding HTTP Header will be set to 'zstd'. "
4155-
"If 'snappy' is selected, the Content-Encoding HTTP Header will be set to 'snappy'."
4257+
"Compression type for S3 objects. Supported values: 'gzip', 'zstd', 'snappy', "
4258+
"'arrow'. When format is 'parquet', this sets the page-level codec inside the "
4259+
"Parquet file. 'compression=parquet' is deprecated; use 'format parquet' instead. "
4260+
"Defaults to no compression."
41564261
},
41574262
{
41584263
FLB_CONFIG_MAP_STR, "content_type", NULL,

plugins/out_s3/s3.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727
#include <fluent-bit/flb_aws_util.h>
2828
#include <fluent-bit/flb_blob_db.h>
2929

30+
/* S3 output format types */
31+
#define FLB_S3_FORMAT_JSON_LINES 0
32+
#define FLB_S3_FORMAT_PARQUET 1
33+
3034
/* Upload data to S3 in 5MB chunks */
3135
#define MIN_CHUNKED_UPLOAD_SIZE 5242880
3236
#define MAX_CHUNKED_UPLOAD_SIZE 50000000
@@ -123,6 +127,8 @@ struct flb_s3 {
123127
int send_content_md5;
124128
int static_file_path;
125129
int compression;
130+
int s3_format;
131+
int parquet_compression;
126132
int port;
127133
int insecure;
128134
size_t store_dir_limit_size;

0 commit comments

Comments
 (0)