Skip to content

Commit 73e5c7a

Browse files
cosmo0920edsiper
authored andcommitted
tests: integration: Add test cases for limit of long command lines
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
1 parent 89fe03f commit 73e5c7a

1 file changed

Lines changed: 148 additions & 0 deletions

File tree

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
import contextlib
2+
import http.server
3+
import os
4+
import shlex
5+
import sys
6+
import threading
7+
8+
import pytest
9+
10+
from utils.data_utils import read_file
11+
from utils.test_service import FluentBitTestService
12+
13+
14+
class _KubeApiHandler(http.server.BaseHTTPRequestHandler):
15+
def do_GET(self):
16+
payload = b"{}"
17+
self.send_response(200)
18+
self.send_header("Content-Type", "application/json")
19+
self.send_header("Content-Length", str(len(payload)))
20+
self.end_headers()
21+
self.wfile.write(payload)
22+
23+
def log_message(self, fmt, *args):
24+
return
25+
26+
27+
@contextlib.contextmanager
28+
def _run_kube_api_server():
29+
server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), _KubeApiHandler)
30+
port = server.server_address[1]
31+
thread = threading.Thread(target=server.serve_forever, daemon=True)
32+
thread.start()
33+
34+
try:
35+
yield port
36+
finally:
37+
server.shutdown()
38+
server.server_close()
39+
thread.join()
40+
41+
42+
class Service:
43+
def __init__(self, config_file):
44+
self.config_file = os.path.abspath(config_file)
45+
self.service = FluentBitTestService(self.config_file)
46+
47+
def start(self):
48+
self.service.start()
49+
self.flb = self.service.flb
50+
51+
def stop(self):
52+
self.service.stop()
53+
54+
def wait_for_log_contains(self, text, timeout=20):
55+
return self.service.wait_for_condition(
56+
lambda: read_file(self.flb.log_file) if text in read_file(self.flb.log_file) else None,
57+
timeout=timeout,
58+
interval=0.5,
59+
description=f"log text {text!r}",
60+
)
61+
62+
def read_log(self):
63+
return read_file(self.flb.log_file)
64+
65+
66+
def _write_script(tmp_path, name, line_count):
67+
script_file = tmp_path / name
68+
script_file.write_text(
69+
f"import sys\nsys.stdout.write('tkn\\n' * {line_count})\n",
70+
encoding="utf-8",
71+
)
72+
return script_file
73+
74+
75+
def _write_config(tmp_path, name, script_file, kube_api_port):
76+
docker_id = "a" * 64
77+
token_command = "{} {}".format(
78+
shlex.quote(sys.executable),
79+
shlex.quote(str(script_file)),
80+
)
81+
82+
config_file = tmp_path / name
83+
config_file.write_text(
84+
"\n".join(
85+
[
86+
"[SERVICE]",
87+
" Flush 1",
88+
" Grace 1",
89+
" Log_Level info",
90+
" HTTP_Server On",
91+
" HTTP_Port ${FLUENT_BIT_HTTP_MONITORING_PORT}",
92+
"",
93+
"[INPUT]",
94+
" Name dummy",
95+
" Dummy {\"message\":\"kube token command test\"}",
96+
f" Tag kube.var.log.containers.testpod_default_testcontainer-{docker_id}.log",
97+
" Samples 1",
98+
"",
99+
"[FILTER]",
100+
" Name kubernetes",
101+
" Match kube.*",
102+
f" Kube_URL http://127.0.0.1:{kube_api_port}",
103+
" Kube_Tag_Prefix kube.var.log.containers.",
104+
" tls.verify Off",
105+
f" Kube_Token_Command {token_command}",
106+
"",
107+
"[OUTPUT]",
108+
" Name stdout",
109+
" Match *",
110+
]
111+
),
112+
encoding="utf-8",
113+
)
114+
return config_file
115+
116+
117+
@pytest.mark.skipif(sys.platform != "linux", reason="Kube_Token_Command test is Linux-only")
118+
def test_filter_kubernetes_token_command_accepts_multiline_output_over_8kb(tmp_path):
119+
script_file = _write_script(tmp_path, "token_large.py", 3000)
120+
with _run_kube_api_server() as kube_api_port:
121+
config_file = _write_config(tmp_path, "token_large.conf", script_file, kube_api_port)
122+
123+
service = Service(str(config_file))
124+
service.start()
125+
log_text = service.wait_for_log_contains("kube token command test", timeout=25)
126+
service.stop()
127+
128+
assert "failed to run command" not in log_text
129+
assert "kube token command test" in log_text
130+
131+
132+
@pytest.mark.skipif(sys.platform != "linux", reason="Kube_Token_Command test is Linux-only")
133+
def test_filter_kubernetes_token_command_rejects_multiline_output_over_limit(tmp_path):
134+
script_file = _write_script(tmp_path, "token_huge.py", 270000)
135+
with _run_kube_api_server() as kube_api_port:
136+
config_file = _write_config(tmp_path, "token_huge.conf", script_file, kube_api_port)
137+
138+
service = Service(str(config_file))
139+
log_text = None
140+
service.start()
141+
try:
142+
log_text = service.wait_for_log_contains("failed to run command", timeout=25)
143+
log_text = service.wait_for_log_contains("kube token command test", timeout=25)
144+
finally:
145+
service.stop()
146+
147+
assert "failed to run command" in log_text
148+
assert "kube token command test" in log_text

0 commit comments

Comments
 (0)