Skip to content

Commit ac5ca94

Browse files
committed
tests: integration: cover internal HTTP self requests
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
1 parent a0aa168 commit ac5ca94

2 files changed

Lines changed: 111 additions & 0 deletions

File tree

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
service:
2+
flush: 1
3+
grace: 1
4+
log_level: info
5+
http_server: on
6+
http_listen: 127.0.0.1
7+
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}
8+
health_check: on
9+
storage.metrics: on
10+
11+
pipeline:
12+
inputs:
13+
- name: exec
14+
tag: test
15+
command: curl -s http://127.0.0.1:${FLUENT_BIT_HTTP_MONITORING_PORT}/api/v1/metrics/prometheus
16+
interval_sec: 1
17+
buf_size: 128k
18+
19+
outputs:
20+
- name: null
21+
match: "*"
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import concurrent.futures
2+
import os
3+
import time
4+
5+
from utils.http_matrix import run_curl_request
6+
from utils.test_service import FluentBitTestService
7+
8+
9+
class Service:
10+
"""Run Fluent Bit with an exec input that curls the monitoring server."""
11+
12+
def __init__(self):
13+
config_dir = os.path.dirname(__file__)
14+
self.config_file = os.path.abspath(
15+
os.path.join(config_dir, "../config/internal_http_server_exec_deadlock.yaml")
16+
)
17+
self.service = FluentBitTestService(self.config_file)
18+
19+
def start(self):
20+
"""Start Fluent Bit and record the monitoring base URL."""
21+
try:
22+
self.service.start()
23+
except Exception:
24+
self.service.stop()
25+
raise
26+
27+
self.flb = self.service.flb
28+
self.base_url = f"http://127.0.0.1:{self.flb.http_monitoring_port}"
29+
30+
def stop(self):
31+
"""Stop Fluent Bit and restore the test environment."""
32+
self.service.stop()
33+
34+
def request(self, path, *, method="GET", http_mode="http1.1"):
35+
"""Issue a request against the internal monitoring server."""
36+
return run_curl_request(
37+
f"{self.base_url}{path}",
38+
method=method,
39+
http_mode=http_mode,
40+
)
41+
42+
43+
def test_http_server_stays_responsive_after_exec_self_request():
44+
"""The monitoring server must not share the blocked exec collector loop."""
45+
service = Service()
46+
service.start()
47+
48+
try:
49+
result = service.request("/api/v1/uptime")
50+
assert result["status_code"] == 200
51+
assert "uptime_sec" in result["body"]
52+
53+
time.sleep(2)
54+
55+
endpoints = [
56+
("/api/v1/uptime", "uptime_sec"),
57+
("/api/v1/health", "ok"),
58+
("/api/v1/metrics", "output"),
59+
("/api/v1/metrics/prometheus", "fluentbit_uptime"),
60+
("/api/v1/storage", "chunks"),
61+
("/api/v2/metrics", "fluentbit_uptime"),
62+
("/api/v2/metrics/prometheus", "fluentbit_uptime"),
63+
]
64+
65+
for path, pattern in endpoints:
66+
result = service.service.wait_for_condition(
67+
lambda: (
68+
response
69+
if response["status_code"] == 200 and pattern in response["body"]
70+
else None
71+
) if (response := service.request(path)) else None,
72+
timeout=10,
73+
interval=0.5,
74+
description=f"internal endpoint {path}",
75+
)
76+
assert result["status_code"] == 200
77+
assert pattern in result["body"]
78+
79+
def fetch(endpoint):
80+
path, pattern = endpoint
81+
response = service.request(path)
82+
assert response["status_code"] == 200
83+
assert pattern in response["body"]
84+
85+
with concurrent.futures.ThreadPoolExecutor(max_workers=7) as executor:
86+
futures = [executor.submit(fetch, endpoint) for endpoint in endpoints * 4]
87+
for future in futures:
88+
future.result()
89+
finally:
90+
service.stop()

0 commit comments

Comments
 (0)