|
| 1 | +import json |
| 2 | +import logging |
| 3 | +import os |
| 4 | +import shutil |
| 5 | +import time |
| 6 | +from typing import Any, Optional |
| 7 | + |
| 8 | +from requests import post |
| 9 | +from requests.exceptions import ConnectionError as ReqConnectionError |
| 10 | +from testcontainers.core.container import DockerContainer |
| 11 | + |
| 12 | +logger = logging.getLogger(__name__) |
| 13 | + |
| 14 | + |
| 15 | +class WeaverContainer(DockerContainer): |
| 16 | + def __init__( |
| 17 | + self, |
| 18 | + schema_version: Optional[str] = None, |
| 19 | + report_dir: Optional[str] = None, |
| 20 | + weaver_version: str = "v0.20.0", |
| 21 | + policies_dir: Optional[str] = None, |
| 22 | + templates_dir: Optional[str] = None, |
| 23 | + inactivity_timeout: int = 30, |
| 24 | + ): |
| 25 | + try: |
| 26 | + image = f"otel/weaver:{weaver_version}" |
| 27 | + super().__init__(image) # type: ignore |
| 28 | + self._ready = False |
| 29 | + self._stopped = False |
| 30 | + self._clean_report_dir = report_dir is None |
| 31 | + |
| 32 | + self.with_exposed_ports(4317, 4320) # type: ignore |
| 33 | + self.with_bind_ports(4317, 4317) |
| 34 | + self.with_bind_ports(4320, 4320) |
| 35 | + self.with_name("weaver-live-check") |
| 36 | + |
| 37 | + command = f"registry live-check --inactivity-timeout={inactivity_timeout} --format json" |
| 38 | + |
| 39 | + if report_dir is not None: |
| 40 | + self._report_dir = report_dir |
| 41 | + else: |
| 42 | + self._report_dir = "./weaver-report" |
| 43 | + self._clean_report_dir = True |
| 44 | + |
| 45 | + self._report_dir = os.path.abspath(self._report_dir) |
| 46 | + os.makedirs(self._report_dir, exist_ok=True) |
| 47 | + |
| 48 | + command += " --output /weaver/report" |
| 49 | + self.with_volume_mapping( |
| 50 | + self._report_dir, "/weaver/report", mode="rw" |
| 51 | + ) |
| 52 | + logger.debug("Mapped report directory: %s", self._report_dir) |
| 53 | + |
| 54 | + if policies_dir: |
| 55 | + policies_dir = os.path.abspath(policies_dir) |
| 56 | + command += " --advice-policies /weaver/policies" |
| 57 | + self.with_volume_mapping( |
| 58 | + policies_dir, "/weaver/policies", mode="ro" |
| 59 | + ) |
| 60 | + logger.debug("Mapped policies directory: %s", policies_dir) |
| 61 | + if templates_dir: |
| 62 | + templates_dir = os.path.abspath(templates_dir) |
| 63 | + command += " --templates /weaver/templates" |
| 64 | + self.with_volume_mapping( |
| 65 | + templates_dir, "/weaver/templates", mode="ro" |
| 66 | + ) |
| 67 | + logger.debug("Mapped templates directory: %s", templates_dir) |
| 68 | + |
| 69 | + if schema_version: |
| 70 | + command += f" --registry https://github.com/open-telemetry/semantic-conventions/archive/refs/tags/v{schema_version}.tar.gz[model]" |
| 71 | + |
| 72 | + self.with_command(command) |
| 73 | + logger.debug("Weaver command: %s", command) |
| 74 | + except Exception as e: |
| 75 | + logger.error("Error initializing WeaverContainer: %s", e) |
| 76 | + raise |
| 77 | + |
| 78 | + def start(self, timeout: int = 60) -> "WeaverContainer": |
| 79 | + # remove files from report dir before starting |
| 80 | + if os.path.exists(self._report_dir): |
| 81 | + logger.debug("Cleaning up report directory: %s", self._report_dir) |
| 82 | + shutil.rmtree(self._report_dir) |
| 83 | + |
| 84 | + logger.debug("Starting Weaver container...") |
| 85 | + super().start() |
| 86 | + try: |
| 87 | + self._wait_for_ready(timeout=timeout) |
| 88 | + except Exception as e: |
| 89 | + logger.error( |
| 90 | + "Error while waiting for Weaver container to be ready, %s", e |
| 91 | + ) |
| 92 | + raise |
| 93 | + self._ready = True |
| 94 | + return self |
| 95 | + |
| 96 | + def get_otlp_endpoint(self) -> str: |
| 97 | + host = self.get_container_host_ip() |
| 98 | + port = self.get_exposed_port(4317) # type: ignore |
| 99 | + return f"http://{host}:{port}" |
| 100 | + |
| 101 | + def _wait_for_ready(self, timeout: int = 60) -> None: |
| 102 | + for i in range(timeout): |
| 103 | + try: |
| 104 | + # can't get exposed port before container is fully started |
| 105 | + response = post("http://localhost:4320", timeout=5) |
| 106 | + if response.status_code == 404: |
| 107 | + return |
| 108 | + logger.debug( |
| 109 | + "Weaver live-check container not ready yet, status %s, try %s", |
| 110 | + response.status_code, |
| 111 | + i, |
| 112 | + ) |
| 113 | + except ReqConnectionError as e: |
| 114 | + logger.debug("Health check exception: %s", e) |
| 115 | + pass |
| 116 | + time.sleep(1) |
| 117 | + raise TimeoutError( |
| 118 | + "Weaver live-check container did not become ready in time" |
| 119 | + ) |
| 120 | + |
| 121 | + def end_live_check(self, timeout: int = 30) -> dict[str, Any]: |
| 122 | + if self._stopped: |
| 123 | + return {} |
| 124 | + self._stopped = True |
| 125 | + |
| 126 | + try: |
| 127 | + if self._ready: |
| 128 | + response = post("http://localhost:4320/stop", timeout=5) |
| 129 | + response.raise_for_status() |
| 130 | + logger.debug("Weaver live-check stopped successfully") |
| 131 | + result = self.get_wrapped_container().wait(timeout=timeout) |
| 132 | + exit_code = str(result["StatusCode"]) |
| 133 | + else: |
| 134 | + exit_code = "container could not start" |
| 135 | + |
| 136 | + if exit_code == "0": |
| 137 | + return self._read_report() |
| 138 | + |
| 139 | + self._clean_report_dir = False # keep report for debugging |
| 140 | + logs = self._read_weaver_logs() |
| 141 | + violations = self._read_violations() |
| 142 | + error_message = ( |
| 143 | + f"violations: {violations}" if violations else f"logs: {logs}" |
| 144 | + ) |
| 145 | + raise Exception( |
| 146 | + f"Exited with non-zero status: {exit_code}, {error_message}" |
| 147 | + ) |
| 148 | + except Exception as e: |
| 149 | + self._clean_report_dir = False # keep report for debugging |
| 150 | + logs = self._read_weaver_logs() |
| 151 | + logger.error( |
| 152 | + "Error during weaver live-check: %s, logs: %s", e, logs |
| 153 | + ) |
| 154 | + raise |
| 155 | + |
| 156 | + def _read_violations(self) -> str: |
| 157 | + try: |
| 158 | + violations_path = os.path.join(self._report_dir, "violations.md") |
| 159 | + with open(violations_path, "r") as f: |
| 160 | + content = f.read().strip() |
| 161 | + logger.debug("Weaver violations report content: %s", content) |
| 162 | + return content |
| 163 | + except Exception as e: |
| 164 | + logger.error("Could not read violations report: %s", e) |
| 165 | + raise |
| 166 | + |
| 167 | + def _read_report(self) -> dict[str, Any]: |
| 168 | + try: |
| 169 | + report_path = os.path.join(self._report_dir, "full_report.json") |
| 170 | + with open(report_path, "r") as f: |
| 171 | + report_content = f.read() |
| 172 | + return json.loads(report_content) |
| 173 | + except Exception as e: |
| 174 | + logger.error("Error checking Weaver report: %s", e) |
| 175 | + raise |
| 176 | + |
| 177 | + def _read_weaver_logs(self) -> Optional[str]: |
| 178 | + try: |
| 179 | + (err, out) = self.get_logs() |
| 180 | + logs = f"{err.decode('utf-8')}\n{out.decode('utf-8')}" |
| 181 | + logger.debug("Weaver live-check logs: %s", logs) |
| 182 | + return logs |
| 183 | + except Exception as e: |
| 184 | + logger.error("Could not get weaver logs: %s", e) |
| 185 | + return None |
| 186 | + |
| 187 | + def stop(self, force: bool = True, delete_volume: bool = True) -> None: |
| 188 | + try: |
| 189 | + self.end_live_check() |
| 190 | + finally: |
| 191 | + super().stop(force, delete_volume) |
| 192 | + if self._clean_report_dir: |
| 193 | + try: |
| 194 | + shutil.rmtree(self._report_dir) |
| 195 | + except Exception: |
| 196 | + pass |
0 commit comments