Skip to content

Commit 2f9af92

Browse files
Rituparna Khaund
authored and committed
tests: internal: aws_compress: add parquet page-level compression tests
Add unit tests for out_s3_compress_parquet covering snappy, zstd, gzip and uncompressed page-level codecs, verifying the PAR1 magic markers and that compression reduces the output size for repetitive input. Signed-off-by: Rituparna Khaund <ritukhau@amazon.co.uk>
1 parent 59f6103 commit 2f9af92

1 file changed

Lines changed: 131 additions & 7 deletions

File tree

tests/internal/aws_compress.c

Lines changed: 131 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -105,7 +105,7 @@ void test_compression_snappy_return_value_normalization()
105105
{
106106
/* This test verifies that the snappy wrapper correctly normalizes return values
107107
* to conform to the AWS compression interface contract: -1 on error, 0 on success.
108-
*
108+
*
109109
* The test uses the actual flb_aws_compression_compress function which internally
110110
* uses the wrapper. We verify that successful compression returns exactly 0,
111111
* demonstrating that the wrapper properly normalizes the return value.
@@ -115,17 +115,17 @@ void test_compression_snappy_return_value_normalization()
115115
size_t out_len = 0;
116116
int compression_type;
117117
char test_data[] = "test data for compression";
118-
118+
119119
compression_type = flb_aws_compression_get_type("snappy");
120120
TEST_CHECK(compression_type != -1);
121-
121+
122122
/* Test successful compression - should return exactly 0 (not any other value) */
123-
ret = flb_aws_compression_compress(compression_type, test_data,
123+
ret = flb_aws_compression_compress(compression_type, test_data,
124124
strlen(test_data), &out_data, &out_len);
125125
TEST_CHECK(ret == 0);
126126
TEST_MSG("Expected return value 0 on success, got: %d", ret);
127127
TEST_MSG("This verifies the wrapper returns 0 (not passthrough of underlying function)");
128-
128+
129129
if (ret == 0 && out_data != NULL) {
130130
TEST_CHECK(out_len > 0);
131131
TEST_MSG("Compressed data length: %zu", out_len);
@@ -334,6 +334,122 @@ void test_b64_truncated_gzip_boundary()
334334
flb_aws_compress_truncate_b64_test_cases__gzip_decode(cases, 40);
335335
}
336336

337+
#ifdef FLB_HAVE_ARROW_PARQUET
338+
void test_parquet_format_snappy()
339+
{
340+
int ret;
341+
void *out_buf = NULL;
342+
size_t out_size = 0;
343+
char *json = "{\"key\":\"value\",\"num\":42}\n"
344+
"{\"key\":\"other\",\"num\":99}\n";
345+
size_t json_len = strlen(json);
346+
347+
ret = out_s3_compress_parquet(json, json_len, &out_buf, &out_size,
348+
FLB_PARQUET_COMPRESSION_SNAPPY);
349+
if (!TEST_CHECK(ret == 0 && out_buf != NULL && out_size >= 8)) {
350+
TEST_MSG("Parquet SNAPPY conversion failed");
351+
return;
352+
}
353+
TEST_CHECK(memcmp(out_buf, "PAR1", 4) == 0);
354+
TEST_CHECK(memcmp((char *)out_buf + out_size - 4, "PAR1", 4) == 0);
355+
flb_free(out_buf);
356+
}
357+
358+
void test_parquet_format_zstd()
359+
{
360+
int ret;
361+
void *out_buf = NULL;
362+
size_t out_size = 0;
363+
char *json = "{\"key\":\"value\",\"num\":42}\n"
364+
"{\"key\":\"other\",\"num\":99}\n";
365+
size_t json_len = strlen(json);
366+
367+
ret = out_s3_compress_parquet(json, json_len, &out_buf, &out_size,
368+
FLB_PARQUET_COMPRESSION_ZSTD);
369+
if (!TEST_CHECK(ret == 0 && out_buf != NULL && out_size >= 8)) {
370+
TEST_MSG("Parquet ZSTD conversion failed");
371+
return;
372+
}
373+
TEST_CHECK(memcmp(out_buf, "PAR1", 4) == 0);
374+
TEST_CHECK(memcmp((char *)out_buf + out_size - 4, "PAR1", 4) == 0);
375+
flb_free(out_buf);
376+
}
377+
378+
void test_parquet_format_gzip()
379+
{
380+
int ret;
381+
void *out_buf = NULL;
382+
size_t out_size = 0;
383+
char *json = "{\"key\":\"value\",\"num\":42}\n"
384+
"{\"key\":\"other\",\"num\":99}\n";
385+
size_t json_len = strlen(json);
386+
387+
ret = out_s3_compress_parquet(json, json_len, &out_buf, &out_size,
388+
FLB_PARQUET_COMPRESSION_GZIP);
389+
if (!TEST_CHECK(ret == 0 && out_buf != NULL && out_size >= 8)) {
390+
TEST_MSG("Parquet GZIP conversion failed");
391+
return;
392+
}
393+
TEST_CHECK(memcmp(out_buf, "PAR1", 4) == 0);
394+
TEST_CHECK(memcmp((char *)out_buf + out_size - 4, "PAR1", 4) == 0);
395+
flb_free(out_buf);
396+
}
397+
398+
void test_parquet_format_uncompressed()
399+
{
400+
int ret;
401+
void *out_buf = NULL;
402+
size_t out_size = 0;
403+
char *json = "{\"key\":\"value\",\"num\":42}\n"
404+
"{\"key\":\"other\",\"num\":99}\n";
405+
size_t json_len = strlen(json);
406+
407+
ret = out_s3_compress_parquet(json, json_len, &out_buf, &out_size,
408+
FLB_PARQUET_COMPRESSION_NONE);
409+
if (!TEST_CHECK(ret == 0 && out_buf != NULL && out_size >= 8)) {
410+
TEST_MSG("Parquet NONE conversion failed");
411+
return;
412+
}
413+
TEST_CHECK(memcmp(out_buf, "PAR1", 4) == 0);
414+
TEST_CHECK(memcmp((char *)out_buf + out_size - 4, "PAR1", 4) == 0);
415+
flb_free(out_buf);
416+
}
417+
418+
void test_parquet_compression_reduces_size()
419+
{
420+
int ret;
421+
void *buf_none = NULL;
422+
void *buf_snappy = NULL;
423+
size_t size_none = 0;
424+
size_t size_snappy = 0;
425+
char *json = "{\"msg\":\"hello hello hello hello hello hello\"}\n"
426+
"{\"msg\":\"hello hello hello hello hello hello\"}\n"
427+
"{\"msg\":\"hello hello hello hello hello hello\"}\n"
428+
"{\"msg\":\"hello hello hello hello hello hello\"}\n"
429+
"{\"msg\":\"hello hello hello hello hello hello\"}\n";
430+
size_t json_len = strlen(json);
431+
432+
ret = out_s3_compress_parquet(json, json_len, &buf_none, &size_none,
433+
FLB_PARQUET_COMPRESSION_NONE);
434+
if (!TEST_CHECK(ret == 0 && buf_none != NULL)) {
435+
TEST_MSG("Parquet NONE conversion failed");
436+
return;
437+
}
438+
439+
ret = out_s3_compress_parquet(json, json_len, &buf_snappy, &size_snappy,
440+
FLB_PARQUET_COMPRESSION_SNAPPY);
441+
if (!TEST_CHECK(ret == 0 && buf_snappy != NULL)) {
442+
TEST_MSG("Parquet SNAPPY conversion failed");
443+
flb_free(buf_none);
444+
return;
445+
}
446+
TEST_CHECK(size_snappy <= size_none);
447+
448+
flb_free(buf_none);
449+
flb_free(buf_snappy);
450+
}
451+
#endif
452+
337453
TEST_LIST = {
338454
{ "test_compression_gzip", test_compression_gzip },
339455
{ "test_compression_zstd", test_compression_zstd },
@@ -352,6 +468,14 @@ TEST_LIST = {
352468
test_b64_truncated_gzip_truncation_multi_rounds },
353469
{ "test_b64_truncated_gzip_boundary",
354470
test_b64_truncated_gzip_boundary },
471+
#ifdef FLB_HAVE_ARROW_PARQUET
472+
{ "test_parquet_format_snappy", test_parquet_format_snappy },
473+
{ "test_parquet_format_zstd", test_parquet_format_zstd },
474+
{ "test_parquet_format_gzip", test_parquet_format_gzip },
475+
{ "test_parquet_format_uncompressed", test_parquet_format_uncompressed },
476+
{ "test_parquet_compression_reduces_size",
477+
test_parquet_compression_reduces_size },
478+
#endif
355479
{ 0 }
356480
};
357481

@@ -419,8 +543,8 @@ static void flb_aws_compress_general_test_cases(int test_type,
419543
while (tcase->compression_keyword != 0) {
420544

421545
size_t in_data_len = strlen(tcase->in_data);
422-
compression_type = flb_aws_compression_get_type(tcase->compression_keyword);
423-
546+
compression_type = flb_aws_compression_get_type(tcase->compression_keyword);
547+
424548
TEST_CHECK(compression_type != -1);
425549
TEST_MSG("| flb_aws_get_compression_type: failed to get compression type for "
426550
"keyword "

0 commit comments

Comments
 (0)