|
| 1 | +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. |
| 2 | +# SPDX-License-Identifier: MIT-0 |
| 3 | + |
| 4 | +"""Unit tests for the outbound SDK User-Agent solution tracking (ua).""" |
| 5 | + |
| 6 | +from __future__ import annotations |
| 7 | + |
| 8 | +import threading |
| 9 | + |
| 10 | +import pytest |
| 11 | + |
| 12 | +from ua import ( |
| 13 | + COMPONENT, |
| 14 | + SOLUTION_ID, |
| 15 | + STACK_NAME_ENV, |
| 16 | + client_config, |
| 17 | + get_trace, |
| 18 | + register_trace_appender, |
| 19 | + sanitize_ua_value, |
| 20 | + set_trace, |
| 21 | + static_user_agent_extra, |
| 22 | +) |
| 23 | + |
| 24 | + |
@pytest.fixture(autouse=True)
def _reset(monkeypatch):
    """Give every test a clean starting state.

    Before the test body runs, the stack-name environment variable is removed
    and the stored trace is reset with ``set_trace(None)``. After the test
    body finishes, the trace is reset again so a value set by one test is not
    visible to the next.
    """
    # raising=False: an already-absent variable is not an error.
    monkeypatch.delenv(STACK_NAME_ENV, raising=False)
    set_trace(None)
    yield
    set_trace(None)
| 32 | + |
| 33 | + |
class TestSanitizeUaValue:
    """Checks on ``sanitize_ua_value`` for each class of input character."""

    def test_passthrough_for_token_safe_chars(self):
        # Strings made only of token-safe characters must come back unchanged.
        for untouched in ("backgroundagent-dev", "A1!$%&'*+-.^_`|~z"):
            assert sanitize_ua_value(untouched) == untouched

    def test_structural_separators_replaced(self):
        # '/' and '#' delimit the pieces of the UA scheme and are NOT part of
        # the UA token charset, so each one is expected to turn into '-'.
        cleaned = sanitize_ua_value("a/b#c")
        assert cleaned == "a-b-c"

    def test_non_ascii_replaced(self):
        # Each non-ASCII character yields exactly one '-'.
        expectations = (("stäck", "st-ck"), ("名前", "--"))
        for raw, expected in expectations:
            assert sanitize_ua_value(raw) == expected

    def test_whitespace_and_controls_replaced(self):
        # Space, tab and newline are each replaced by '-'.
        cleaned = sanitize_ua_value("a b\tc\nd")
        assert cleaned == "a-b-c-d"

    def test_empty(self):
        # Empty input stays empty.
        cleaned = sanitize_ua_value("")
        assert cleaned == ""
| 53 | + |
| 54 | + |
class TestStaticUserAgentExtra:
    """Shape of ``static_user_agent_extra()`` with and without a stack name."""

    def test_without_stack_name_omits_app_segment(self):
        # The autouse fixture removed the env var, so only the md/ segment
        # is expected.
        result = static_user_agent_extra()
        assert result == f"md/{SOLUTION_ID}#{COMPONENT}"
        assert "app/" not in result

    def test_with_stack_name(self, monkeypatch):
        monkeypatch.setenv(STACK_NAME_ENV, "backgroundagent-dev")
        expected = f"app/{SOLUTION_ID}/backgroundagent-dev md/{SOLUTION_ID}#{COMPONENT}"
        assert static_user_agent_extra() == expected

    def test_stack_name_sanitized_then_clipped(self, monkeypatch):
        # Ordering under test: sanitize FIRST, clip to 34 afterwards, so a
        # multi-byte character close to the cut is already '-' when clipped.
        monkeypatch.setenv(STACK_NAME_ENV, "my/stack#nämé" + "x" * 40)
        first_segment = static_user_agent_extra().split(" ")[0]
        app_value = first_segment.removeprefix("app/")
        assert app_value.startswith(f"{SOLUTION_ID}/my-stack-n-m-")
        # uksb-wt64nei4u6/ (16) + clipped stack (<=34) <= 50.
        assert len(app_value) <= 50
        stack_part = app_value.removeprefix(f"{SOLUTION_ID}/")
        assert len(stack_part) == 34
        # Neither structural separator may survive in the stack portion.
        assert not any(separator in stack_part for separator in ("/", "#"))

    def test_longest_realistic_stack_name_within_budget(self, monkeypatch):
        # CloudFormation stack names max out at 128 chars [A-Za-z0-9-].
        longest_name = "a" * 128
        monkeypatch.setenv(STACK_NAME_ENV, longest_name)
        app_segment, *_ = static_user_agent_extra().split(" ")
        assert len(app_segment.removeprefix("app/")) == 50

    def test_blank_stack_name_omits_app_segment(self, monkeypatch):
        # A whitespace-only stack name is treated like a missing one.
        monkeypatch.setenv(STACK_NAME_ENV, " ")
        expected = f"md/{SOLUTION_ID}#{COMPONENT}"
        assert static_user_agent_extra() == expected
| 89 | + |
| 90 | + |
class TestTraceState:
    """Round-trip behavior of ``set_trace`` / ``get_trace``."""

    def test_default_none(self):
        # Nothing has been set (the autouse fixture reset it), so no trace.
        assert get_trace() is None

    def test_set_and_get(self):
        trace_id = "01KTVYABCDEF"
        set_trace(trace_id)
        assert get_trace() == trace_id

    def test_sanitized_on_read(self):
        # The value read back has '/', '#' and ' ' replaced by '-'.
        set_trace("trace/with#bad chars")
        observed = get_trace()
        assert observed == "trace-with-bad-chars"

    def test_none_and_empty_clear(self):
        # Both None and the empty string reset a previously stored trace.
        for seed, clearing_value in (("x", None), ("y", "")):
            set_trace(seed)
            set_trace(clearing_value)
            assert get_trace() is None

    def test_thread_safe_set(self):
        # Smoke test: concurrent set_trace calls must not corrupt state, i.e.
        # the surviving value is one of the values some thread wrote.
        values = [f"t{i}" for i in range(4)]

        def _spin(val: str):
            for _ in range(200):
                set_trace(val)

        workers = [threading.Thread(target=_spin, args=(value,)) for value in values]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        assert get_trace() in set(values)
| 123 | + |
| 124 | + |
class TestClientConfig:
    """``client_config()`` must expose the static User-Agent extra."""

    def test_config_carries_static_extra(self, monkeypatch):
        monkeypatch.setenv(STACK_NAME_ENV, "mystack")
        # Build the config first, then compare against a fresh static value.
        config_extra = client_config().user_agent_extra
        assert config_extra == static_user_agent_extra()
| 130 | + |
| 131 | + |
class TestWireCapture:
    """Inspect the real outbound User-Agent header at the wire layer.

    A genuine botocore client is built with fake credentials, and a
    ``before-send`` handler records the header and short-circuits the HTTP
    send by returning a canned AWSResponse — no network, no moto.
    """

    @pytest.fixture()
    def capture(self, monkeypatch):
        """Return ``(client, captured)``.

        ``captured`` collects one User-Agent string per request made with
        ``client``.
        """
        import boto3
        from botocore.awsrequest import AWSResponse

        monkeypatch.setenv(STACK_NAME_ENV, "backgroundagent-dev")

        fake_credentials = {
            "aws_access_key_id": "testing",
            "aws_secret_access_key": "testing",
            "region_name": "us-east-1",
        }
        client = boto3.Session(**fake_credentials).client("sts", config=client_config())
        events = client.meta.events
        register_trace_appender(events)

        # Canned GetCallerIdentity reply handed back instead of a real send.
        canned_body = (
            b"<GetCallerIdentityResponse "
            b'xmlns="https://sts.amazonaws.com/doc/2011-06-15/">'
            b"<GetCallerIdentityResult><Arn>arn:aws:iam::123456789012:user/t</Arn>"
            b"<UserId>AIDA</UserId><Account>123456789012</Account>"
            b"</GetCallerIdentityResult></GetCallerIdentityResponse>"
        )
        captured: list[str] = []

        def _short_circuit(request, **kwargs):
            # At the before-send stage the prepared request's header values
            # can be bytes; normalize so assertions read naturally.
            header = request.headers["User-Agent"]
            if isinstance(header, bytes):
                header = header.decode("ascii")
            captured.append(header)
            return AWSResponse(
                url=request.url,
                status_code=200,
                headers={},
                raw=_FakeRaw(canned_body),
            )

        # register_last so it runs AFTER the trace appender (register order
        # within the same event is what guarantees we see the final header).
        events.register_last("before-send.sts.GetCallerIdentity", _short_circuit)
        return client, captured

    def test_both_segments_intact_no_trace(self, capture):
        sts, seen = capture
        sts.get_caller_identity()
        ua_header = seen[0]
        # Literal '/' in the app segment survived (raw path, NOT app-id field).
        assert f"app/{SOLUTION_ID}/backgroundagent-dev" in ua_header
        # Trace-absent: md segment ends exactly at the component label.
        assert ua_header.endswith(f"md/{SOLUTION_ID}#{COMPONENT}")
        assert not ua_header.endswith("#")

    def test_trace_appended_per_request_same_client(self, capture):
        sts, seen = capture
        # Same client, three requests: two different traces, then none.
        for trace in ("01KTVYTRACE1", "01KTVYTRACE2", None):
            set_trace(trace)
            sts.get_caller_identity()
        md_segment = f"md/{SOLUTION_ID}#{COMPONENT}"
        assert seen[0].endswith(f"{md_segment}#01KTVYTRACE1")
        assert seen[1].endswith(f"{md_segment}#01KTVYTRACE2")
        assert seen[2].endswith(md_segment)

    def test_trace_sanitized_at_wire(self, capture):
        sts, seen = capture
        # '/', '#', the snowman and the space must each appear as '-'.
        set_trace("evil/trace#☃ value")
        sts.get_caller_identity()
        expected_tail = f"md/{SOLUTION_ID}#{COMPONENT}#evil-trace---value"
        assert seen[0].endswith(expected_tail)
| 203 | + |
| 204 | + |
| 205 | +class _FakeRaw: |
| 206 | + """Minimal raw-body shim for AWSResponse.""" |
| 207 | + |
| 208 | + def __init__(self, data: bytes): |
| 209 | + self._data = data |
| 210 | + |
| 211 | + def read(self, *args, **kwargs): |
| 212 | + data, self._data = self._data, b"" |
| 213 | + return data |
| 214 | + |
| 215 | + def stream(self, *args, **kwargs): # pragma: no cover - botocore fallback |
| 216 | + yield self.read() |
0 commit comments