|
| 1 | +"""Internal log-format primitives for test-result simulation logs. |
| 2 | +
|
| 3 | +Two files per run: |
| 4 | +
|
| 5 | +* **Log file** (e.g. ``foo.jsonl``) - append-only record of each logged API call, |
| 6 | + one line per call. Written by :func:`log_request_to_file` in the test process |
| 7 | + and read by :func:`iter_log_data_lines` / the replay subprocess. Has no header: |
| 8 | + every line is a data line. |
| 9 | +* **Tracking sidecar** (``foo.jsonl.tracking``) - small JSON file holding the |
| 10 | + incremental replay cursor (``lastUploadedLine``) and the simulated-to-real ID |
| 11 | + map. Written only by the replay subprocess via :meth:`LogTracking.save` using |
| 12 | + a temp-file + ``os.replace`` so a crash can't leave a half-written sidecar. |
| 13 | + Read once at replay start via :meth:`LogTracking.load`. Never touched by the |
| 14 | + test process. |
| 15 | +
|
| 16 | +# Concurrency |
| 17 | +
|
| 18 | +With tracking moved out of the main log, the log file becomes strictly |
| 19 | +append-only and has exactly one in-place mutator (the writer) and one scanner |
| 20 | +(the replay subprocess). POSIX guarantees that an ``O_APPEND`` write atomically |
| 21 | +bumps the EOF, so parallel writers can't lose data. To keep a concurrent reader |
| 22 | +from observing a mid-append partial final line we still take ``LOCK_EX`` on the |
| 23 | +writer's single append and ``LOCK_SH`` on the reader's ``readlines()``; there |
| 24 | +is never any exclusive-vs-exclusive contention because nothing rewrites the |
| 25 | +file any more. |
| 26 | +
|
| 27 | +The sidecar has a single writer (the replay subprocess) and no live reader, so |
| 28 | +it needs no locking. Atomic rename is still used to keep the on-disk contents |
| 29 | +valid across crashes. |
| 30 | +
|
| 31 | +``flock`` is advisory, so this contract only holds for processes that use these |
| 32 | +helpers; ad-hoc writers are not protected. |
| 33 | +""" |
| 34 | + |
| 35 | +from __future__ import annotations |
| 36 | + |
| 37 | +import fcntl |
| 38 | +import json |
| 39 | +import os |
| 40 | +import re |
| 41 | +from dataclasses import dataclass, field |
| 42 | +from pathlib import Path |
| 43 | +from typing import TYPE_CHECKING, Any, Generator |
| 44 | + |
| 45 | +from google.protobuf import json_format |
| 46 | + |
| 47 | +if TYPE_CHECKING: |
| 48 | + from sift_client.sift_types.test_report import TestMeasurement, TestReport, TestStep |
| 49 | + |
| 50 | + |
| 51 | +def _client_version() -> str: |
| 52 | + from importlib.metadata import PackageNotFoundError, version |
| 53 | + |
| 54 | + try: |
| 55 | + return version("sift_stack_py") |
| 56 | + except PackageNotFoundError: |
| 57 | + return "unknown" |
| 58 | + |
| 59 | + |
| 60 | +@dataclass |
| 61 | +class LogTracking: |
| 62 | + """Incremental-replay cursor and simulated-to-real ID map. |
| 63 | +
|
| 64 | + Persisted beside the log file (see module docstring for layout). The log |
| 65 | + file itself is append-only and stores only API-call data lines. |
| 66 | +
|
| 67 | + * ``last_uploaded_line`` is the count of data lines that have been |
| 68 | + successfully replayed against the server. Each data line corresponds to a |
| 69 | + single API call, so line granularity matches the atomic unit of work: a |
| 70 | + line is either fully replayed or must be retried in its entirety. Data |
| 71 | + lines are strictly append-only, so this counter is stable across runs. |
| 72 | + * ``id_map`` maps simulated response IDs (created during the original test |
| 73 | + run) to the real IDs assigned by the server during replay. Subsequent |
| 74 | + ``Update*`` entries consult this map to translate IDs. |
| 75 | + """ |
| 76 | + |
| 77 | + last_uploaded_line: int = 0 |
| 78 | + id_map: dict[str, str] = field(default_factory=dict) |
| 79 | + client_version: str = field(default_factory=_client_version) |
| 80 | + |
| 81 | + @staticmethod |
| 82 | + def sidecar_path(log_path: str | Path) -> Path: |
| 83 | + """Return the sidecar path for a given log file (``<log>.tracking``).""" |
| 84 | + p = Path(log_path) |
| 85 | + return p.with_name(p.name + ".tracking") |
| 86 | + |
| 87 | + @classmethod |
| 88 | + def load(cls, log_path: str | Path) -> LogTracking: |
| 89 | + """Read tracking state for ``log_path``; return a fresh instance if missing or corrupt. |
| 90 | +
|
| 91 | + A missing sidecar is the normal state before the first incremental tick. |
| 92 | + A malformed sidecar is treated the same so a crash mid-write can't brick |
| 93 | + replay; the worst case is a re-replay of already-uploaded lines, which |
| 94 | + the server must be prepared for anyway. |
| 95 | + """ |
| 96 | + sidecar = cls.sidecar_path(log_path) |
| 97 | + try: |
| 98 | + data = json.loads(sidecar.read_text()) |
| 99 | + except (FileNotFoundError, json.JSONDecodeError, OSError): |
| 100 | + return cls() |
| 101 | + return cls( |
| 102 | + last_uploaded_line=data.get("lastUploadedLine", 0), |
| 103 | + id_map=data.get("idMap", {}), |
| 104 | + client_version=data.get("clientVersion", "unknown"), |
| 105 | + ) |
| 106 | + |
| 107 | + def save(self, log_path: str | Path) -> None: |
| 108 | + """Atomically write tracking state to the sidecar for ``log_path``. |
| 109 | +
|
| 110 | + Uses temp-file + ``os.replace`` so readers (and crash recovery) never |
| 111 | + observe a partially written sidecar. |
| 112 | + """ |
| 113 | + sidecar = self.sidecar_path(log_path) |
| 114 | + sidecar.parent.mkdir(parents=True, exist_ok=True) |
| 115 | + payload = json.dumps( |
| 116 | + { |
| 117 | + "clientVersion": self.client_version, |
| 118 | + "lastUploadedLine": self.last_uploaded_line, |
| 119 | + "idMap": self.id_map, |
| 120 | + }, |
| 121 | + separators=(",", ":"), |
| 122 | + ) |
| 123 | + tmp = sidecar.with_name(sidecar.name + ".tmp") |
| 124 | + tmp.write_text(payload) |
| 125 | + os.replace(tmp, sidecar) |
| 126 | + |
| 127 | + |
| 128 | +@dataclass |
| 129 | +class _ReplayState: |
| 130 | + """Mutable state accumulated during log replay.""" |
| 131 | + |
| 132 | + report: TestReport | None = None |
| 133 | + steps_by_id: dict[str, TestStep] = field(default_factory=dict) |
| 134 | + steps_order: list[str] = field(default_factory=list) |
| 135 | + measurements_by_id: dict[str, TestMeasurement] = field(default_factory=dict) |
| 136 | + measurements_order: list[str] = field(default_factory=list) |
| 137 | + |
| 138 | + |
| 139 | +@dataclass |
| 140 | +class ReplayResult: |
| 141 | + """Result of replaying a log file.""" |
| 142 | + |
| 143 | + report: TestReport |
| 144 | + steps: list[TestStep] = field(default_factory=list) |
| 145 | + measurements: list[TestMeasurement] = field(default_factory=list) |
| 146 | + |
| 147 | + |
| 148 | +def log_request_to_file( |
| 149 | + log_file: str | Path, |
| 150 | + request_type: str, |
| 151 | + request: Any, |
| 152 | + response_id: str | None = None, |
| 153 | +) -> None: |
| 154 | + """Append a request as a JSON-encoded line to ``log_file``. |
| 155 | +
|
| 156 | + Takes ``LOCK_EX`` across the append so a concurrent reader holding |
| 157 | + ``LOCK_SH`` in :func:`iter_log_data_lines` can't see a mid-write partial |
| 158 | + final line. See the module docstring for the full concurrency model. |
| 159 | +
|
| 160 | + Args: |
| 161 | + log_file: Path to the log file. |
| 162 | + request_type: Type of request being logged. |
| 163 | + request: The protobuf request to log. |
| 164 | + response_id: Optional ID from the simulated response, embedded in the tag |
| 165 | + for create operations so replay can map previously simulated IDs used |
| 166 | + by simulated updates. |
| 167 | + """ |
| 168 | + log_path = Path(log_file) |
| 169 | + log_path.parent.mkdir(parents=True, exist_ok=True) |
| 170 | + tag = f"{request_type}:{response_id}" if response_id else request_type |
| 171 | + request_dict = json_format.MessageToDict(request) |
| 172 | + request_json = json.dumps(request_dict, separators=(",", ":")) |
| 173 | + line = f"[{tag}] {request_json}\n" |
| 174 | + with open(log_path, "a") as f: |
| 175 | + fcntl.flock(f, fcntl.LOCK_EX) |
| 176 | + # Closing the file flushes and releases the flock atomically; no |
| 177 | + # explicit unlock needed here. |
| 178 | + f.write(line) |
| 179 | + |
| 180 | + |
| 181 | +def iter_log_data_lines( |
| 182 | + log_path: Path, |
| 183 | + start_line: int = 0, |
| 184 | +) -> Generator[tuple[str, str | None, str], None, None]: |
| 185 | + """Parse data lines from a log file. |
| 186 | +
|
| 187 | + Yields ``(request_type, response_id, json_str)`` tuples. Each yielded item |
| 188 | + corresponds to one logged API call. |
| 189 | +
|
| 190 | + ``start_line`` is the count of data lines (1-based) already uploaded; the |
| 191 | + iterator skips the first ``start_line`` lines and yields the rest. Pass 0 |
| 192 | + to read all data lines. |
| 193 | +
|
| 194 | + Acquires ``LOCK_SH`` only while snapshotting the file into memory, then |
| 195 | + releases before yielding. Lines appended by a concurrent |
| 196 | + :func:`log_request_to_file` after the snapshot are not visible this call -- |
| 197 | + they will be picked up on the next invocation. |
| 198 | + """ |
| 199 | + line_pattern = re.compile(r"^\[(\w+)(?::([^\]]+))?\]\s*(.+)$") |
| 200 | + with open(log_path) as f: |
| 201 | + fcntl.flock(f, fcntl.LOCK_SH) |
| 202 | + raw_lines = f.readlines() |
| 203 | + |
| 204 | + data_line_count = 0 |
| 205 | + for raw_line in raw_lines: |
| 206 | + line = raw_line.strip() |
| 207 | + if not line: |
| 208 | + continue |
| 209 | + match = line_pattern.match(line) |
| 210 | + if not match: |
| 211 | + raise ValueError(f"Invalid log line: {line}") |
| 212 | + data_line_count += 1 |
| 213 | + if data_line_count <= start_line: |
| 214 | + continue |
| 215 | + yield (match.group(1), match.group(2), match.group(3)) |
0 commit comments