-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathlog_uploader.py
More file actions
112 lines (93 loc) · 3.21 KB
/
Copy pathlog_uploader.py
File metadata and controls
112 lines (93 loc) · 3.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""Buffer the CLI's local log records and POST them in batches to
/python-cli-runs/<run_id>/logs so the dashboard's view of a CLI run
mirrors what the user sees in their terminal.
Behavior:
- daemon thread, 5s flush
- swallow all network errors (debug log only)
- skip empty buffers
- drain on shutdown
- at-most-once semantics (failed batches dropped, not retried)
A thread-local recursion guard prevents the uploader's own request-error
log lines (emitted by `cli_client.py`'s `socketdev` logger) from being
re-enqueued during a flush.
"""
import json
import logging
import threading
from datetime import datetime, timezone
from typing import Optional
from .cli_client import CliClient
log = logging.getLogger(__name__)
_FLUSH_GUARD = threading.local()
def _now_str() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
class BatchedLogUploader:
def __init__(
self,
client: CliClient,
run_id: str,
flush_interval: float = 5.0,
):
self._client = client
self._run_id = run_id
self._flush_interval = flush_interval
self._buf: list = []
self._lock = threading.Lock()
self._stop = threading.Event()
self._thread: Optional[threading.Thread] = None
def add(self, entry: dict) -> None:
with self._lock:
self._buf.append(entry)
def start(self) -> None:
if self._thread is not None:
return
self._thread = threading.Thread(
target=self._run,
name=f"socket-log-uploader-{self._run_id[:8]}",
daemon=True,
)
self._thread.start()
def stop(self, timeout: float = 2.0) -> None:
if self._thread is not None:
self._stop.set()
self._thread.join(timeout=timeout)
self._thread = None
self._flush()
def _run(self) -> None:
while not self._stop.is_set():
self._flush()
self._stop.wait(self._flush_interval)
def _flush(self) -> None:
with self._lock:
if not self._buf:
return
batch = self._buf
self._buf = []
_FLUSH_GUARD.active = True
try:
self._client.request(
path=f"python-cli-runs/{self._run_id}/logs",
method="POST",
payload=json.dumps({"logs": batch}),
)
except Exception as e:
log.debug(f"log upload failed (swallowed, {len(batch)} entries dropped): {e}")
finally:
_FLUSH_GUARD.active = False
class UploadingLogHandler(logging.Handler):
def __init__(self, uploader: BatchedLogUploader, context: str = "socket-python-cli"):
super().__init__()
self._uploader = uploader
self._context = context
def emit(self, record: logging.LogRecord) -> None:
if getattr(_FLUSH_GUARD, "active", False):
return
try:
self._uploader.add({
"timestamp": _now_str(),
"level": logging.getLevelName(record.levelno),
"message": self.format(record),
"context": self._context,
})
except Exception:
self.handleError(record)