Skip to content

Commit 940a8f1

Browse files
author
Rituparna Khaund
committed
tests: integration: out_s3: add columnar format scenarios
Add out_s3 integration scenarios for the columnar formats: arrow and parquet with zstd, parquet and arrow with compression none, and a negative case asserting that an invalid format/compression combination (arrow + gzip) is rejected at startup. Each positive case verifies the ARROW1/PAR1 magic bytes and the absence of a Content-Encoding header. Signed-off-by: Rituparna Khaund <ritukhau@amazon.com>
1 parent 80db2f1 commit 940a8f1

6 files changed

Lines changed: 222 additions & 0 deletions

File tree

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# out_s3 integration scenario: a single dummy record uploaded through
# PutObject with the Arrow format and zstd compression.
# The ${...} placeholders are expected to be provided by the test harness.
service:
  flush: 1
  grace: 3
  log_level: info
  http_server: on
  http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}

pipeline:
  inputs:
    - name: dummy
      tag: out_s3
      dummy: '{"message":"hello arrow s3","source":"dummy"}'
      samples: 1

  outputs:
    - name: s3
      match: out_s3
      bucket: test-bucket
      region: us-east-1
      endpoint: http://127.0.0.1:${TEST_SUITE_HTTP_PORT}
      use_put_object: true
      total_file_size: 1M
      upload_timeout: 2s
      s3_key_format: /payloads/$TAG/$UUID
      # Columnar output under test.
      format: arrow
      compression: zstd
      store_dir: /tmp/fluent-bit-test-suite-s3-arrow
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# out_s3 integration scenario (negative case): Arrow format combined with
# gzip compression. The accompanying test expects Fluent Bit to refuse to
# start with this configuration.
# The ${...} placeholders are expected to be provided by the test harness.
service:
  flush: 1
  grace: 3
  log_level: info
  http_server: on
  http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}

pipeline:
  inputs:
    - name: dummy
      tag: out_s3
      dummy: '{"message":"hello arrow gzip s3","source":"dummy"}'
      samples: 1

  outputs:
    - name: s3
      match: out_s3
      bucket: test-bucket
      region: us-east-1
      endpoint: http://127.0.0.1:${TEST_SUITE_HTTP_PORT}
      use_put_object: true
      total_file_size: 1M
      upload_timeout: 2s
      s3_key_format: /payloads/$TAG/$UUID
      # Intentionally invalid format/compression pairing.
      format: arrow
      compression: gzip
      store_dir: /tmp/fluent-bit-test-suite-s3-arrow-gzip-invalid
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# out_s3 integration scenario: a single dummy record uploaded through
# PutObject with the Arrow format and compression explicitly set to none.
# The ${...} placeholders are expected to be provided by the test harness.
service:
  flush: 1
  grace: 3
  log_level: info
  http_server: on
  http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}

pipeline:
  inputs:
    - name: dummy
      tag: out_s3
      dummy: '{"message":"hello arrow none s3","source":"dummy"}'
      samples: 1

  outputs:
    - name: s3
      match: out_s3
      bucket: test-bucket
      region: us-east-1
      endpoint: http://127.0.0.1:${TEST_SUITE_HTTP_PORT}
      use_put_object: true
      total_file_size: 1M
      upload_timeout: 2s
      s3_key_format: /payloads/$TAG/$UUID
      # Columnar output under test, without compression.
      format: arrow
      compression: none
      store_dir: /tmp/fluent-bit-test-suite-s3-arrow-none
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# out_s3 integration scenario: a single dummy record uploaded through
# PutObject with the Parquet format and zstd compression.
# The ${...} placeholders are expected to be provided by the test harness.
service:
  flush: 1
  grace: 3
  log_level: info
  http_server: on
  http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}

pipeline:
  inputs:
    - name: dummy
      tag: out_s3
      dummy: '{"message":"hello parquet s3","source":"dummy"}'
      samples: 1

  outputs:
    - name: s3
      match: out_s3
      bucket: test-bucket
      region: us-east-1
      endpoint: http://127.0.0.1:${TEST_SUITE_HTTP_PORT}
      use_put_object: true
      total_file_size: 1M
      upload_timeout: 2s
      s3_key_format: /payloads/$TAG/$UUID
      # Columnar output under test.
      format: parquet
      compression: zstd
      store_dir: /tmp/fluent-bit-test-suite-s3-parquet
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# out_s3 integration scenario: a single dummy record uploaded through
# PutObject with the Parquet format and compression explicitly set to none.
# The ${...} placeholders are expected to be provided by the test harness.
service:
  flush: 1
  grace: 3
  log_level: info
  http_server: on
  http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}

pipeline:
  inputs:
    - name: dummy
      tag: out_s3
      dummy: '{"message":"hello parquet none s3","source":"dummy"}'
      samples: 1

  outputs:
    - name: s3
      match: out_s3
      bucket: test-bucket
      region: us-east-1
      endpoint: http://127.0.0.1:${TEST_SUITE_HTTP_PORT}
      use_put_object: true
      total_file_size: 1M
      upload_timeout: 2s
      s3_key_format: /payloads/$TAG/$UUID
      # Columnar output under test, without compression.
      format: parquet
      compression: none
      store_dir: /tmp/fluent-bit-test-suite-s3-parquet-none

tests/integration/scenarios/out_s3/tests/test_out_s3_001.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,22 @@ def _start_or_skip_unsupported_s3_format(service, format_name):
140140
raise
141141

142142

143+
def _start_or_skip_unsupported_columnar_format(service, requires_marker):
144+
"""Start the service, skipping if the columnar format support
145+
(arrow-glib/parquet-glib) was not compiled into the Fluent Bit binary."""
146+
try:
147+
service.start()
148+
except FluentBitStartupError:
149+
log_contents = ""
150+
if service.service.flb and service.service.flb.log_file:
151+
with open(service.service.flb.log_file, "r", encoding="utf-8", errors="replace") as file:
152+
log_contents = file.read()
153+
if requires_marker in log_contents or \
154+
"unknown configuration property 'format'" in log_contents:
155+
pytest.skip("columnar format support is not compiled into this Fluent Bit binary")
156+
raise
157+
158+
143159
def test_out_s3_put_object_uploads_json_lines_payload():
144160
service = Service("out_s3_basic.yaml")
145161
service.start()
@@ -241,3 +257,74 @@ def test_out_s3_default_retry_exhausted_action_quarantines_file():
241257
service.stop()
242258

243259
assert len(files) > 0
260+
261+
262+
def test_out_s3_format_arrow_uploads_feather_with_zstd():
    """format=arrow with compression=zstd uploads an Arrow/Feather object.

    Skipped when the Fluent Bit binary was built without Arrow support.
    """
    service = Service("out_s3_arrow.yaml")
    _start_or_skip_unsupported_columnar_format(service, "requires arrow-glib")
    try:
        request = service.wait_for_request()
    finally:
        # Always stop the started service, even if waiting for the upload
        # raises, so a failure here does not leave it running.
        service.stop()

    assert request["method"] == "PUT"
    assert request["path"].startswith("/test-bucket/payloads/out_s3/")
    body = request["body"]
    # Arrow/Feather V2 files begin and end with the "ARROW1" magic. The
    # object is the columnar file itself, so it must not carry a byte-level
    # Content-Encoding.
    assert body[:6] == b"ARROW1"
    assert body[-6:] == b"ARROW1"
    assert "Content-Encoding" not in request["headers"]
275+
276+
277+
def test_out_s3_format_parquet_uploads_parquet_with_zstd():
    """format=parquet with compression=zstd uploads a Parquet object.

    Skipped when the Fluent Bit binary was built without Parquet support.
    """
    service = Service("out_s3_parquet.yaml")
    _start_or_skip_unsupported_columnar_format(service, "requires parquet-glib")
    try:
        request = service.wait_for_request()
    finally:
        # Always stop the started service, even if waiting for the upload
        # raises, so a failure here does not leave it running.
        service.stop()

    assert request["method"] == "PUT"
    assert request["path"].startswith("/test-bucket/payloads/out_s3/")
    body = request["body"]
    # Parquet files start and end with the "PAR1" magic. Page-level zstd is
    # applied inside the file, so no byte-level Content-Encoding is expected.
    assert body[:4] == b"PAR1"
    assert body[-4:] == b"PAR1"
    assert "Content-Encoding" not in request["headers"]
291+
292+
293+
def test_out_s3_format_parquet_compression_none_is_accepted():
    """format=parquet with compression=none uploads a Parquet object.

    'compression none' must be explicitly accepted (not rejected as an
    unknown codec) and produce an uncompressed Parquet object with no
    byte-level Content-Encoding header. Skipped when the Fluent Bit binary
    was built without Parquet support.
    """
    service = Service("out_s3_parquet_none.yaml")
    _start_or_skip_unsupported_columnar_format(service, "requires parquet-glib")
    try:
        request = service.wait_for_request()
    finally:
        # Always stop the started service, even if waiting for the upload
        # raises, so a failure here does not leave it running.
        service.stop()

    assert request["method"] == "PUT"
    # Same s3_key_format and tag as the zstd scenario, so the key prefix is
    # checked here as well.
    assert request["path"].startswith("/test-bucket/payloads/out_s3/")
    body = request["body"]
    assert body[:4] == b"PAR1"
    assert body[-4:] == b"PAR1"
    assert "Content-Encoding" not in request["headers"]
307+
308+
309+
def test_out_s3_format_arrow_compression_none_is_accepted():
    """format=arrow with compression=none uploads an Arrow/Feather object.

    'compression none' must be explicitly accepted (not rejected as an
    unknown codec) and produce an uncompressed Arrow/Feather object with no
    byte-level Content-Encoding header. Skipped when the Fluent Bit binary
    was built without Arrow support.
    """
    service = Service("out_s3_arrow_none.yaml")
    _start_or_skip_unsupported_columnar_format(service, "requires arrow-glib")
    try:
        request = service.wait_for_request()
    finally:
        # Always stop the started service, even if waiting for the upload
        # raises, so a failure here does not leave it running.
        service.stop()

    assert request["method"] == "PUT"
    # Same s3_key_format and tag as the zstd scenario, so the key prefix is
    # checked here as well.
    assert request["path"].startswith("/test-bucket/payloads/out_s3/")
    body = request["body"]
    # Arrow/Feather V2 files begin and end with the "ARROW1" magic.
    assert body[:6] == b"ARROW1"
    assert body[-6:] == b"ARROW1"
    assert "Content-Encoding" not in request["headers"]
322+
323+
324+
def test_out_s3_format_arrow_compression_gzip_is_rejected():
    """format=arrow with compression=gzip must be rejected at startup.

    format=arrow with compression=gzip is an invalid combination;
    validate_format_compression() must reject it at plugin init.
    """
    service = Service("out_s3_arrow_gzip_invalid.yaml")
    try:
        with pytest.raises(FluentBitStartupError):
            # Going through the skip helper keeps this test from passing for
            # the wrong reason: when Arrow support is not compiled in, the
            # startup failure says nothing about the gzip validation, so the
            # test is skipped rather than reported as a pass.
            _start_or_skip_unsupported_columnar_format(service, "requires arrow-glib")
    finally:
        # Runs on every path, including the case where startup unexpectedly
        # succeeds and pytest.raises fails the test.
        service.stop()

0 commit comments

Comments
 (0)