Skip to content

Commit e4afdf3

Browse files
author
Rituparna Khaund
committed
aws: add page-level compression support to parquet writer
The Parquet writer passed NULL for GParquetWriterProperties, causing all output files to have UNCOMPRESSED pages. This adds a parquet_compression parameter that configures the page-level codec (snappy, gzip, zstd, or none) via the GLib Parquet writer properties API. The parquet entry is removed from the compression dispatch table since the new function signature includes the codec parameter and no longer matches the generic compress function pointer type. The S3 plugin now calls out_s3_compress_parquet directly. Signed-off-by: Rituparna Khaund <ritukhau@amazon.co.uk>
1 parent 19cc8f9 commit e4afdf3

4 files changed

Lines changed: 49 additions & 20 deletions

File tree

include/fluent-bit/aws/flb_aws_compress.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@
2828
#define FLB_AWS_COMPRESS_ZSTD 4
2929
#define FLB_AWS_COMPRESS_SNAPPY 5
3030

31+
/* Parquet page-level compression codecs */
32+
#define FLB_PARQUET_COMPRESSION_NONE 0
33+
#define FLB_PARQUET_COMPRESSION_SNAPPY 1
34+
#define FLB_PARQUET_COMPRESSION_GZIP 2
35+
#define FLB_PARQUET_COMPRESSION_ZSTD 3
36+
3137
/*
3238
* Get compression type from compression keyword. The return value is used to identify
3339
* what compression option to utilize.

src/aws/compression/arrow/compress.c

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -170,11 +170,29 @@ int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_s
170170
}
171171

172172
#ifdef FLB_HAVE_ARROW_PARQUET
173-
static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table)
173+
#include <fluent-bit/aws/flb_aws_compress.h>
174+
175+
static GArrowCompressionType parquet_compression_to_garrow(int parquet_compression)
176+
{
177+
switch (parquet_compression) {
178+
case FLB_PARQUET_COMPRESSION_SNAPPY:
179+
return GARROW_COMPRESSION_TYPE_SNAPPY;
180+
case FLB_PARQUET_COMPRESSION_GZIP:
181+
return GARROW_COMPRESSION_TYPE_GZIP;
182+
case FLB_PARQUET_COMPRESSION_ZSTD:
183+
return GARROW_COMPRESSION_TYPE_ZSTD;
184+
default:
185+
return GARROW_COMPRESSION_TYPE_UNCOMPRESSED;
186+
}
187+
}
188+
189+
static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table,
190+
int parquet_compression)
174191
{
175192
GArrowResizableBuffer *buffer;
176193
GArrowBufferOutputStream *sink;
177194
GParquetArrowFileWriter *writer;
195+
GParquetWriterProperties *props;
178196
GArrowSchema *schema;
179197
GError *error = NULL;
180198
gboolean success;
@@ -199,14 +217,19 @@ static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table)
199217
return NULL;
200218
}
201219

202-
/* Create a new Parquet file writer */
220+
props = gparquet_writer_properties_new();
221+
gparquet_writer_properties_set_compression(
222+
props, parquet_compression_to_garrow(parquet_compression), NULL);
223+
203224
writer = gparquet_arrow_file_writer_new_arrow(schema,
204225
GARROW_OUTPUT_STREAM(sink),
205-
NULL, /* Arrow writer properties */
226+
props,
206227
&error);
207228
g_object_unref(schema);
229+
g_object_unref(props);
208230
if (writer == NULL) {
209-
flb_error("[aws][compress] Failed to create parquet writer: %s", error->message);
231+
flb_error("[aws][compress] Failed to create parquet writer: %s",
232+
error->message);
210233
g_error_free(error);
211234
g_object_unref(buffer);
212235
g_object_unref(sink);
@@ -215,18 +238,18 @@ static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table)
215238

216239
n_rows = garrow_table_get_n_rows(table);
217240

218-
/* Write the entire table to the Parquet file buffer */
219-
success = gparquet_arrow_file_writer_write_table(writer, table, n_rows, &error);
241+
success = gparquet_arrow_file_writer_write_table(writer, table,
242+
n_rows, &error);
220243
if (!success) {
221-
flb_error("[aws][compress] Failed to write table to parquet buffer: %s", error->message);
244+
flb_error("[aws][compress] Failed to write table to parquet "
245+
"buffer: %s", error->message);
222246
g_error_free(error);
223247
g_object_unref(buffer);
224248
g_object_unref(sink);
225249
g_object_unref(writer);
226250
return NULL;
227251
}
228252

229-
/* Close the writer to finalize the Parquet file metadata */
230253
success = gparquet_arrow_file_writer_close(writer, &error);
231254
if (!success) {
232255
g_error_free(error);
@@ -242,7 +265,8 @@ static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table)
242265
}
243266

244267

245-
int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out_size)
268+
int out_s3_compress_parquet(void *json, size_t size, void **out_buf,
269+
size_t *out_size, int parquet_compression)
246270
{
247271
GArrowTable *table;
248272
GArrowResizableBuffer *buffer;
@@ -253,14 +277,16 @@ int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out
253277

254278
table = parse_json((uint8_t *) json, size);
255279
if (table == NULL) {
256-
flb_error("[aws][compress] Failed to parse JSON into Arrow Table for Parquet conversion");
280+
flb_error("[aws][compress] Failed to parse JSON into Arrow Table"
281+
" for Parquet conversion");
257282
return -1;
258283
}
259284

260-
buffer = table_to_parquet_buffer(table);
285+
buffer = table_to_parquet_buffer(table, parquet_compression);
261286
g_object_unref(table);
262287
if (buffer == NULL) {
263-
flb_error("[aws][compress] Failed to convert Arrow Table into Parquet buffer");
288+
flb_error("[aws][compress] Failed to convert Arrow Table into"
289+
" Parquet buffer");
264290
return -1;
265291
}
266292

src/aws/compression/arrow/compress.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@ int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_s
2121
* `size` is the length of the json data (excluding the trailing
2222
* null-terminator character).
2323
*
24+
* `parquet_compression` specifies the page-level codec to use
25+
* (FLB_PARQUET_COMPRESSION_*).
26+
*
2427
* Return 0 on success (with `out_buf` and `out_size` updated),
2528
* and -1 on failure
2629
*/
27-
int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out_size);
30+
int out_s3_compress_parquet(void *json, size_t size, void **out_buf,
31+
size_t *out_size, int parquet_compression);
2832
#endif

src/aws/flb_aws_compress.c

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,6 @@ static const struct compression_option compression_options[] = {
8383
"arrow",
8484
&out_s3_compress_arrow
8585
},
86-
#endif
87-
#ifdef FLB_HAVE_ARROW_PARQUET
88-
{
89-
FLB_AWS_COMPRESS_PARQUET,
90-
"parquet",
91-
&out_s3_compress_parquet
92-
},
9386
#endif
9487
{ 0 }
9588
};

0 commit comments

Comments
 (0)