Skip to content

Commit 3bef61c

Browse files
committed
tests: integration: cover bulk action metadata injection
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
1 parent b156f97 commit 3bef61c

8 files changed

Lines changed: 379 additions & 0 deletions
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
service:
2+
flush: 1
3+
grace: 1
4+
log_level: info
5+
http_server: on
6+
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}
7+
8+
pipeline:
9+
inputs:
10+
- name: dummy
11+
tag: out_es_id_key_ndjson
12+
dummy: '{"doc_id":"legit\"\n{\"delete\":{\"_index\":\"audit-logs\",\"_id\":\"critical-audit-record-12345\"}}\n{\"create\":{\"_index\":\"pad\",\"_id\":\"x","message":"id-key injection"}'
13+
samples: 1
14+
15+
outputs:
16+
- name: es
17+
match: out_es_id_key_ndjson
18+
host: 127.0.0.1
19+
port: ${TEST_SUITE_HTTP_PORT}
20+
index: fluent-bit
21+
suppress_type_name: on
22+
id_key: doc_id
23+
retry_limit: 0
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
service:
2+
flush: 1
3+
grace: 1
4+
log_level: info
5+
http_server: on
6+
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}
7+
8+
pipeline:
9+
inputs:
10+
- name: dummy
11+
tag: out_es_id_key_update_ndjson
12+
dummy: >-
13+
{"doc_id":"legit\"\n
14+
{\"delete\":{\"_index\":\"audit-logs\",\"_id\":\"critical-audit-record-12345\"}}\n
15+
{\"update\":{\"_index\":\"pad\",\"_id\":\"x",
16+
"message":"unsafe update id"}
17+
samples: 1
18+
19+
- name: dummy
20+
tag: out_es_id_key_update_ndjson
21+
dummy: '{"doc_id":"safe-update-id","message":"safe update id"}'
22+
samples: 1
23+
24+
outputs:
25+
- name: es
26+
match: out_es_id_key_update_ndjson
27+
host: 127.0.0.1
28+
port: ${TEST_SUITE_HTTP_PORT}
29+
index: fluent-bit
30+
suppress_type_name: on
31+
id_key: doc_id
32+
write_operation: update
33+
retry_limit: 0
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
service:
2+
flush: 1
3+
grace: 1
4+
log_level: info
5+
http_server: on
6+
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}
7+
8+
pipeline:
9+
inputs:
10+
- name: dummy
11+
tag: out_es_logstash_prefix_key_ndjson
12+
dummy: '{"idx":"victim-index\"\n{\"delete\":{\"_index\":\"audit-logs\",\"_id\":\"critical-audit-record-12345\"}}\n{\"create\":{\"_index\":\"pad","message":"prefix injection"}'
13+
samples: 1
14+
15+
outputs:
16+
- name: es
17+
match: out_es_logstash_prefix_key_ndjson
18+
host: 127.0.0.1
19+
port: ${TEST_SUITE_HTTP_PORT}
20+
logstash_format: on
21+
logstash_prefix_key: idx
22+
suppress_type_name: on
23+
retry_limit: 0
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
service:
2+
flush: 1
3+
grace: 1
4+
log_level: info
5+
http_server: on
6+
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}
7+
8+
pipeline:
9+
inputs:
10+
- name: dummy
11+
tag: out_opensearch_id_key_ndjson
12+
dummy: '{"doc_id":"legit\"\n{\"delete\":{\"_index\":\"audit-logs\",\"_id\":\"critical-audit-record-12345\"}}\n{\"create\":{\"_index\":\"pad\",\"_id\":\"x","message":"opensearch id-key injection"}'
13+
samples: 1
14+
15+
outputs:
16+
- name: opensearch
17+
match: out_opensearch_id_key_ndjson
18+
host: 127.0.0.1
19+
port: ${TEST_SUITE_HTTP_PORT}
20+
index: fluent-bit
21+
suppress_type_name: on
22+
id_key: doc_id
23+
retry_limit: 0
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
service:
2+
flush: 1
3+
grace: 1
4+
log_level: info
5+
http_server: on
6+
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}
7+
8+
pipeline:
9+
inputs:
10+
- name: dummy
11+
tag: out_opensearch_id_key_update_ndjson
12+
dummy: >-
13+
{"doc_id":"legit\"\n
14+
{\"delete\":{\"_index\":\"audit-logs\",\"_id\":\"critical-audit-record-12345\"}}\n
15+
{\"update\":{\"_index\":\"pad\",\"_id\":\"x",
16+
"message":"unsafe opensearch update id"}
17+
samples: 1
18+
19+
- name: dummy
20+
tag: out_opensearch_id_key_update_ndjson
21+
dummy: '{"doc_id":"safe-update-id","message":"safe opensearch update id"}'
22+
samples: 1
23+
24+
outputs:
25+
- name: opensearch
26+
match: out_opensearch_id_key_update_ndjson
27+
host: 127.0.0.1
28+
port: ${TEST_SUITE_HTTP_PORT}
29+
index: fluent-bit
30+
suppress_type_name: on
31+
id_key: doc_id
32+
write_operation: update
33+
retry_limit: 0
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
service:
2+
flush: 1
3+
grace: 1
4+
log_level: info
5+
http_server: on
6+
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}
7+
8+
pipeline:
9+
inputs:
10+
- name: dummy
11+
tag: out_opensearch_index_record_accessor_ndjson
12+
dummy: '{"idx":"victim-index\"\n{\"delete\":{\"_index\":\"audit-logs\",\"_id\":\"critical-audit-record-12345\"}}\n{\"create\":{\"_index\":\"pad","message":"opensearch index injection"}'
13+
samples: 1
14+
15+
outputs:
16+
- name: opensearch
17+
match: out_opensearch_index_record_accessor_ndjson
18+
host: 127.0.0.1
19+
port: ${TEST_SUITE_HTTP_PORT}
20+
index: $idx
21+
suppress_type_name: on
22+
retry_limit: 0
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
service:
2+
flush: 1
3+
grace: 1
4+
log_level: info
5+
http_server: on
6+
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}
7+
8+
pipeline:
9+
inputs:
10+
- name: dummy
11+
tag: out_opensearch_logstash_prefix_key_ndjson
12+
dummy: '{"idx":"victim-index\"\n{\"delete\":{\"_index\":\"audit-logs\",\"_id\":\"critical-audit-record-12345\"}}\n{\"create\":{\"_index\":\"pad","message":"opensearch prefix injection"}'
13+
samples: 1
14+
15+
outputs:
16+
- name: opensearch
17+
match: out_opensearch_logstash_prefix_key_ndjson
18+
host: 127.0.0.1
19+
port: ${TEST_SUITE_HTTP_PORT}
20+
logstash_format: on
21+
logstash_prefix_key: idx
22+
suppress_type_name: on
23+
retry_limit: 0
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
import json
2+
import os
3+
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
4+
import threading
5+
6+
import pytest
7+
8+
from utils.test_service import FluentBitTestService
9+
10+
11+
FORGED_DELETE_ID = "critical-audit-record-12345"
12+
SAFE_UPDATE_ID = "safe-update-id"
13+
14+
15+
class _BulkCaptureHandler(BaseHTTPRequestHandler):
16+
def log_message(self, fmt, *args):
17+
return
18+
19+
def do_POST(self):
20+
content_length = int(self.headers.get("Content-Length", "0"))
21+
body = self.rfile.read(content_length)
22+
23+
self.server.requests.append(
24+
{
25+
"path": self.path,
26+
"headers": dict(self.headers),
27+
"body": body.decode("utf-8", errors="replace"),
28+
}
29+
)
30+
31+
response = b'{"errors":false,"items":[{"create":{"status":201}}]}'
32+
self.send_response(200)
33+
self.send_header("Content-Type", "application/json")
34+
self.send_header("Content-Length", str(len(response)))
35+
self.end_headers()
36+
self.wfile.write(response)
37+
38+
39+
class _BulkCaptureServer(ThreadingHTTPServer):
40+
daemon_threads = True
41+
allow_reuse_address = True
42+
43+
def __init__(self, address):
44+
super().__init__(address, _BulkCaptureHandler)
45+
self.requests = []
46+
47+
48+
class Service:
49+
def __init__(self, config_file):
50+
self.config_file = os.path.abspath(
51+
os.path.join(os.path.dirname(__file__), "../config", config_file)
52+
)
53+
self.bulk_server = None
54+
self.bulk_server_thread = None
55+
self.service = FluentBitTestService(
56+
self.config_file,
57+
pre_start=self._start_receiver,
58+
post_stop=self._stop_receiver,
59+
)
60+
61+
def _start_receiver(self, service):
62+
self.bulk_server = _BulkCaptureServer(("127.0.0.1", service.test_suite_http_port))
63+
self.bulk_server_thread = threading.Thread(
64+
target=self.bulk_server.serve_forever,
65+
daemon=True,
66+
)
67+
self.bulk_server_thread.start()
68+
69+
def _stop_receiver(self, service):
70+
if self.bulk_server is None:
71+
return
72+
73+
self.bulk_server.shutdown()
74+
self.bulk_server.server_close()
75+
76+
if self.bulk_server_thread is not None:
77+
self.bulk_server_thread.join(timeout=5)
78+
79+
def start(self):
80+
self.service.start()
81+
82+
def stop(self):
83+
self.service.stop()
84+
85+
def wait_for_requests(self, minimum_count, timeout=10):
86+
if os.environ.get("VALGRIND"):
87+
timeout = max(timeout * 3, 30)
88+
89+
return self.service.wait_for_condition(
90+
lambda: self.bulk_server.requests
91+
if len(self.bulk_server.requests) >= minimum_count
92+
else None,
93+
timeout=timeout,
94+
interval=0.5,
95+
description=f"{minimum_count} Elasticsearch bulk requests",
96+
)
97+
98+
def wait_for_action_lines(self, minimum_count, timeout=10):
99+
if os.environ.get("VALGRIND"):
100+
timeout = max(timeout * 3, 30)
101+
102+
return self.service.wait_for_condition(
103+
lambda: self.bulk_server.requests
104+
if sum(len(_bulk_action_lines(request["body"]))
105+
for request in self.bulk_server.requests) >= minimum_count
106+
else None,
107+
timeout=timeout,
108+
interval=0.5,
109+
description=f"{minimum_count} Elasticsearch bulk action lines",
110+
)
111+
112+
113+
def _bulk_action_lines(body):
114+
actions = []
115+
116+
for line in body.splitlines():
117+
if not line:
118+
continue
119+
120+
try:
121+
value = json.loads(line)
122+
except json.JSONDecodeError:
123+
continue
124+
125+
if isinstance(value, dict) and any(
126+
key in value for key in ("create", "index", "update", "delete")
127+
):
128+
actions.append(value)
129+
130+
return actions
131+
132+
133+
def _assert_no_forged_delete(body):
134+
actions = _bulk_action_lines(body)
135+
deletes = [
136+
action["delete"]
137+
for action in actions
138+
if "delete" in action and action["delete"].get("_id") == FORGED_DELETE_ID
139+
]
140+
141+
assert len(actions) == 1
142+
assert deletes == []
143+
144+
145+
def _bulk_actions(requests):
146+
actions = []
147+
148+
for request in requests:
149+
actions.extend(_bulk_action_lines(request["body"]))
150+
151+
return actions
152+
153+
154+
@pytest.mark.parametrize(
155+
"config_file",
156+
[
157+
"out_es_logstash_prefix_key_ndjson.yaml",
158+
"out_es_id_key_ndjson.yaml",
159+
"out_opensearch_logstash_prefix_key_ndjson.yaml",
160+
"out_opensearch_index_record_accessor_ndjson.yaml",
161+
"out_opensearch_id_key_ndjson.yaml",
162+
],
163+
)
164+
def test_record_accessor_values_do_not_forge_bulk_action_lines(config_file):
165+
service = Service(config_file)
166+
167+
try:
168+
service.start()
169+
requests_seen = service.wait_for_requests(1)
170+
finally:
171+
service.stop()
172+
173+
bulk_body = requests_seen[0]["body"]
174+
assert requests_seen[0]["path"].startswith("/_bulk")
175+
_assert_no_forged_delete(bulk_body)
176+
177+
178+
@pytest.mark.parametrize(
179+
"config_file",
180+
[
181+
"out_es_id_key_update_ndjson.yaml",
182+
"out_opensearch_id_key_update_ndjson.yaml",
183+
],
184+
)
185+
def test_unsafe_required_id_key_does_not_emit_idless_update(config_file):
186+
service = Service(config_file)
187+
188+
try:
189+
service.start()
190+
requests_seen = service.wait_for_action_lines(1)
191+
finally:
192+
service.stop()
193+
194+
actions = _bulk_actions(requests_seen)
195+
updates = [action["update"] for action in actions if "update" in action]
196+
197+
assert all(request["path"].startswith("/_bulk") for request in requests_seen)
198+
assert len(actions) == 1
199+
assert updates == [{"_index": "fluent-bit", "_id": SAFE_UPDATE_ID}]

0 commit comments

Comments
 (0)