|
5 | 5 | import logging |
6 | 6 | import os |
7 | 7 | import sys |
| 8 | +from datetime import datetime |
8 | 9 | from enum import StrEnum |
9 | 10 | from pathlib import Path |
10 | 11 | from typing import Any |
11 | 12 |
|
12 | 13 | import pytest |
13 | | -from aws_durable_execution_sdk_python.lambda_service import ( |
14 | | - ErrorObject, |
15 | | - OperationPayload, |
16 | | -) |
17 | | -from aws_durable_execution_sdk_python.serdes import ExtendedTypeSerDes |
18 | | - |
19 | 14 | from aws_durable_execution_sdk_python_testing.runner import ( |
20 | 15 | DurableFunctionCloudTestRunner, |
21 | 16 | DurableFunctionTestResult, |
22 | 17 | DurableFunctionTestRunner, |
23 | 18 | ) |
24 | 19 |
|
| 20 | +from aws_durable_execution_sdk_python.lambda_service import ( |
| 21 | + ErrorObject, |
| 22 | + OperationPayload, |
| 23 | +) |
| 24 | +from aws_durable_execution_sdk_python.serdes import ExtendedTypeSerDes |
| 25 | + |
25 | 26 |
|
26 | 27 | # Add examples/src to Python path for imports |
27 | 28 | examples_src = Path(__file__).parent.parent / "src" |
@@ -266,3 +267,109 @@ def _get_deployed_function_name( |
266 | 267 | pytest.skip( |
267 | 268 | f"Test '{lambda_function_name}' doesn't match LAMBDA_FUNCTION_TEST_NAME '{env_function_name}'" |
268 | 269 | ) |
| 270 | + |
| 271 | + |
| 272 | +# X-Ray ingestion is eventually consistent; give the backend time to receive and |
| 273 | +# index spans before querying, then retry a few times. |
| 274 | +_XRAY_QUERY_RETRIES = 3 |
| 275 | +_XRAY_RETRY_DELAY_SECONDS = 10 |
| 276 | + |
| 277 | + |
| 278 | +class XRaySpanFetcher: |
| 279 | + """Encapsulates all AWS X-Ray interaction for span-validation tests. |
| 280 | +
|
| 281 | + Wraps a boto3 X-Ray client and exposes a single high-level operation that |
| 282 | + queries trace summaries in a time window (with retries for eventual |
| 283 | + consistency), batch-fetches the full traces, and locates the trace whose |
| 284 | + segment documents reference a marker span name. |
| 285 | + """ |
| 286 | + |
| 287 | + def __init__(self, client: Any): |
| 288 | + """Initialize with a boto3 X-Ray client.""" |
| 289 | + self._client = client |
| 290 | + |
| 291 | + def _query_trace_summaries( |
| 292 | + self, start_time: datetime, end_time: datetime |
| 293 | + ) -> list[dict]: |
| 294 | + """Query trace summaries in a window, retrying for consistency.""" |
| 295 | + import time |
| 296 | + |
| 297 | + for attempt in range(_XRAY_QUERY_RETRIES): |
| 298 | + response = self._client.get_trace_summaries( |
| 299 | + StartTime=start_time, |
| 300 | + EndTime=end_time, |
| 301 | + TimeRangeType="Event", |
| 302 | + Sampling=False, |
| 303 | + ) |
| 304 | + summaries = response.get("TraceSummaries", []) |
| 305 | + if summaries: |
| 306 | + return summaries |
| 307 | + |
| 308 | + logger.info( |
| 309 | + "X-Ray query returned 0 traces, retrying in %ss (attempt %d/%d)", |
| 310 | + _XRAY_RETRY_DELAY_SECONDS, |
| 311 | + attempt + 1, |
| 312 | + _XRAY_QUERY_RETRIES, |
| 313 | + ) |
| 314 | + time.sleep(_XRAY_RETRY_DELAY_SECONDS) |
| 315 | + return [] |
| 316 | + |
| 317 | + def fetch_trace_with_span( |
| 318 | + self, |
| 319 | + start_time: datetime, |
| 320 | + end_time: datetime, |
| 321 | + marker_span: str, |
| 322 | + ) -> tuple[str, str]: |
| 323 | + """Find the trace containing ``marker_span`` and return its segment text. |
| 324 | +
|
| 325 | + Queries trace summaries in the window, then batch-fetches full traces |
| 326 | + (X-Ray caps BatchGetTraces at 5 trace IDs per call) and locates the |
| 327 | + trace whose segment documents reference the marker span name. |
| 328 | +
|
| 329 | + Args: |
| 330 | + start_time: Start of the X-Ray query window. |
| 331 | + end_time: End of the X-Ray query window. |
| 332 | + marker_span: A span name expected to appear in the target trace. |
| 333 | +
|
| 334 | + Returns: |
| 335 | + A tuple of (trace_id, concatenated segment-document JSON text). |
| 336 | + """ |
| 337 | + summaries = self._query_trace_summaries(start_time, end_time) |
| 338 | + assert summaries, "Expected at least one trace in X-Ray after execution" |
| 339 | + |
| 340 | + trace_ids = [s["Id"] for s in summaries] |
| 341 | + |
| 342 | + for i in range(0, len(trace_ids), 5): |
| 343 | + batch = trace_ids[i : i + 5] |
| 344 | + result = self._client.batch_get_traces(TraceIds=batch) |
| 345 | + for trace in result.get("Traces", []): |
| 346 | + documents = [ |
| 347 | + seg.get("Document", "") for seg in trace.get("Segments", []) |
| 348 | + ] |
| 349 | + segment_text = "\n".join(documents) |
| 350 | + if marker_span in segment_text: |
| 351 | + return trace["Id"], segment_text |
| 352 | + |
| 353 | + pytest.fail( |
| 354 | + f"Did not find a trace containing span '{marker_span}' in the time " |
| 355 | + f"window across {len(trace_ids)} trace(s)" |
| 356 | + ) |
| 357 | + |
| 358 | + |
| 359 | +@pytest.fixture |
| 360 | +def xray_spans(request): |
| 361 | + """Provide an XRaySpanFetcher for cloud-mode span validation tests. |
| 362 | +
|
| 363 | + The underlying boto3 X-Ray client is created in the same region as the |
| 364 | + cloud runner (AWS_REGION, default us-west-2). In local mode there is no |
| 365 | + X-Ray backend, so the fixture skips the test, mirroring the cloud-only |
| 366 | + gating of the durable_runner cloud path. |
| 367 | + """ |
| 368 | + runner_mode: str = request.config.getoption("--runner-mode") |
| 369 | + if runner_mode != RunnerMode.CLOUD: |
| 370 | + pytest.skip("X-Ray span validation only runs in cloud mode") |
| 371 | + |
| 372 | + import boto3 |
| 373 | + |
| 374 | + region = os.environ.get("AWS_REGION", "us-west-2") |
| 375 | + return XRaySpanFetcher(boto3.client("xray", region_name=region)) |
0 commit comments