|
9 | 9 |
|
10 | 10 | class UrlRedactingFilter(logging.Filter): |
11 | 11 | """Redacts URLs in log messages to prevent credential leaks.""" |
12 | | - |
13 | | - URL_PATTERN = re.compile(r'https?://[^\s]+') |
14 | | - |
| 12 | + |
| 13 | + # Matches http/https, optional subdomains, mcp.gumloop.com, and the rest of the path |
| 14 | + # (?:[a-zA-Z0-9-]+\.)* matches optional subdomains before mcp.gumloop.com |
| 15 | + URL_PATTERN = re.compile(r"https?://(?:[a-zA-Z0-9-]+\.)*mcp\.gumloop\.com[^\s]*") |
| 16 | + |
15 | 17 | @staticmethod |
16 | 18 | def _redact_url(match): |
17 | 19 | """Redact a URL to only show scheme and netloc.""" |
18 | 20 | url = match.group(0) |
19 | 21 | parsed = urlparse(url) |
20 | 22 | return f"{parsed.scheme}://{parsed.netloc}" |
21 | | - |
| 23 | + |
22 | 24 | def filter(self, record: logging.LogRecord) -> bool: |
23 | 25 | if isinstance(record.msg, str): |
24 | 26 | record.msg = self.URL_PATTERN.sub(self._redact_url, record.msg) |
25 | | - |
| 27 | + |
26 | 28 | if record.args: |
27 | 29 | if isinstance(record.args, dict): |
28 | | - record.args = { |
29 | | - k: self.URL_PATTERN.sub(self._redact_url, str(v)) |
30 | | - for k, v in record.args.items() |
31 | | - } |
| 30 | + new_args = {} |
| 31 | + for k, v in record.args.items(): |
| 32 | + if isinstance(v, str): |
| 33 | + new_args[k] = self.URL_PATTERN.sub(self._redact_url, v) |
| 34 | + else: |
| 35 | + new_args[k] = v |
| 36 | + record.args = new_args |
32 | 37 | else: |
33 | 38 | record.args = tuple( |
34 | | - self.URL_PATTERN.sub(self._redact_url, str(arg)) |
| 39 | + ( |
| 40 | + self.URL_PATTERN.sub(self._redact_url, arg) |
| 41 | + if isinstance(arg, str) |
| 42 | + else arg |
| 43 | + ) |
35 | 44 | for arg in record.args |
36 | 45 | ) |
37 | | - |
| 46 | + |
38 | 47 | # Sanitize exception info if present |
39 | 48 | if record.exc_info and record.exc_info[1]: |
40 | 49 | exc = record.exc_info[1] |
41 | | - if hasattr(exc, 'args') and exc.args: |
| 50 | + if hasattr(exc, "args") and exc.args: |
42 | 51 | redacted_args = tuple( |
43 | | - self.URL_PATTERN.sub(self._redact_url, str(arg)) if isinstance(arg, str) else arg |
| 52 | + ( |
| 53 | + self.URL_PATTERN.sub(self._redact_url, arg) |
| 54 | + if isinstance(arg, str) |
| 55 | + else arg |
| 56 | + ) |
44 | 57 | for arg in exc.args |
45 | 58 | ) |
46 | 59 | exc.args = redacted_args |
47 | | - |
| 60 | + |
48 | 61 | return True |
49 | 62 |
|
50 | 63 |
|
51 | 64 | def redact_url_logs(logger: logging.Logger) -> None: |
52 | 65 | """Add URL redacting filter to logger to strip paths and credentials from URLs.""" |
53 | 66 | logger.addFilter(UrlRedactingFilter()) |
54 | | - |
|
0 commit comments