-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathstreaming.py
More file actions
87 lines (70 loc) · 2.94 KB
/
Copy pathstreaming.py
File metadata and controls
87 lines (70 loc) · 2.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
"""Wire the server log streaming pipeline for one CLI run.
`setup_streaming` registers the run with the backend, attaches handlers that
route the CLI's own log output through both the local terminal and a batched
uploader, and forces the loggers into DEBUG so the upload captures everything
regardless of local terminal verbosity.
Returns a teardown callable to invoke on exit (typically via `atexit.register`).
Returns None if registration failed; in that case nothing was wired up.
"""
import logging
from typing import Callable, Optional
from .cli_client import CliClient
from .cli_run import finalize_cli_run, register_cli_run
from .log_uploader import BatchedLogUploader, UploadingLogHandler
# Status string handed to finalize_cli_run when the teardown callable runs.
# Starts as "success"; overwritten via set_run_status().
_run_status: str = "success"
# Report run id handed to finalize_cli_run when the teardown callable runs.
# Stays None unless set_report_run_id() stores a value.
_report_run_id: Optional[str] = None
def set_run_status(status: str) -> None:
    """Record the status to report for this run.

    The value is stored in the module-level ``_run_status`` and is read
    later, when the teardown callable returned by ``setup_streaming``
    finalizes the run.
    """
    global _run_status
    _run_status = status
def set_report_run_id(report_run_id: Optional[str]) -> None:
    """Record the report run id to attach when this run is finalized.

    The value is stored in the module-level ``_report_run_id`` and is read
    later, when the teardown callable returned by ``setup_streaming``
    finalizes the run. Passing ``None`` clears any previously stored id.
    """
    global _report_run_id
    _report_run_id = report_run_id
def setup_streaming(
    *,
    client: CliClient,
    cli_logger: logging.Logger,
    sdk_logger: logging.Logger,
    client_version: str,
    share_logs: bool,
    decline_logs: bool,
    enable_debug: bool,
) -> Optional[Callable[[], None]]:
    """Register this CLI run and route both loggers to the terminal and the uploader.

    Args:
        client: Client passed to ``register_cli_run``, ``BatchedLogUploader``
            and ``finalize_cli_run``.
        cli_logger: The CLI's own logger; gets a terminal and an upload handler.
        sdk_logger: The SDK logger; gets the same two handlers.
        client_version: Forwarded unchanged to ``register_cli_run``.
        share_logs: Forwarded unchanged to ``register_cli_run``.
        decline_logs: Forwarded unchanged to ``register_cli_run``.
        enable_debug: When true the terminal handler shows DEBUG records,
            otherwise INFO and above. The upload handler is not level-limited
            here, and both loggers are forced to DEBUG.

    Returns:
        A zero-argument teardown callable, or ``None`` when
        ``register_cli_run`` returned a falsy run id (in which case no
        handler was attached and no logger was modified).
    """
    run_id = register_cli_run(
        client,
        client_version=client_version,
        share_logs=share_logs,
        decline_logs=decline_logs,
    )
    if not run_id:
        cli_logger.debug("server log streaming not active for this run")
        return None

    log_uploader = BatchedLogUploader(client, run_id)
    log_uploader.start()

    upload_handler = UploadingLogHandler(log_uploader, context="socket-python-cli")
    upload_handler.setFormatter(logging.Formatter("%(message)s"))

    terminal_handler = logging.StreamHandler()
    terminal_handler.setLevel(logging.DEBUG if enable_debug else logging.INFO)
    terminal_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))

    # Snapshot the state we are about to override so teardown can put it back.
    loggers = (cli_logger, sdk_logger)
    saved_state = [(logger.level, logger.propagate) for logger in loggers]

    for logger in loggers:
        # DEBUG on the logger lets every record reach the upload handler;
        # terminal verbosity is limited by the terminal handler's own level.
        logger.setLevel(logging.DEBUG)
        # Records are emitted by the handlers attached here, not by ancestors.
        logger.propagate = False
        logger.addHandler(terminal_handler)
        logger.addHandler(upload_handler)

    cli_logger.debug(f"server log streaming enabled (run_id={run_id})")

    torn_down = False

    def teardown() -> None:
        """Stop uploading, finalize the run, and restore the loggers.

        Safe to call more than once: only the first call does any work, so
        an explicit call followed by an ``atexit`` call does not stop the
        uploader or finalize the run twice.
        """
        nonlocal torn_down
        if torn_down:
            return
        torn_down = True

        # Detach the upload handler first: records logged while stopping the
        # uploader or finalizing the run are no longer handed to it, while
        # the terminal handler is still attached and keeps showing them.
        for logger in loggers:
            logger.removeHandler(upload_handler)
        try:
            log_uploader.stop()
            # Status and report id are read now, so values stored through
            # set_run_status / set_report_run_id after setup are honoured.
            finalize_cli_run(
                client,
                run_id,
                status=_run_status,
                report_run_id=_report_run_id,
            )
        finally:
            # Restore logger state even if stopping or finalizing raised, so
            # a failed teardown does not leave the loggers stuck at DEBUG
            # with propagation disabled and a stray terminal handler.
            for logger, (level, propagate) in zip(loggers, saved_state):
                logger.removeHandler(terminal_handler)
                logger.setLevel(level)
                logger.propagate = propagate

    return teardown