-
Notifications
You must be signed in to change notification settings - Fork 10
feat: stream CLI log transcripts and run status to Socket backend #201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Benjamin Barslev Nielsen (barslev)
merged 17 commits into
main
from
barslev/streaming-logs
Jun 12, 2026
Merged
Changes from 11 commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
681f078
feat: stream CLI logs to /python-cli-runs/* lifecycle endpoints
barslev b07e621
chore: drop per-batch size chunking to match upstream uploader
barslev 03b6126
chore: drop integration field from cli-run register payload
barslev d18795d
feat: link cli-run to its full_scan via report_run_id on finalize
barslev 0e8746d
chore: bump version to 2.2.87 for streaming logs feature
barslev 19216ce
feat: flip streaming logs to opt-in via --upload-logs
barslev 27d84d5
Merge remote-tracking branch 'origin/main' into barslev/streaming-logs
barslev 06982fb
chore: regenerate uv.lock for version 2.4.8
barslev 3b154de
feat: add --no-upload-logs to explicitly decline log upload
barslev 4c50f11
Merge remote-tracking branch 'origin/main' into barslev/streaming-logs
barslev 2b9a9b7
chore: bump version to 2.4.9
barslev 678518c
chore: bump __version__ to 2.4.9
barslev 900f74b
refactor: address PR review on streaming logs
barslev d630013
test(log_uploader): cover cross-thread emit during active flush
barslev 7561b2f
refactor(config): use store_const + mutually exclusive group for log-…
barslev a2d5733
refactor: push upload_logs tri-state down to the API boundary
barslev 008999a
fix(cli_run): broaden register exception handling to honor 'never bre…
barslev File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| """Lifecycle helpers for a CLI run on the Socket backend. | ||
|
|
||
| A "run" represents a single CLI invocation. `register_cli_run` opens it and | ||
| returns a server-issued `run_id` when streaming is enabled; `finalize_cli_run` | ||
| closes it on exit. The run_id keys the rows that `BatchedLogUploader` POSTs to | ||
| `/python-cli-runs/<run_id>/logs` during the run so the dashboard can show | ||
| what the user saw in their terminal. | ||
|
|
||
| Streaming is opt-in via the `share_logs` field on register. The server may | ||
| also force-enable streaming for an org regardless of the client's request, | ||
| so the CLI always calls register and gates on the response's | ||
| `log_streaming_enabled` flag rather than the client's intent. | ||
|
|
||
| Both calls are best-effort: failures fall back to no-streaming and never | ||
| prevent the scan from running. | ||
| """ | ||
|
|
||
| import json | ||
| import logging | ||
| from typing import Optional | ||
|
|
||
| from .cli_client import CliClient | ||
| from .exceptions import APIFailure | ||
|
|
||
| log = logging.getLogger("socketcli") | ||
|
|
||
|
|
||
| def register_cli_run( | ||
| client: CliClient, | ||
| client_version: str, | ||
| share_logs: bool, | ||
| decline_logs: bool, | ||
|
BarrensZeppelin marked this conversation as resolved.
Outdated
|
||
| ) -> Optional[str]: | ||
| try: | ||
| resp = client.request( | ||
| path="python-cli-runs", | ||
| method="POST", | ||
| payload=json.dumps({ | ||
| "client_version": client_version, | ||
| "share_logs": share_logs, | ||
| "decline_logs": decline_logs, | ||
| }), | ||
| ) | ||
| except APIFailure as e: | ||
| log.debug(f"cli-run register failed (streaming disabled): {e}") | ||
| return None | ||
|
|
||
| try: | ||
| body = resp.json() | ||
| except (ValueError, json.JSONDecodeError) as e: | ||
| log.debug(f"cli-run register: bad JSON body: {e}") | ||
| return None | ||
|
|
||
| if not body.get("log_streaming_enabled"): | ||
| log.debug("cli-run register: log streaming not enabled by server") | ||
| return None | ||
|
|
||
| run_id = body.get("run_id") | ||
| if not isinstance(run_id, str) or not run_id: | ||
| log.debug(f"cli-run register: enabled but missing run_id in response: {body!r}") | ||
| return None | ||
| return run_id | ||
|
|
||
|
|
||
| def finalize_cli_run( | ||
| client: CliClient, | ||
| run_id: str, | ||
| status: str = "success", | ||
| report_run_id: Optional[str] = None, | ||
| ) -> None: | ||
| try: | ||
| client.request( | ||
| path=f"python-cli-runs/{run_id}/finalize", | ||
| method="POST", | ||
| payload=json.dumps({"status": status, "report_run_id": report_run_id}), | ||
| ) | ||
| except Exception as e: | ||
| log.debug(f"cli-run finalize failed (swallowed): {e}") | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| """Buffer the CLI's local log records and POST them in batches to | ||
| /python-cli-runs/<run_id>/logs so the dashboard's view of a CLI run | ||
| mirrors what the user sees in their terminal. | ||
|
|
||
| Behavior: | ||
| - daemon thread, 5s flush | ||
| - swallow all network errors (debug log only) | ||
| - skip empty buffers | ||
| - drain on shutdown | ||
| - at-most-once semantics (failed batches dropped, not retried) | ||
|
|
||
| A thread-local recursion guard prevents the uploader's own request-error | ||
| log lines (emitted by `cli_client.py`'s `socketdev` logger) from being | ||
| re-enqueued during a flush. | ||
| """ | ||
|
|
||
| import json | ||
| import logging | ||
| import threading | ||
| from datetime import datetime, timezone | ||
| from typing import Optional | ||
|
|
||
| from .cli_client import CliClient | ||
|
|
||
| log = logging.getLogger(__name__) | ||
|
|
||
| _FLUSH_GUARD = threading.local() | ||
|
|
||
| _LEVEL_MAP = { | ||
| logging.DEBUG: "DEBUG", | ||
| logging.INFO: "INFO", | ||
| logging.WARNING: "WARN", | ||
| logging.ERROR: "ERROR", | ||
| logging.CRITICAL: "ERROR", | ||
|
BarrensZeppelin marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
|
|
||
| def _now_str() -> str: | ||
| return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] | ||
|
|
||
|
|
||
| class BatchedLogUploader: | ||
| def __init__( | ||
| self, | ||
| client: CliClient, | ||
| run_id: str, | ||
| flush_interval: float = 5.0, | ||
| ): | ||
| self._client = client | ||
| self._run_id = run_id | ||
| self._flush_interval = flush_interval | ||
| self._buf: list = [] | ||
| self._lock = threading.Lock() | ||
| self._stop = threading.Event() | ||
| self._thread: Optional[threading.Thread] = None | ||
|
|
||
| def add(self, entry: dict) -> None: | ||
| with self._lock: | ||
| self._buf.append(entry) | ||
|
|
||
| def start(self) -> None: | ||
| if self._thread is not None: | ||
| return | ||
| self._thread = threading.Thread( | ||
| target=self._run, | ||
| name=f"socket-log-uploader-{self._run_id[:8]}", | ||
| daemon=True, | ||
| ) | ||
| self._thread.start() | ||
|
|
||
| def stop(self, timeout: float = 2.0) -> None: | ||
| if self._thread is None: | ||
| self._flush() | ||
| return | ||
| self._stop.set() | ||
| self._thread.join(timeout=timeout) | ||
| self._thread = None | ||
|
BarrensZeppelin marked this conversation as resolved.
Outdated
|
||
| self._flush() | ||
|
|
||
| def _run(self) -> None: | ||
| while not self._stop.is_set(): | ||
| self._flush() | ||
| self._stop.wait(self._flush_interval) | ||
|
|
||
| def _flush(self) -> None: | ||
| with self._lock: | ||
| if not self._buf: | ||
| return | ||
| batch = self._buf | ||
| self._buf = [] | ||
|
|
||
| _FLUSH_GUARD.active = True | ||
| try: | ||
| self._client.request( | ||
| path=f"python-cli-runs/{self._run_id}/logs", | ||
| method="POST", | ||
| payload=json.dumps({"logs": batch}), | ||
| ) | ||
| except Exception as e: | ||
| log.debug(f"log upload failed (swallowed, {len(batch)} entries dropped): {e}") | ||
| finally: | ||
| _FLUSH_GUARD.active = False | ||
|
|
||
|
|
||
| class UploadingLogHandler(logging.Handler): | ||
| def __init__(self, uploader: BatchedLogUploader, context: str = "socket-python-cli"): | ||
| super().__init__() | ||
| self._uploader = uploader | ||
| self._context = context | ||
|
|
||
| def emit(self, record: logging.LogRecord) -> None: | ||
| if getattr(_FLUSH_GUARD, "active", False): | ||
| return | ||
|
BarrensZeppelin marked this conversation as resolved.
|
||
| try: | ||
| self._uploader.add({ | ||
| "timestamp": _now_str(), | ||
| "level": _LEVEL_MAP.get(record.levelno, "INFO"), | ||
| "message": self.format(record), | ||
| "context": self._context, | ||
| }) | ||
| except Exception: | ||
| self.handleError(record) | ||
|
BarrensZeppelin marked this conversation as resolved.
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| """Wire the server log streaming pipeline for one CLI run. | ||
|
|
||
| `setup_streaming` registers the run with the backend, attaches handlers that | ||
| route the CLI's own log output through both the local terminal and a batched | ||
| uploader, and forces the loggers into DEBUG so the upload captures everything | ||
| regardless of local terminal verbosity. | ||
|
|
||
| Returns a teardown callable to invoke on exit (typically via `atexit.register`). | ||
| Returns None if registration failed; in that case nothing was wired up. | ||
| """ | ||
|
|
||
| import logging | ||
| from typing import Callable, Optional | ||
|
|
||
| from .cli_client import CliClient | ||
| from .cli_run import finalize_cli_run, register_cli_run | ||
| from .log_uploader import BatchedLogUploader, UploadingLogHandler | ||
|
|
||
| _run_status: str = "success" | ||
| _report_run_id: Optional[str] = None | ||
|
|
||
|
|
||
| def set_run_status(status: str) -> None: | ||
| global _run_status | ||
| _run_status = status | ||
|
|
||
|
|
||
| def set_report_run_id(report_run_id: Optional[str]) -> None: | ||
| global _report_run_id | ||
| _report_run_id = report_run_id | ||
|
|
||
|
|
||
| def setup_streaming( | ||
|
BarrensZeppelin marked this conversation as resolved.
Outdated
|
||
| *, | ||
| client: CliClient, | ||
| cli_logger: logging.Logger, | ||
| sdk_logger: logging.Logger, | ||
| client_version: str, | ||
| share_logs: bool, | ||
| decline_logs: bool, | ||
| enable_debug: bool, | ||
| ) -> Optional[Callable[[], None]]: | ||
| run_id = register_cli_run( | ||
| client, | ||
| client_version=client_version, | ||
| share_logs=share_logs, | ||
| decline_logs=decline_logs, | ||
| ) | ||
| if not run_id: | ||
| cli_logger.debug("server log streaming not active for this run") | ||
| return None | ||
|
|
||
| log_uploader = BatchedLogUploader(client, run_id) | ||
| log_uploader.start() | ||
| upload_handler = UploadingLogHandler(log_uploader, context="socket-python-cli") | ||
| upload_handler.setFormatter(logging.Formatter("%(message)s")) | ||
|
|
||
| terminal_handler = logging.StreamHandler() | ||
| terminal_handler.setLevel(logging.DEBUG if enable_debug else logging.INFO) | ||
| terminal_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s")) | ||
|
|
||
| saved_levels = (cli_logger.level, sdk_logger.level) | ||
| saved_propagate = (cli_logger.propagate, sdk_logger.propagate) | ||
| cli_logger.setLevel(logging.DEBUG) | ||
| sdk_logger.setLevel(logging.DEBUG) | ||
| cli_logger.propagate = False | ||
| sdk_logger.propagate = False | ||
| cli_logger.addHandler(terminal_handler) | ||
| sdk_logger.addHandler(terminal_handler) | ||
| cli_logger.addHandler(upload_handler) | ||
| sdk_logger.addHandler(upload_handler) | ||
|
BarrensZeppelin marked this conversation as resolved.
Outdated
|
||
|
|
||
| cli_logger.debug(f"server log streaming enabled (run_id={run_id})") | ||
|
|
||
| def teardown() -> None: | ||
| cli_logger.removeHandler(upload_handler) | ||
| sdk_logger.removeHandler(upload_handler) | ||
| log_uploader.stop() | ||
| finalize_cli_run(client, run_id, status=_run_status, report_run_id=_report_run_id) | ||
| cli_logger.removeHandler(terminal_handler) | ||
| sdk_logger.removeHandler(terminal_handler) | ||
| cli_logger.setLevel(saved_levels[0]) | ||
| sdk_logger.setLevel(saved_levels[1]) | ||
| cli_logger.propagate = saved_propagate[0] | ||
| sdk_logger.propagate = saved_propagate[1] | ||
|
BarrensZeppelin marked this conversation as resolved.
Outdated
|
||
|
|
||
| return teardown | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.