-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathtest_otel_logger_example.py
More file actions
72 lines (54 loc) · 2.39 KB
/
Copy pathtest_otel_logger_example.py
File metadata and controls
72 lines (54 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
"""Tests for the OTel-enriched logger example."""
import time
from datetime import UTC, datetime
import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationType
from src.otel import otel_logger_example
from test.conftest import deserialize_operation_payload
# X-Ray ingestion is eventually consistent, so the X-Ray test sleeps for this
# many seconds before querying, giving the backend time to receive and index
# the exported spans.
_XRAY_INGESTION_DELAY_SECONDS = 20
@pytest.mark.example
@pytest.mark.durable_execution(
    handler=otel_logger_example.handler,
    lambda_function_name="Otel Logger Example",
)
def test_otel_logger_example(durable_runner):
    """Run the OTel logger example and check its result and operations.

    Asserts that the invocation succeeds, that the final payload and the
    "top-greet" step payload have the expected values, and that at least
    one CONTEXT operation was recorded.
    """
    with durable_runner:
        outcome = durable_runner.run(input="{}", timeout=10)

    assert outcome.status is InvocationStatus.SUCCEEDED
    final_payload = deserialize_operation_payload(outcome.result)
    assert final_payload == "hello world | hello nested"

    # "top-greet" is the name of the top-level step.
    greet_step = outcome.get_step("top-greet")
    greet_payload = deserialize_operation_payload(greet_step.result)
    assert greet_payload == "hello world"

    # The nested step runs inside a child context, so at least one CONTEXT
    # operation must be present among the recorded operations.
    assert any(
        op.operation_type is OperationType.CONTEXT for op in outcome.operations
    )
@pytest.mark.example
@pytest.mark.durable_execution(
    handler=otel_logger_example.handler,
    lambda_function_name="Otel Logger Example",
)
def test_otel_logger_example_spans_in_xray(durable_runner, xray_spans):
    """Check that the single-invocation example's spans reach X-Ray.

    Runs the example, waits for X-Ray ingestion, fetches the trace that
    contains the "top-greet" marker span, and asserts that every expected
    span name appears in the returned segment text.

    NOTE(review): intended to run only in cloud mode; the gating is not
    visible in this test (presumably handled by the ``xray_spans``
    fixture) -- confirm.
    """
    window_start = datetime.now(UTC)
    with durable_runner:
        outcome = durable_runner.run(input="{}", timeout=60)

    assert outcome.status is InvocationStatus.SUCCEEDED
    final_payload = deserialize_operation_payload(outcome.result)
    assert final_payload == "hello world | hello nested"

    # Give X-Ray time to ingest the exported spans before querying.
    time.sleep(_XRAY_INGESTION_DELAY_SECONDS)

    window_end = datetime.now(UTC)
    _trace_id, segment_text = xray_spans.fetch_trace_with_span(
        window_start, window_end, marker_span="top-greet"
    )

    # Spans expected for the single-invocation example.
    expected_spans = ("invocation", "top-greet", "child-context", "child-greet")
    for span_name in expected_spans:
        assert span_name in segment_text