Skip to content

Commit 2b2f2a4

Browse files
committed
tests: integration: cover otlp resource schema url grouping
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
1 parent 70597b1 commit 2b2f2a4

2 files changed

Lines changed: 131 additions & 3 deletions

File tree

tests/integration/scenarios/in_opentelemetry/tests/test_in_opentelemetry_001.py

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ def maybe_read_prometheus_metric_value(metrics_text, metric_name, input_name):
252252
return None
253253

254254

255-
def build_resource_collision_logs_payload(user_id, body):
255+
def build_resource_collision_logs_payload(user_id, body, schema_url=None):
256256
payload = {
257257
"resourceLogs": [
258258
{
@@ -283,9 +283,49 @@ def build_resource_collision_logs_payload(user_id, body):
283283
],
284284
}
285285

286+
if schema_url is not None:
287+
payload["resourceLogs"][0]["schemaUrl"] = schema_url
288+
286289
return json_format.Parse(json.dumps(payload), ExportLogsServiceRequest()).SerializeToString()
287290

288291

292+
def build_resource_collision_logs_json_payload(user_id, body, schema_url=None):
293+
payload = {
294+
"resourceLogs": [
295+
{
296+
"resource": {
297+
"attributes": [
298+
{
299+
"key": "user.id",
300+
"value": {
301+
"stringValue": user_id,
302+
},
303+
}
304+
],
305+
},
306+
"scopeLogs": [
307+
{
308+
"scope": {},
309+
"logRecords": [
310+
{
311+
"timeUnixNano": "1640995200000000000",
312+
"body": {
313+
"stringValue": body,
314+
},
315+
}
316+
],
317+
}
318+
],
319+
}
320+
],
321+
}
322+
323+
if schema_url is not None:
324+
payload["resourceLogs"][0]["schemaUrl"] = schema_url
325+
326+
return json.dumps(payload).encode("utf-8")
327+
328+
289329
class Service:
290330
def __init__(self, config_file, *, use_auth_server=False):
291331
# Compose the absolute path for the Fluent Bit configuration file
@@ -709,6 +749,49 @@ def test_in_opentelemetry_stdout_otlp_json_logs_preserve_resources_across_reques
709749
assert len(output["resourceLogs"]) == 2
710750

711751

752+
@pytest.mark.parametrize(
753+
"content_type,payload_builder",
754+
[
755+
("application/x-protobuf", build_resource_collision_logs_payload),
756+
("application/json", build_resource_collision_logs_json_payload),
757+
],
758+
)
759+
def test_in_opentelemetry_stdout_otlp_json_logs_preserve_resource_schema_urls(
760+
content_type,
761+
payload_builder,
762+
):
763+
service = Service("stdout-otlp-json-slow-flush.yaml")
764+
service.start()
765+
766+
response = service.send_raw_request(
767+
"/v1/logs",
768+
payload_builder("same-user", "event-a", "schema-a"),
769+
content_type=content_type,
770+
)
771+
assert 200 <= response.status_code < 300
772+
773+
response = service.send_raw_request(
774+
"/v1/logs",
775+
payload_builder("same-user", "event-b", "schema-b"),
776+
content_type=content_type,
777+
)
778+
assert 200 <= response.status_code < 300
779+
780+
output = read_stdout_otlp_json(service, "resourceLogs", timeout=10)
781+
service.stop()
782+
783+
body_to_schema_url = {
784+
record["body"]["stringValue"]: resource_log["schemaUrl"]
785+
for resource_log in output["resourceLogs"]
786+
for scope_log in resource_log["scopeLogs"]
787+
for record in scope_log["logRecords"]
788+
}
789+
790+
assert body_to_schema_url["event-a"] == "schema-a"
791+
assert body_to_schema_url["event-b"] == "schema-b"
792+
assert len(output["resourceLogs"]) == 2
793+
794+
712795
def test_in_opentelemetry_stdout_otlp_json_metrics():
713796
service = Service("003-stdout-otlp-json.yaml")
714797
service.start()

tests/integration/scenarios/out_kafka/tests/test_out_kafka_001.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,8 @@ def _build_multi_resource_payload(service, signal_type, json_file):
218218
return payload
219219

220220

221-
def _build_resource_collision_payload(user_id, body):
222-
return {
221+
def _build_resource_collision_payload(user_id, body, schema_url=None):
222+
payload = {
223223
"resource_logs": [
224224
{
225225
"resource": {
@@ -249,6 +249,11 @@ def _build_resource_collision_payload(user_id, body):
249249
],
250250
}
251251

252+
if schema_url is not None:
253+
payload["resource_logs"][0]["schema_url"] = schema_url
254+
255+
return payload
256+
252257

253258
def _decode_kafka_payload(message, format_name, signal_type):
254259
if format_name == "otlp_json":
@@ -600,3 +605,43 @@ def test_out_kafka_otlp_logs_preserve_resources_across_requests_in_same_chunk(
600605
assert body_to_user["event-a"] == "user-a"
601606
assert body_to_user["event-b"] == "user-b"
602607
assert len(resources) == 2
608+
609+
610+
@pytest.mark.parametrize(
611+
"format_name,config_file",
612+
[
613+
("otlp_json", "out_kafka_otlp_json_slow_flush.yaml"),
614+
("otlp_proto", "out_kafka_otlp_proto_slow_flush.yaml"),
615+
],
616+
)
617+
def test_out_kafka_otlp_logs_preserve_resource_schema_urls_across_requests(
618+
format_name,
619+
config_file,
620+
):
621+
service = Service(config_file)
622+
service.start()
623+
service.send_payload_dict(
624+
_build_resource_collision_payload("same-user", "event-a", "schema-a"),
625+
"logs",
626+
)
627+
service.send_payload_dict(
628+
_build_resource_collision_payload("same-user", "event-b", "schema-b"),
629+
"logs",
630+
)
631+
632+
messages = service.wait_for_messages(1, timeout=10)
633+
service.stop()
634+
635+
assert len(messages) == 1
636+
637+
resources = _collect_resources(messages[:1], format_name, "logs")
638+
body_to_schema_url = {
639+
record["body"]["stringValue"]: resource["schemaUrl"]
640+
for resource in resources
641+
for scope in resource["scopeLogs"]
642+
for record in scope["logRecords"]
643+
}
644+
645+
assert body_to_schema_url["event-a"] == "schema-a"
646+
assert body_to_schema_url["event-b"] == "schema-b"
647+
assert len(resources) == 2

0 commit comments

Comments
 (0)