diff --git a/src/kimi_cli/__main__.py b/src/kimi_cli/__main__.py index 9f5cd4f38..42ade705a 100644 --- a/src/kimi_cli/__main__.py +++ b/src/kimi_cli/__main__.py @@ -10,8 +10,11 @@ def _prog_name() -> str: def main(argv: Sequence[str] | None = None) -> int | str | None: + from kimi_cli.telemetry.crash import install_crash_handlers, set_phase from kimi_cli.utils.proxy import normalize_proxy_env + # Install excepthook before anything else so startup-phase crashes are captured. + install_crash_handlers() normalize_proxy_env() args = list(sys.argv[1:] if argv is None else argv) @@ -28,6 +31,8 @@ def main(argv: Sequence[str] | None = None) -> int | str | None: return cli(args=args, prog_name=_prog_name()) except SystemExit as exc: return exc.code + finally: + set_phase("shutdown") if __name__ == "__main__": diff --git a/src/kimi_cli/acp/server.py b/src/kimi_cli/acp/server.py index 8461d2286..0b6b32538 100644 --- a/src/kimi_cli/acp/server.py +++ b/src/kimi_cli/acp/server.py @@ -58,6 +58,14 @@ async def initialize( ) self.client_capabilities = client_capabilities + if client_info is not None: + from kimi_cli.telemetry import set_client_info + + set_client_info( + name=client_info.name, + version=getattr(client_info, "version", None), + ) + # get command and args of current process for terminal-auth command = sys.argv[0] args: list[str] = [] @@ -155,6 +163,7 @@ async def new_session( cli_instance = await KimiCLI.create( session, mcp_configs=[mcp_config], + ui_mode="acp", ) config = cli_instance.soul.runtime.config acp_kaos = ACPKaos(self.conn, session.id, self.client_capabilities) @@ -225,6 +234,7 @@ async def _setup_session( session, mcp_configs=[mcp_config], resumed=True, # _setup_session loads existing sessions + ui_mode="acp", ) config = cli_instance.soul.runtime.config acp_kaos = ACPKaos(self.conn, session.id, self.client_capabilities) diff --git a/src/kimi_cli/app.py b/src/kimi_cli/app.py index 12cc45895..684d447c6 100644 --- a/src/kimi_cli/app.py +++ 
b/src/kimi_cli/app.py @@ -4,6 +4,7 @@ import contextlib import dataclasses import sys +import time import warnings from collections.abc import AsyncGenerator, Callable from pathlib import Path @@ -14,10 +15,11 @@ from pydantic import SecretStr from kimi_cli.agentspec import DEFAULT_AGENT_FILE -from kimi_cli.auth.oauth import OAuthManager +from kimi_cli.auth.oauth import KIMI_CODE_OAUTH_KEY, OAuthManager, get_device_id from kimi_cli.background.models import is_terminal_status from kimi_cli.cli import InputFormat, OutputFormat from kimi_cli.config import Config, LLMModel, LLMProvider, load_config +from kimi_cli.constant import VERSION from kimi_cli.llm import augment_provider_with_env_vars, create_llm, model_display_name from kimi_cli.session import Session from kimi_cli.share import get_share_dir @@ -26,6 +28,7 @@ from kimi_cli.soul.context import Context from kimi_cli.soul.kimisoul import KimiSoul from kimi_cli.utils.aioqueue import QueueShutDown +from kimi_cli.utils.envvar import get_env_bool from kimi_cli.utils.logging import logger, open_original_stderr, redirect_stderr_to_logger from kimi_cli.utils.path import shorten_home from kimi_cli.wire import Wire, WireUISide @@ -126,6 +129,7 @@ async def create( yolo: bool = False, plan_mode: bool = False, resumed: bool = False, + ui_mode: str = "shell", # Extensions agent_file: Path | None = None, mcp_configs: list[MCPConfig] | list[dict[str, Any]] | None = None, @@ -174,10 +178,15 @@ async def create( MCPRuntimeError(KimiCLIException, RuntimeError): When any MCP server cannot be connected. 
""" + _create_t0 = time.monotonic() + _phase_timings_ms: dict[str, int] = {} + if startup_progress is not None: startup_progress("Loading configuration...") + _phase_t = time.monotonic() config = config if isinstance(config, Config) else load_config(config) + _phase_timings_ms["config_ms"] = int((time.monotonic() - _phase_t) * 1000) if max_steps_per_turn is not None: config.loop_control.max_steps_per_turn = max_steps_per_turn if max_retries_per_step is not None: @@ -186,6 +195,7 @@ async def create( config.loop_control.max_ralph_iterations = max_ralph_iterations logger.info("Loaded config: {config}", config=config) + _phase_t = time.monotonic() oauth = OAuthManager(config) bg_refresh_task = asyncio.create_task(_refresh_managed_models_silent(config)) @@ -248,6 +258,7 @@ async def create( runtime.notifications.recover() runtime.background_tasks.reconcile() _cleanup_stale_foreground_subagents(runtime) + _phase_timings_ms["init_ms"] = int((time.monotonic() - _phase_t) * 1000) # Refresh plugin configs with fresh credentials (e.g. 
OAuth tokens) try: @@ -268,12 +279,14 @@ async def create( if startup_progress is not None: startup_progress("Loading agent...") + _phase_t = time.monotonic() agent = await load_agent( agent_file, runtime, mcp_configs=mcp_configs or [], start_mcp_loading=not defer_mcp_loading, ) + _phase_timings_ms["mcp_ms"] = int((time.monotonic() - _phase_t) * 1000) if startup_progress is not None: startup_progress("Restoring conversation...") @@ -301,6 +314,47 @@ async def create( soul.set_hook_engine(hook_engine) runtime.hook_engine = hook_engine + # --- Initialize telemetry --- + from kimi_cli.telemetry import attach_sink, set_context + from kimi_cli.telemetry import disable as disable_telemetry + + telemetry_disabled = not config.telemetry or get_env_bool("KIMI_DISABLE_TELEMETRY") + if telemetry_disabled: + disable_telemetry() + else: + device_id = get_device_id() + set_context(device_id=device_id, session_id=session.id) + from kimi_cli.telemetry.sink import EventSink + from kimi_cli.telemetry.transport import AsyncTransport + + def _get_token() -> str | None: + return oauth.get_cached_access_token(KIMI_CODE_OAUTH_KEY) + + transport = AsyncTransport(device_id=device_id, get_access_token=_get_token) + sink = EventSink( + transport, + version=VERSION, + model=model.model if model else "", + ui_mode=ui_mode, + ) + attach_sink(sink) + + from kimi_cli.telemetry import track + from kimi_cli.telemetry.crash import install_asyncio_handler, set_phase + + # App init finished — enter runtime phase and hook asyncio crashes. 
+ install_asyncio_handler() + set_phase("runtime") + + track("started", resumed=resumed, yolo=yolo) + track( + "startup_perf", + duration_ms=int((time.monotonic() - _create_t0) * 1000), + config_ms=_phase_timings_ms.get("config_ms", 0), + init_ms=_phase_timings_ms.get("init_ms", 0), + mcp_ms=_phase_timings_ms.get("mcp_ms", 0), + ) + return KimiCLI(soul, runtime, env_overrides, bg_refresh_task) def __init__( diff --git a/src/kimi_cli/auth/oauth.py b/src/kimi_cli/auth/oauth.py index 25da57125..29eb9b600 100644 --- a/src/kimi_cli/auth/oauth.py +++ b/src/kimi_cli/auth/oauth.py @@ -209,6 +209,9 @@ def get_device_id() -> str: device_id = uuid.uuid4().hex path.write_text(device_id, encoding="utf-8") _ensure_private_file(path) + from kimi_cli.telemetry import track + + track("first_launch") return device_id @@ -783,6 +786,10 @@ def _cache_access_token(self, ref: OAuthRef, token: OAuthToken) -> None: return self._access_tokens[ref.key] = token.access_token + def get_cached_access_token(self, key: str) -> str | None: + """Get a cached access token by key, or None if not available.""" + return self._access_tokens.get(key) + def common_headers(self) -> dict[str, str]: return _common_headers() @@ -961,15 +968,24 @@ async def _refresh_tokens( "OAuth credentials rejected: {error}", error=exc, ) + from kimi_cli.telemetry import track + + track("oauth_refresh", success=False, reason="unauthorized") return except Exception as exc: if force: raise logger.warning("Failed to refresh OAuth token: {error}", error=exc) + from kimi_cli.telemetry import track + + track("oauth_refresh", success=False, reason="network_or_other") return save_tokens(ref, refreshed) self._cache_access_token(ref, refreshed) self._apply_access_token(runtime, refreshed.access_token) + from kimi_cli.telemetry import track + + track("oauth_refresh", success=True) finally: xlock.release() diff --git a/src/kimi_cli/background/manager.py b/src/kimi_cli/background/manager.py index f2543a73c..dc85eb3bd 100644 --- 
a/src/kimi_cli/background/manager.py +++ b/src/kimi_cli/background/manager.py @@ -177,6 +177,9 @@ def create_bash_task( timeout_s=timeout_s, ) self._store.create_task(spec) + from kimi_cli.telemetry import track + + track("background_task_created") runtime = self._store.read_runtime(task_id) task_dir = self._store.task_dir(task_id) @@ -577,6 +580,11 @@ def _mark_task_completed(self, task_id: str) -> None: runtime.finished_at = runtime.updated_at runtime.failure_reason = None self._store.write_runtime(task_id, runtime) + from kimi_cli.telemetry import track + + if runtime.started_at and runtime.finished_at: + duration = runtime.finished_at - runtime.started_at + track("background_task_completed", success=True, duration_s=duration) def _mark_task_failed(self, task_id: str, reason: str) -> None: runtime = self._store.read_runtime(task_id) @@ -587,6 +595,16 @@ def _mark_task_failed(self, task_id: str, reason: str) -> None: runtime.finished_at = runtime.updated_at runtime.failure_reason = reason self._store.write_runtime(task_id, runtime) + from kimi_cli.telemetry import track + + if runtime.started_at and runtime.finished_at: + duration = runtime.finished_at - runtime.started_at + track( + "background_task_completed", + success=False, + duration_s=duration, + reason="error", + ) def _mark_task_timed_out(self, task_id: str, reason: str) -> None: runtime = self._store.read_runtime(task_id) @@ -599,6 +617,16 @@ def _mark_task_timed_out(self, task_id: str, reason: str) -> None: runtime.timed_out = True runtime.failure_reason = reason self._store.write_runtime(task_id, runtime) + from kimi_cli.telemetry import track + + if runtime.started_at and runtime.finished_at: + duration = runtime.finished_at - runtime.started_at + track( + "background_task_completed", + success=False, + duration_s=duration, + reason="timeout", + ) def _mark_task_killed(self, task_id: str, reason: str) -> None: runtime = self._store.read_runtime(task_id) @@ -610,3 +638,13 @@ def 
_mark_task_killed(self, task_id: str, reason: str) -> None: runtime.interrupted = True runtime.failure_reason = reason self._store.write_runtime(task_id, runtime) + from kimi_cli.telemetry import track + + if runtime.started_at and runtime.finished_at: + duration = runtime.finished_at - runtime.started_at + track( + "background_task_completed", + success=False, + duration_s=duration, + reason="killed", + ) diff --git a/src/kimi_cli/cli/__init__.py b/src/kimi_cli/cli/__init__.py index 393c60639..0080d7937 100644 --- a/src/kimi_cli/cli/__init__.py +++ b/src/kimi_cli/cli/__init__.py @@ -620,6 +620,7 @@ async def _run(session_id: str | None, prefill_text: str | None = None) -> tuple max_ralph_iterations=max_ralph_iterations, startup_progress=startup_progress.update if ui == "shell" else None, defer_mcp_loading=ui == "shell" and prompt is None, + ui_mode=ui, ) startup_progress.stop() diff --git a/src/kimi_cli/cli/__main__.py b/src/kimi_cli/cli/__main__.py index 3ad011483..d8771cf87 100644 --- a/src/kimi_cli/cli/__main__.py +++ b/src/kimi_cli/cli/__main__.py @@ -5,7 +5,15 @@ from kimi_cli.cli import cli if __name__ == "__main__": + from kimi_cli.telemetry.crash import install_crash_handlers, set_phase from kimi_cli.utils.proxy import normalize_proxy_env + # Same entry treatment as kimi_cli.__main__: install excepthook before + # anything else so startup-phase crashes in subcommand subprocesses + # (background-task-worker, __web-worker, acp via toad) are captured. + install_crash_handlers() normalize_proxy_env() - sys.exit(cli()) + try: + sys.exit(cli()) + finally: + set_phase("shutdown") diff --git a/src/kimi_cli/config.py b/src/kimi_cli/config.py index 7adf6b3c6..f2dc07c07 100644 --- a/src/kimi_cli/config.py +++ b/src/kimi_cli/config.py @@ -235,6 +235,10 @@ class Config(BaseModel): "instead of using only the first one found" ), ) + telemetry: bool = Field( + default=True, + description="Enable anonymous telemetry to help improve kimi-cli. 
Set to false to disable.", + ) @model_validator(mode="after") def validate_model(self) -> Self: diff --git a/src/kimi_cli/hooks/engine.py b/src/kimi_cli/hooks/engine.py index 8a361568b..918eb4064 100644 --- a/src/kimi_cli/hooks/engine.py +++ b/src/kimi_cli/hooks/engine.py @@ -234,13 +234,27 @@ async def trigger( return [] try: - return await self._execute_hooks( + results = await self._execute_hooks( event, matcher_value, server_matched, wire_matched, input_data ) except Exception: logger.warning("Hook engine error for {}, failing open", event) return [] + # Telemetry runs outside the fail-open try: a telemetry failure + # must NEVER discard hook results. For security-critical hooks + # (PreToolUse block), treating a sink failure as fail-open would + # silently bypass the block. + try: + from kimi_cli.telemetry import track + + has_block = any(r.action == "block" for r in results) + track("hook_triggered", event_type=event, action="block" if has_block else "allow") + except Exception: + logger.debug("Telemetry for hook_triggered failed") + + return results + async def _execute_hooks( self, event: str, diff --git a/src/kimi_cli/session_state.py b/src/kimi_cli/session_state.py index b8f031cde..7ad9811a8 100644 --- a/src/kimi_cli/session_state.py +++ b/src/kimi_cli/session_state.py @@ -103,8 +103,11 @@ def load_session_state(session_dir: Path) -> SessionState: try: with open(state_file, encoding="utf-8") as f: state = SessionState.model_validate(json.load(f)) - except (json.JSONDecodeError, ValidationError, UnicodeDecodeError): + except (json.JSONDecodeError, ValidationError, UnicodeDecodeError) as e: logger.warning("Corrupted state file, using defaults: {path}", path=state_file) + from kimi_cli.telemetry import track + + track("session_load_failed", reason=type(e).__name__) state = SessionState() # One-time migration from legacy metadata.json (best-effort) diff --git a/src/kimi_cli/soul/approval.py b/src/kimi_cli/soul/approval.py index c3279264b..bb8bf81ba 100644 --- 
a/src/kimi_cli/soul/approval.py +++ b/src/kimi_cli/soul/approval.py @@ -133,9 +133,23 @@ async def request( description=description, ) if self._state.yolo: + from kimi_cli.telemetry import track + + track( + "tool_approved", + tool_name=tool_call.function.name, + approval_mode="yolo", + ) return ApprovalResult(approved=True) if action in self._state.auto_approve_actions: + from kimi_cli.telemetry import track + + track( + "tool_approved", + tool_name=tool_call.function.name, + approval_mode="auto_session", + ) return ApprovalResult(approved=True) request_id = str(uuid.uuid4()) @@ -156,11 +170,30 @@ async def request( try: response, feedback = await self._runtime.wait_for_response(request_id) except ApprovalCancelledError: + from kimi_cli.telemetry import track + + track( + "tool_rejected", + tool_name=tool_call.function.name, + approval_mode="cancelled", + ) return ApprovalResult(approved=False) + from kimi_cli.telemetry import track + match response: case "approve": + track( + "tool_approved", + tool_name=tool_call.function.name, + approval_mode="manual", + ) return ApprovalResult(approved=True) case "approve_for_session": + track( + "tool_approved", + tool_name=tool_call.function.name, + approval_mode="manual", + ) self._state.auto_approve_actions.add(action) self._state.notify_change() for pending in self._runtime.list_pending(): @@ -168,6 +201,16 @@ async def request( self._runtime.resolve(pending.id, "approve") return ApprovalResult(approved=True) case "reject": + track( + "tool_rejected", + tool_name=tool_call.function.name, + approval_mode="manual", + ) return ApprovalResult(approved=False, feedback=feedback) case _: + track( + "tool_rejected", + tool_name=tool_call.function.name, + approval_mode="manual", + ) return ApprovalResult(approved=False) diff --git a/src/kimi_cli/soul/kimisoul.py b/src/kimi_cli/soul/kimisoul.py index 13d0d556b..1bc51f8ce 100644 --- a/src/kimi_cli/soul/kimisoul.py +++ b/src/kimi_cli/soul/kimisoul.py @@ -96,6 +96,46 @@ def 
type_check(soul: KimiSoul): DEFAULT_MAX_FLOW_MOVES = 1000 +def classify_api_error(e: Exception) -> tuple[str, int | None]: + """Classify an LLM API exception into (error_type, status_code). + + Exposed at module level so telemetry tests can import the real function + instead of duplicating the classification table. + + Returns: + (error_type, status_code) where status_code is None for non-HTTP errors. + """ + status_code: int | None = None + if isinstance(e, APIStatusError): + status = getattr(e, "status_code", getattr(e, "status", 0)) + status_code = int(status) if status else None + if status == 429: + return "rate_limit", status_code + if status in (401, 403): + return "auth", status_code + if status >= 500: + return "5xx_server", status_code + if 400 <= status < 500: + msg_lower = str(e).lower() + if ( + "context length" in msg_lower + or "context_length" in msg_lower + or "max tokens" in msg_lower + or "maximum context" in msg_lower + or "too many tokens" in msg_lower + ): + return "context_overflow", status_code + return "4xx_client", status_code + return "api", status_code + if isinstance(e, APIConnectionError): + return "network", None + if isinstance(e, (APITimeoutError, TimeoutError)): + return "timeout", None + if isinstance(e, APIEmptyResponseError): + return "empty_response", None + return "other", None + + type StepStopReason = Literal["no_tool_calls", "tool_rejected"] @@ -665,6 +705,9 @@ def _find_slash_command(self, name: str) -> SlashCommand[Any] | None: def _make_skill_runner(self, skill: Skill) -> Callable[[KimiSoul, str], None | Awaitable[None]]: async def _run_skill(soul: KimiSoul, args: str, *, _skill: Skill = skill) -> None: + from kimi_cli.telemetry import track + + track("skill_invoked", skill_name=_skill.name) skill_text = await read_skill_text(_skill) if skill_text is None: wire_send( @@ -695,17 +738,38 @@ async def _agent_loop(self) -> TurnOutcome: wire_send(MCPLoadingBegin()) try: await self.wait_for_background_mcp_loading() + # Track 
MCP connection result + if loading: + from kimi_cli.telemetry import track as _track_mcp + + mcp_snap = self._mcp_status_snapshot() + if mcp_snap: + if mcp_snap.connected > 0: + _track_mcp( + "mcp_connected", + server_count=mcp_snap.connected, + total_count=mcp_snap.total, + ) + _failed = mcp_snap.total - mcp_snap.connected + if _failed > 0: + _track_mcp( + "mcp_failed", + failed_count=_failed, + total_count=mcp_snap.total, + ) finally: if loading: wire_send(StatusUpdate(mcp_status=self._mcp_status_snapshot())) wire_send(MCPLoadingEnd()) step_no = 0 + self._current_step_no = 0 while True: step_no += 1 if step_no > self._loop_control.max_steps_per_turn: raise MaxStepsReached(self._loop_control.max_steps_per_turn) + self._current_step_no = step_no wire_send(StepBegin(n=step_no)) back_to_the_future: BackToTheFuture | None = None step_outcome: StepOutcome | None = None @@ -747,6 +811,14 @@ async def _agent_loop(self) -> TurnOutcome: request_id=req_id, ) wire_send(StepInterrupted()) + # Track API/step errors + from kimi_cli.telemetry import track + + error_type, status_code = classify_api_error(e) + if status_code is not None: + track("api_error", error_type=error_type, status_code=status_code) + else: + track("api_error", error_type=error_type) # --- StopFailure hook --- from kimi_cli.hooks import events as _hook_events @@ -1003,6 +1075,7 @@ async def _compact_with_retry() -> CompactionResult: ) trigger_reason = "manual" if custom_instruction else "auto" + before_tokens = self._context.token_count from kimi_cli.hooks import events await self._hook_engine.trigger( @@ -1012,12 +1085,23 @@ async def _compact_with_retry() -> CompactionResult: session_id=self._runtime.session.id, cwd=str(Path.cwd()), trigger=trigger_reason, - token_count=self._context.token_count, + token_count=before_tokens, ), ) wire_send(CompactionBegin()) - compaction_result = await _compact_with_retry() + try: + compaction_result = await _compact_with_retry() + except Exception: + from 
kimi_cli.telemetry import track + + track( + "compaction_triggered", + trigger_type=trigger_reason, + before_tokens=before_tokens, + success=False, + ) + raise await self._context.clear() await self._context.write_system_prompt(self._agent.system_prompt) await self._checkpoint() @@ -1045,6 +1129,16 @@ async def _compact_with_retry() -> CompactionResult: wire_send(CompactionEnd()) + from kimi_cli.telemetry import track + + track( + "compaction_triggered", + trigger_type=trigger_reason, + before_tokens=before_tokens, + after_tokens=estimated_token_count, + success=True, + ) + _hook_task = asyncio.create_task( self._hook_engine.trigger( "PostCompact", @@ -1230,6 +1324,10 @@ async def run(self, soul: KimiSoul, args: str) -> None: command = f"/{FLOW_COMMAND_PREFIX}{self._name}" if self._name else "/flow" logger.warning("Agent flow {command} ignores args: {args}", command=command, args=args) return + if self._name: + from kimi_cli.telemetry import track + + track("flow_invoked", flow_name=self._name) current_id = self._flow.begin_id moves = 0 diff --git a/src/kimi_cli/soul/slash.py b/src/kimi_cli/soul/slash.py index fbe5a4541..1531db4e8 100644 --- a/src/kimi_cli/soul/slash.py +++ b/src/kimi_cli/soul/slash.py @@ -50,6 +50,9 @@ async def init(soul: KimiSoul, args: str): f"Latest AGENTS.md file content:\n{agents_md}" ) await soul.context.append_message(Message(role="user", content=[system_message])) + from kimi_cli.telemetry import track + + track("init_complete") @registry.command @@ -92,11 +95,15 @@ async def clear(soul: KimiSoul, args: str): @registry.command async def yolo(soul: KimiSoul, args: str): """Toggle YOLO mode (auto-approve all actions)""" + from kimi_cli.telemetry import track + if soul.runtime.approval.is_yolo(): soul.runtime.approval.set_yolo(False) + track("yolo_toggle", enabled=False) wire_send(TextPart(text="You only die once! 
Actions will require approval.")) else: soul.runtime.approval.set_yolo(True) + track("yolo_toggle", enabled=True) wire_send(TextPart(text="You only live once! All actions will be auto-approved.")) diff --git a/src/kimi_cli/soul/toolset.py b/src/kimi_cli/soul/toolset.py index 8616e9adf..58030c15c 100644 --- a/src/kimi_cli/soul/toolset.py +++ b/src/kimi_cli/soul/toolset.py @@ -189,6 +189,7 @@ async def _call(): try: ret = await tool.call(arguments) except Exception as e: + tool_elapsed = time.monotonic() - t0 logger.exception( "Tool execution failed: {tool_name} (call_id={call_id})", tool_name=tool_call.function.name, @@ -212,6 +213,21 @@ async def _call(): _hook_task.add_done_callback( lambda t: t.exception() if not t.cancelled() else None ) + from kimi_cli.telemetry import track + + _error_type = type(e).__name__ + track( + "tool_error", + tool_name=tool_call.function.name, + error_type=_error_type, + ) + track( + "tool_call", + tool_name=tool_call.function.name, + success=False, + duration_ms=int(tool_elapsed * 1000), + error_type=_error_type, + ) return ToolResult( tool_call_id=tool_call.id, return_value=ToolRuntimeError(str(e)), @@ -224,6 +240,14 @@ async def _call(): elapsed=tool_elapsed, call_id=tool_call.id, ) + from kimi_cli.telemetry import track as _track_tool_call + + _track_tool_call( + "tool_call", + tool_name=tool_call.function.name, + success=not isinstance(ret, ToolError), + duration_ms=int(tool_elapsed * 1000), + ) # --- PostToolUse (fire-and-forget) --- _hook_task = asyncio.create_task( diff --git a/src/kimi_cli/subagents/runner.py b/src/kimi_cli/subagents/runner.py index 2dfeb0f01..746598282 100644 --- a/src/kimi_cli/subagents/runner.py +++ b/src/kimi_cli/subagents/runner.py @@ -381,6 +381,9 @@ async def _prepare_instance(self, req: ForegroundRunRequest) -> PreparedInstance effective_model=req.model or type_def.default_model, ), ) + from kimi_cli.telemetry import track + + track("subagent_created") return PreparedInstance( record=record, 
actual_type=actual_type, diff --git a/src/kimi_cli/telemetry/__init__.py b/src/kimi_cli/telemetry/__init__.py new file mode 100644 index 000000000..22833c1b3 --- /dev/null +++ b/src/kimi_cli/telemetry/__init__.py @@ -0,0 +1,158 @@ +""" +Telemetry event tracking for kimi-cli. + +This module has NO dependencies on other kimi_cli modules to avoid import cycles. +track() can be called at any point during startup, even before the sink is attached. +Events are buffered in memory and flushed once the sink is ready. + +Usage: + from kimi_cli.telemetry import track, set_context, attach_sink + + # Early in startup — events queue in memory + track("first_launch") + + # After app init — attach sink to start flushing + set_context(device_id="abc", session_id="def") + attach_sink(sink) +""" + +from __future__ import annotations + +import atexit +import time +import uuid +from collections import deque +from contextlib import suppress +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from kimi_cli.telemetry.sink import EventSink + +# --------------------------------------------------------------------------- +# Module-level state (zero dependencies) +# --------------------------------------------------------------------------- + +_MAX_QUEUE_SIZE = 1000 +"""Maximum number of events to buffer before sink is attached.""" + +_event_queue: deque[dict[str, Any]] = deque(maxlen=_MAX_QUEUE_SIZE) +"""Events buffered before sink is attached.""" + +_device_id: str | None = None +_session_id: str | None = None +_client_info: tuple[str, str | None] | None = None +"""(name, version) tuple, set atomically via set_client_info.""" +_sink: EventSink | None = None +_disabled: bool = False + + +def set_context(*, device_id: str, session_id: str) -> None: + """Set device and session identifiers. 
Call once after app init.""" + global _device_id, _session_id + _device_id = device_id + _session_id = session_id + + +def set_client_info(*, name: str, version: str | None = None) -> None: + """Set the wire/acp client name and version (e.g. VSCode 1.90.0, zed 0.180.0). + + Called by wire/acp servers after receiving the client's initialize message. + Values are passed through verbatim — backend is responsible for any + validation, normalization or alerting on anomalous values. + """ + global _client_info + if not name: + return + _client_info = (name, version) + + +def get_client_info() -> tuple[str, str | None] | None: + """Return the current (name, version) tuple, or None if unset. + + Exposed for the sink to enrich events with the latest client info. + """ + return _client_info + + +def disable() -> None: + """Permanently disable telemetry for this process. Events are silently dropped.""" + global _disabled + _disabled = True + _event_queue.clear() + if _sink is not None: + _sink.clear_buffer() + + +def attach_sink(sink: EventSink) -> None: + """Attach the event sink and drain any queued events. + + Multi-session ACP mode calls ``KimiCLI.create()`` per session, which + means ``attach_sink`` runs again while a previous sink may hold + un-flushed buffered events. Flush the old sink synchronously (writes + any pending events to the disk fallback) before replacing it, so + earlier sessions' events are not silently orphaned. + """ + global _sink + if _sink is not None and _sink is not sink: + # flush_sync already swallows its own transport failures; + # ``suppress`` guards against truly unexpected errors so sink + # replacement is never blocked by a flaky predecessor. + with suppress(Exception): + _sink.flush_sync() + _sink = sink + # Drain events that were queued before sink was ready, + # backfilling device_id/session_id for events tracked before set_context(). 
+ if _event_queue: + for event in _event_queue: + if event.get("device_id") is None: + event["device_id"] = _device_id + if event.get("session_id") is None: + event["session_id"] = _session_id + _sink.accept(event) + _event_queue.clear() + + +def track(event: str, **properties: bool | int | float | str | None) -> None: + """ + Record a telemetry event. + + This function is non-blocking: it appends to an in-memory list. + Safe to call from synchronous prompt_toolkit key handlers. + + Args: + event: Event name (e.g. "input_command"). + **properties: Event properties. String values should only be used for + known enum-like values (command names, mode names, error types). + NEVER pass user input, file paths, or code snippets. + """ + if _disabled: + return + + record = { + "event_id": uuid.uuid4().hex, + "device_id": _device_id, + "session_id": _session_id, + "event": event, + "timestamp": time.time(), + "properties": properties if properties else {}, + } + + if _sink is not None: + _sink.accept(record) + else: + _event_queue.append(record) + + +def get_sink() -> EventSink | None: + """Return the current sink, or None if not attached.""" + return _sink + + +def flush_sync() -> None: + """Synchronously flush any buffered events. Called on exit.""" + if _sink is not None: + _sink.flush_sync() + + +# Register atexit handler to flush remaining events on normal exit +atexit.register(flush_sync) diff --git a/src/kimi_cli/telemetry/crash.py b/src/kimi_cli/telemetry/crash.py new file mode 100644 index 000000000..9b4eb8b8a --- /dev/null +++ b/src/kimi_cli/telemetry/crash.py @@ -0,0 +1,149 @@ +"""Crash telemetry: capture uncaught exceptions via sys.excepthook and +asyncio's exception handler, emit a ``crash`` event, then delegate to the +original handler so the traceback still gets printed. + +Privacy: only the exception *class name* is emitted. No message, no stack +trace — those can contain file paths, user input fragments, or URLs. 
+""" + +from __future__ import annotations + +import asyncio +import sys +from types import TracebackType +from typing import Any + +# --------------------------------------------------------------------------- +# Phase tracking +# --------------------------------------------------------------------------- + +_phase: str = "startup" +"""Coarse lifecycle bucket recorded on each crash event. + +Valid values: ``startup`` (before app init finishes), ``runtime`` +(normal operation), ``shutdown`` (after the main entry point returns). +""" + + +def set_phase(phase: str) -> None: + """Update the current lifecycle phase. Called by app entry points.""" + global _phase + _phase = phase + + +# --------------------------------------------------------------------------- +# Filters +# --------------------------------------------------------------------------- + + +def _should_ignore_for_excepthook(exc_type: type[BaseException]) -> bool: + """Return True for exceptions that are not programming bugs. + + - KeyboardInterrupt: Ctrl+C, already covered by the ``cancel`` event. + - SystemExit: deliberate exit, not a crash. + - click.ClickException (UsageError / BadParameter / ...): user-facing + CLI input errors, not program bugs. + """ + if issubclass(exc_type, (KeyboardInterrupt, SystemExit)): + return True + try: + import click + + if issubclass(exc_type, click.exceptions.ClickException): + return True + except ImportError: + pass + return False + + +# --------------------------------------------------------------------------- +# sys.excepthook +# --------------------------------------------------------------------------- + +_original_excepthook: Any = None + + +def _excepthook( + exc_type: type[BaseException], + exc: BaseException, + tb: TracebackType | None, +) -> None: + if not _should_ignore_for_excepthook(exc_type): + # Any failure inside telemetry must not mask the underlying crash. 
+ try: + from kimi_cli.telemetry import track + + track( + "crash", + error_type=exc_type.__name__, + where=_phase, + source="excepthook", + ) + except Exception: + pass + + # Always delegate so the traceback is still printed. + handler = _original_excepthook if _original_excepthook is not None else sys.__excepthook__ + handler(exc_type, exc, tb) + + +def install_crash_handlers() -> None: + """Install the process-level ``sys.excepthook``. + + Idempotent: repeated calls are no-ops. Should be called as early as + possible in the entry point so startup-phase exceptions are captured. + """ + global _original_excepthook + if sys.excepthook is _excepthook: + return + _original_excepthook = sys.excepthook + sys.excepthook = _excepthook + + +# --------------------------------------------------------------------------- +# asyncio exception handler +# --------------------------------------------------------------------------- + +_original_asyncio_handler: Any = None + + +def _asyncio_handler( + loop: asyncio.AbstractEventLoop, + context: dict[str, Any], +) -> None: + exc = context.get("exception") + # CancelledError during shutdown/cancellation is normal control flow. + if exc is not None and not isinstance(exc, asyncio.CancelledError): + try: + from kimi_cli.telemetry import track + + track( + "crash", + error_type=type(exc).__name__, + where=_phase, + source="asyncio_task", + ) + except Exception: + pass + + # Delegate so the original logging behavior (or custom handler) runs. + if _original_asyncio_handler is not None: + _original_asyncio_handler(loop, context) + else: + loop.default_exception_handler(context) + + +def install_asyncio_handler(loop: asyncio.AbstractEventLoop | None = None) -> None: + """Install the crash handler on the given (or current running) loop. + + Idempotent on the same loop. If a custom handler was already installed, + it is preserved and still invoked after the crash event is recorded. 
+ """ + global _original_asyncio_handler + if loop is None: + loop = asyncio.get_running_loop() + current = loop.get_exception_handler() + if current is _asyncio_handler: + return + _original_asyncio_handler = current + loop.set_exception_handler(_asyncio_handler) diff --git a/src/kimi_cli/telemetry/sink.py b/src/kimi_cli/telemetry/sink.py new file mode 100644 index 000000000..72b9168e4 --- /dev/null +++ b/src/kimi_cli/telemetry/sink.py @@ -0,0 +1,149 @@ +""" +EventSink: opt-out check, context enrichment, buffer management, timed flush. +""" + +from __future__ import annotations + +import asyncio +import locale +import os +import platform +import threading +from typing import Any + +from kimi_cli.telemetry.transport import AsyncTransport +from kimi_cli.utils.logging import logger + + +class EventSink: + """Buffers telemetry events and flushes them in batches.""" + + FLUSH_INTERVAL_S = 30.0 + FLUSH_THRESHOLD = 50 + + def __init__( + self, + transport: AsyncTransport, + *, + version: str = "", + model: str = "", + ui_mode: str = "shell", + ) -> None: + self._transport = transport + self._buffer: list[dict[str, Any]] = [] + self._lock = threading.Lock() + self._flush_task: asyncio.Task[None] | None = None + # Static context enrichment + self._context: dict[str, Any] = { + "version": version, + "runtime": "python", + "platform": platform.system().lower(), + "arch": platform.machine(), + "python_version": platform.python_version(), + "os_version": platform.release(), + "ci": bool(os.environ.get("CI")), + "locale": locale.getlocale()[0] or "", + "terminal": os.environ.get("TERM_PROGRAM", ""), + } + self._model = model + self._ui_mode = ui_mode + + def accept(self, event: dict[str, Any]) -> None: + """Accept an event into the buffer. 
Non-blocking, thread-safe.""" + # Enrich with static context (copy to avoid mutating the caller's dict) + ctx = {**self._context, "ui_mode": self._ui_mode} + if self._model: + ctx["model"] = self._model + # Read the client_info tuple atomically (single pointer load) so we + # never observe a half-updated pair. + from kimi_cli.telemetry import get_client_info + + client_info = get_client_info() + if client_info is not None: + ctx["client_name"] = client_info[0] + if client_info[1]: + ctx["client_version"] = client_info[1] + enriched = {**event, "context": ctx} + + with self._lock: + self._buffer.append(enriched) + should_flush = len(self._buffer) >= self.FLUSH_THRESHOLD + + if should_flush: + self._schedule_async_flush() + + def start_periodic_flush(self, loop: asyncio.AbstractEventLoop | None = None) -> None: + """Start a background task that flushes every FLUSH_INTERVAL_S seconds.""" + if self._flush_task is not None: + return + + async def _periodic() -> None: + try: + while True: + await asyncio.sleep(self.FLUSH_INTERVAL_S) + await self._flush_async() + except asyncio.CancelledError: + pass + + if loop is None: + loop = asyncio.get_running_loop() + self._flush_task = loop.create_task(_periodic()) + + async def retry_disk_events(self) -> None: + """Retry sending any events that were previously saved to disk.""" + await self._transport.retry_disk_events() + + def clear_buffer(self) -> None: + """Discard all buffered events without sending them.""" + with self._lock: + self._buffer.clear() + + def stop_periodic_flush(self) -> None: + """Cancel the periodic flush task.""" + if self._flush_task is not None: + self._flush_task.cancel() + self._flush_task = None + + async def flush(self) -> None: + """Async flush: send all buffered events.""" + await self._flush_async() + + def flush_sync(self) -> None: + """Synchronous flush for atexit / signal handlers. + + Writes remaining events to disk fallback file so they can be + retried on next startup. 
Does NOT attempt HTTP (no event loop). + """ + with self._lock: + if not self._buffer: + return + events = list(self._buffer) + self._buffer.clear() + + try: + self._transport.save_to_disk(events) + except Exception: + logger.debug("Failed to save telemetry events to disk on exit") + + async def _flush_async(self) -> None: + """Take all buffered events and send them.""" + with self._lock: + if not self._buffer: + return + events = list(self._buffer) + self._buffer.clear() + + try: + await self._transport.send(events) + except Exception: + # Transport handles disk fallback internally; log and move on + logger.debug("Telemetry flush failed, events saved to disk for retry") + + def _schedule_async_flush(self) -> None: + """Schedule an async flush from any thread.""" + try: + loop = asyncio.get_running_loop() + loop.create_task(self._flush_async()) + except RuntimeError: + # No running event loop — will be flushed by periodic task or on exit + pass diff --git a/src/kimi_cli/telemetry/transport.py b/src/kimi_cli/telemetry/transport.py new file mode 100644 index 000000000..13419e56b --- /dev/null +++ b/src/kimi_cli/telemetry/transport.py @@ -0,0 +1,318 @@ +""" +AsyncTransport: HTTP sending with 401 fallback, disk persistence, startup retry. +""" + +from __future__ import annotations + +import asyncio +import json +import os +import time +import uuid +from collections.abc import Callable +from contextlib import suppress +from pathlib import Path +from typing import Any + +import aiohttp + +from kimi_cli.share import get_share_dir +from kimi_cli.utils.logging import logger + +TELEMETRY_ENDPOINT = "https://telemetry-logs.kimi.com/v1/event" + +SEND_TIMEOUT = aiohttp.ClientTimeout(total=10, sock_connect=5) +DISK_EVENT_MAX_AGE_S = 7 * 24 * 3600 # 7 days + +# In-process retry schedule: 1s, 4s, 16s backoff between attempts. +# Total attempts = len(RETRY_BACKOFFS_S) + 1 initial = 4 attempts max. +# Transient failures exhausted here are written to disk for next-startup retry. 
RETRY_BACKOFFS_S = (1.0, 4.0, 16.0)

# Server-side event namespace. Client code uses bare business names
# (``track("started")``); the prefix is applied only at the outbound
# HTTP boundary. Keeping it as a single constant means changing the
# server-side namespace in the future is a one-line change.
SERVER_EVENT_PREFIX = "kfc_"

# Prefix for the payload-level ``user_id``. The full id is
# ``USER_ID_PREFIX + device_id``, e.g. ``kfc_device_id_a1b2c3...``.
USER_ID_PREFIX = "kfc_device_id_"


def _build_user_id(device_id: str) -> str:
    """Derive the payload-level ``user_id`` from the local ``device_id``."""
    return USER_ID_PREFIX + device_id


def _apply_server_prefix_one(event: dict[str, Any]) -> dict[str, Any]:
    """Return an outbound copy of ``event`` with ``SERVER_EVENT_PREFIX`` on its name.

    Idempotent: events already carrying the prefix pass through unchanged.
    Non-string / empty / missing ``event`` fields pass through without copy.
    Does not mutate the input.
    """
    name = event.get("event", "")
    if isinstance(name, str) and name and not name.startswith(SERVER_EVENT_PREFIX):
        return {**event, "event": SERVER_EVENT_PREFIX + name}
    return event


def _assert_primitive(scope: str, key: str, value: Any) -> None:
    """Raise ``TypeError`` if ``value`` is not a telemetry-safe primitive.

    The ``track()`` signature already restricts properties to primitives, but
    ``sink`` enriches events with hand-built context dicts; this runtime
    guardrail catches accidental nested structures before they hit the backend.
    """
    if value is None or isinstance(value, (bool, int, float, str)):
        return
    raise TypeError(f"telemetry {scope}.{key} must be primitive, got {type(value).__name__}")


def _as_subdict(scope: str, value: Any) -> dict[str, Any]:
    """Normalize a ``properties``/``context`` field to a dict, or raise TypeError.

    Falsy values (None, {}, ...) become an empty dict, preserving the old
    ``value or {}`` behavior. A truthy non-dict (e.g. a list) previously
    raised AttributeError on ``.items()``, which escaped ``send()``'s
    ``except TypeError`` schema guard and crashed the flush path; raising
    TypeError instead keeps such schema violations on the documented
    drop-with-warning path.
    """
    if not value:
        return {}
    if isinstance(value, dict):
        return value
    raise TypeError(f"telemetry {scope} must be a dict, got {type(value).__name__}")


def _flatten_event(event: dict[str, Any]) -> dict[str, Any]:
    """Expand ``properties``/``context`` into ``property_*``/``context_*`` keys.

    Top-level event fields are preserved unchanged. Unknown future top-level
    keys are passed through without transformation.

    Raises ``TypeError`` when ``properties``/``context`` is not a dict, or
    contains nested dict / list values. Does not mutate the input.
    """
    out: dict[str, Any] = {}
    for key, value in event.items():
        if key == "properties":
            props = _as_subdict("properties", value)
            for pk, pv in props.items():
                _assert_primitive("property", pk, pv)
                out[f"property_{pk}"] = pv
        elif key == "context":
            ctx = _as_subdict("context", value)
            for ck, cv in ctx.items():
                _assert_primitive("context", ck, cv)
                out[f"context_{ck}"] = cv
        else:
            out[key] = value
    return out


def _build_payload(events: list[dict[str, Any]], device_id: str) -> dict[str, Any]:
    """Assemble the outbound HTTP payload.

    The payload carries a single ``user_id`` at the top (derived from
    ``device_id``) and a list of flat, prefixed events underneath.
    """
    flat_events: list[dict[str, Any]] = []
    for event in events:
        flat_events.append(_flatten_event(_apply_server_prefix_one(event)))
    return {
        "user_id": _build_user_id(device_id),
        "events": flat_events,
    }


def _telemetry_dir() -> Path:
    """Return (and create) the user-private directory for spooled events."""
    path = get_share_dir() / "telemetry"
    path.mkdir(parents=True, exist_ok=True)
    # Restrict to user-only: these JSONL files carry device_id / session_id /
    # terminal / locale / os_version and should not be world-readable.
    with suppress(OSError):
        os.chmod(path, 0o700)
    return path


class AsyncTransport:
    """Sends telemetry events over HTTP with disk fallback."""

    def __init__(
        self,
        *,
        device_id: str = "",
        get_access_token: Callable[[], str | None] | None = None,
        endpoint: str = TELEMETRY_ENDPOINT,
        retry_backoffs_s: tuple[float, ...] | None = None,
    ) -> None:
        """
        Args:
            device_id: Local device UUID, used to derive the payload-level
                ``user_id``. Defaults to empty string for test convenience;
                production callers always pass the real device id.
            get_access_token: Callable that returns the current OAuth access token
                (or None if not logged in). Read-only, must not trigger refresh.
            endpoint: HTTP endpoint to POST events to.
            retry_backoffs_s: Sleep durations between attempts on transient errors.
                Pass an empty tuple in tests to disable in-process retry.
        """
        self._device_id = device_id
        self._get_access_token = get_access_token
        self._endpoint = endpoint
        self._retry_backoffs = (
            retry_backoffs_s if retry_backoffs_s is not None else RETRY_BACKOFFS_S
        )

    async def send(self, events: list[dict[str, Any]]) -> None:
        """Send a batch of events with in-process retry, falling back to disk."""
        if not events:
            return

        # Assemble the outbound payload at the transport boundary.
        # ``events`` itself is kept untouched (nested, bare names) so
        # ``save_to_disk`` below persists the original shape.
        try:
            payload = _build_payload(events, self._device_id)
        except TypeError as exc:
            # Schema violation: a caller slipped a non-primitive value into
            # properties/context. Retrying would hit the same TypeError on
            # every reload, so falling back to disk would just create a
            # permanently stuck file — drop with a warning instead.
            logger.warning(
                "Telemetry payload schema violation, dropping {count} events: {err}",
                count=len(events),
                err=exc,
            )
            return

        try:
            for attempt_idx in range(len(self._retry_backoffs) + 1):
                try:
                    await self._send_http(payload)
                    return
                except _TransientError as exc:
                    if attempt_idx >= len(self._retry_backoffs):
                        logger.debug(
                            "Telemetry send transient failure after {attempts} attempts: {err}",
                            attempts=attempt_idx + 1,
                            err=exc,
                        )
                        break
                    backoff = self._retry_backoffs[attempt_idx]
                    await asyncio.sleep(backoff)
                except Exception:
                    logger.debug("Telemetry send failed unexpectedly")
                    break
        except asyncio.CancelledError:
            # Task cancelled (e.g. exit timeout) — persist events before propagating.
            # This covers cancellation during _send_http OR asyncio.sleep.
            self.save_to_disk(events)
            raise

        self.save_to_disk(events)

    async def _send_http(self, payload: dict[str, Any]) -> None:
        """Attempt HTTP POST with 401 anonymous fallback."""
        from kimi_cli.utils.aiohttp import new_client_session

        token = self._get_access_token() if self._get_access_token else None
        headers = {"Content-Type": "application/json"}
        if token:
            headers["Authorization"] = f"Bearer {token}"

        async with new_client_session(timeout=SEND_TIMEOUT) as session:
            try:
                async with session.post(self._endpoint, json=payload, headers=headers) as resp:
                    if resp.status == 401 and token:
                        # Auth failed — retry without token (anonymous)
                        headers.pop("Authorization", None)
                        async with session.post(
                            self._endpoint, json=payload, headers=headers
                        ) as retry_resp:
                            if retry_resp.status >= 500 or retry_resp.status == 429:
                                raise _TransientError(f"HTTP {retry_resp.status}")
                            elif retry_resp.status >= 400:
                                # Client error (4xx, except 429) — not recoverable, don't retry
                                logger.debug(
                                    "Anonymous retry got client error HTTP {status}, dropping",
                                    status=retry_resp.status,
                                )
                                return
                        return
                    elif resp.status >= 500 or resp.status == 429:
                        raise _TransientError(f"HTTP {resp.status}")
                    elif resp.status >= 400:
                        # Client error (4xx, except 429) — not recoverable, don't retry.
                        # Avoids endless disk-spool accumulation from schema-mismatch
                        # or auth-shape errors that will never succeed on re-send.
                        logger.debug(
                            "Telemetry got client error HTTP {status}, dropping",
                            status=resp.status,
                        )
                        return
            except (aiohttp.ClientError, TimeoutError) as exc:
                raise _TransientError(str(exc)) from exc

    def save_to_disk(self, events: list[dict[str, Any]]) -> None:
        """Persist events to disk for later retry. Append-only JSONL.

        Stores the original nested shape (bare event names, ``properties``
        and ``context`` sub-dicts). The outbound pipeline is re-applied on
        retry, so the server-side prefix and user_id are added fresh each time.
        """
        if not events:
            return
        try:
            path = _telemetry_dir() / f"failed_{uuid.uuid4().hex[:12]}.jsonl"
            # Create with 0o600 up-front — avoids a race window where the
            # file is briefly world-readable before a post-hoc chmod.
            fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o600)
            with open(fd, "w", encoding="utf-8") as f:
                for event in events:
                    f.write(json.dumps(event, ensure_ascii=False, separators=(",", ":")))
                    f.write("\n")
            logger.debug(
                "Saved {count} telemetry events to {path}",
                count=len(events),
                path=path,
            )
        except Exception:
            logger.debug("Failed to save telemetry events to disk")

    async def retry_disk_events(self) -> None:
        """On startup, scan disk for persisted events and resend them."""
        telemetry_dir = _telemetry_dir()
        failed_files = list(telemetry_dir.glob("failed_*.jsonl"))
        if not failed_files:
            return

        now = time.time()
        for path in failed_files:
            # Delete files older than DISK_EVENT_MAX_AGE_S
            try:
                if now - path.stat().st_mtime > DISK_EVENT_MAX_AGE_S:
                    logger.debug("Removing expired telemetry file: {path}", path=path)
                    path.unlink(missing_ok=True)
                    continue
            except OSError:
                pass

            try:
                events: list[dict[str, Any]] = []
                with open(path, encoding="utf-8") as f:
                    for line in f:
                        line = line.strip()
                        if line:
                            events.append(json.loads(line))
                if events:
                    # Same outbound-only rules as ``send``; disk JSONL stored
                    # the bare, nested client-side shape.
                    await self._send_http(_build_payload(events, self._device_id))
                # Success — delete the file
                path.unlink(missing_ok=True)
                logger.debug(
                    "Retried {count} telemetry events from {path}",
                    count=len(events),
                    path=path,
                )
            except _TransientError:
                # Still failing — leave file for next startup
                logger.debug("Retry of {path} failed, will try again later", path=path)
            except json.JSONDecodeError:
                # Corrupted file — delete it
                logger.debug("Removing corrupted telemetry file: {path}", path=path)
                path.unlink(missing_ok=True)
            except Exception:
                # Unexpected error — leave file for next startup
                logger.debug("Unexpected error retrying {path}, will try again later", path=path)


class _TransientError(Exception):
    """Raised on transient HTTP/network errors to trigger disk fallback."""
== InputAction.BTW and isinstance(self.soul, KimiSoul): + from kimi_cli.telemetry import track + + track("input_btw") await self._run_btw_modal(action.args, prompt_session) resume_prompt.set() continue @@ -607,6 +620,9 @@ def _can_auto_trigger_pending() -> bool: and shell_slash_registry.find_command(slash_cmd_call.name) is None ) if is_soul_slash: + from kimi_cli.telemetry import track + + track("input_command", command=slash_cmd_call.name) background_autotrigger_armed = True resume_prompt.set() await self.run_soul_command(slash_cmd_call.raw_input) @@ -637,6 +653,21 @@ def _can_auto_trigger_pending() -> bool: self._approval_modal = None self._prompt_session = None self._cancel_background_tasks() + # Track exit and flush remaining telemetry events. + # Cap the exit-path flush at 3 s so we don't block for ~50 s + # when the endpoint is unreachable (in-process retry backoff). + # On timeout the CancelledError handler in transport.send() + # persists in-flight events to disk; flush_sync() catches any + # events still in the buffer. 
+ from kimi_cli.telemetry import track + + track("exit", duration_s=time.monotonic() - _run_start_time) + if _telemetry_sink is not None: + _telemetry_sink.stop_periodic_flush() + try: + await asyncio.wait_for(_telemetry_sink.flush(), timeout=3.0) + except (TimeoutError, Exception): + _telemetry_sink.flush_sync() ensure_tty_sane() return shell_ok @@ -673,6 +704,9 @@ async def _run_shell_command(self, command: str) -> None: return logger.info("Running shell command: {cmd}", cmd=command) + from kimi_cli.telemetry import track + + track("input_bash") proc: asyncio.subprocess.Process | None = None @@ -700,15 +734,19 @@ def _handler(): async def _run_slash_command(self, command_call: SlashCommandCall) -> None: from kimi_cli.cli import Reload, SwitchToVis, SwitchToWeb + from kimi_cli.telemetry import track if command_call.name not in self._available_slash_commands: logger.info("Unknown slash command /{command}", command=command_call.name) + track("input_command_invalid") console.print( f'[red]Unknown slash command "/{command_call.name}", ' 'type "/" for all available commands[/red]' ) return + track("input_command", command=command_call.name) + command = shell_slash_registry.find_command(command_call.name) if command is None: # the input is a soul-level slash command call @@ -926,6 +964,12 @@ def _on_view_ready(view: Any) -> None: ) except RunCancelled: logger.info("Cancelled by user") + from kimi_cli.telemetry import track + + _at_step = ( + getattr(self.soul, "_current_step_no", 0) if isinstance(self.soul, KimiSoul) else 0 + ) + track("turn_interrupted", at_step=_at_step) console.print("[red]Interrupted by user[/red]") except Exception as e: logger.exception("Unexpected error:") @@ -1444,6 +1488,9 @@ def _print_welcome_info(name: str, info_items: list[WelcomeInfoItem]) -> None: f"Please run `{_update_mod.UPGRADE_COMMAND}` to upgrade.[/yellow]" ) ) + from kimi_cli.telemetry import track + + track("update_prompted", current=current_version, latest=latest_version) 
console.print( Panel( diff --git a/src/kimi_cli/ui/shell/export_import.py b/src/kimi_cli/ui/shell/export_import.py index a1e28c4e7..4fb1c2392 100644 --- a/src/kimi_cli/ui/shell/export_import.py +++ b/src/kimi_cli/ui/shell/export_import.py @@ -44,6 +44,9 @@ async def export(app: Shell, args: str): return output, count = result + from kimi_cli.telemetry import track + + track("export") display = shorten_home(KaosPath(str(output))) console.print(f"[green]Exported {count} messages to {display}[/green]") console.print( @@ -93,6 +96,9 @@ async def import_context(app: Shell, args: str): return source_desc, content_len = result + from kimi_cli.telemetry import track + + track("import") # Write to wire file so the import appears in session replay await soul.wire_file.append_message( diff --git a/src/kimi_cli/ui/shell/oauth.py b/src/kimi_cli/ui/shell/oauth.py index 059bd1582..30ba0dc41 100644 --- a/src/kimi_cli/ui/shell/oauth.py +++ b/src/kimi_cli/ui/shell/oauth.py @@ -73,6 +73,9 @@ async def login(app: Shell, args: str) -> None: ok = await setup_platform(platform) if not ok: return + from kimi_cli.telemetry import track + + track("login", provider=platform.id) await asyncio.sleep(1) console.clear() raise Reload @@ -138,6 +141,9 @@ async def logout(app: Shell, args: str) -> None: save_config(config) console.print("[green]✓[/green] Logged out successfully.") + from kimi_cli.telemetry import track + + track("logout") await asyncio.sleep(1) console.clear() raise Reload diff --git a/src/kimi_cli/ui/shell/prompt.py b/src/kimi_cli/ui/shell/prompt.py index 3373e5478..5a192b93f 100644 --- a/src/kimi_cli/ui/shell/prompt.py +++ b/src/kimi_cli/ui/shell/prompt.py @@ -1274,6 +1274,9 @@ def _(event: KeyPressEvent) -> None: if self._active_prompt_delegate() is not None: return self._mode = self._mode.toggle() + from kimi_cli.telemetry import track + + track("shortcut_mode_switch", to_mode=self._mode.value) # Apply mode-specific settings self._apply_mode(event) # Redraw UI @@ -1289,6 
+1292,9 @@ def _(event: KeyPressEvent) -> None: async def _toggle() -> None: assert self._plan_mode_toggle_callback is not None new_state = await self._plan_mode_toggle_callback() + from kimi_cli.telemetry import track + + track("shortcut_plan_toggle", enabled=new_state) if new_state: toast("plan mode ON", topic="plan_mode", duration=3.0, immediate=True) else: @@ -1302,11 +1308,17 @@ async def _toggle() -> None: @_kb.add("c-j", eager=True) def _(event: KeyPressEvent) -> None: """Insert a newline when Alt-Enter or Ctrl-J is pressed.""" + from kimi_cli.telemetry import track + + track("shortcut_newline") event.current_buffer.insert_text("\n") @_kb.add("c-o", eager=True) def _(event: KeyPressEvent) -> None: """Open current buffer in external editor.""" + from kimi_cli.telemetry import track + + track("shortcut_editor") self._open_in_external_editor(event) @_kb.add( @@ -1461,6 +1473,9 @@ def _(event: KeyPressEvent) -> None: @_kb.add("c-v", eager=True) def _(event: KeyPressEvent) -> None: + from kimi_cli.telemetry import track + + track("shortcut_paste") if self._try_paste_media(event): return clipboard_data = event.app.clipboard.get_data() diff --git a/src/kimi_cli/ui/shell/slash.py b/src/kimi_cli/ui/shell/slash.py index f33b5e43c..ba26de40e 100644 --- a/src/kimi_cli/ui/shell/slash.py +++ b/src/kimi_cli/ui/shell/slash.py @@ -257,6 +257,12 @@ async def model(app: Shell, args: str): console.print(f"[red]Failed to save config: {exc}[/red]") return + from kimi_cli.telemetry import track + + if model_changed: + track("model_switch", model=selected_model_name) + if thinking_changed: + track("thinking_toggle", enabled=new_thinking) console.print( f"[green]Switched to {selected_display} " f"with thinking {'on' if new_thinking else 'off'}. " @@ -470,6 +476,9 @@ def _fallback_to_issues(): ): pass session_id = soul.runtime.session.id + from kimi_cli.telemetry import track + + track("feedback_submitted") console.print( f"[green]Feedback submitted, thank you! 
Your session ID is: {session_id}[/green]" ) @@ -491,6 +500,9 @@ async def clear(app: Shell, args: str): """Clear the context""" if ensure_kimi_soul(app) is None: return + from kimi_cli.telemetry import track + + track("clear") await app.run_soul_command("/clear") raise Reload() @@ -509,6 +521,9 @@ async def new(app: Shell, args: str): if current_session.is_empty(): await current_session.delete() session = await Session.create(work_dir) + from kimi_cli.telemetry import track + + track("session_new") console.print("[green]New session created. Switching...[/green]") raise Reload(session_id=session.id) @@ -569,6 +584,9 @@ async def list_sessions(app: Shell, args: str): console.print(f"[yellow]Session is in a different directory. Run:[/yellow]\n {cmd}") return + from kimi_cli.telemetry import track + + track("session_resume") console.print(f"[green]Switching to session {selection}...[/green]") raise Reload(session_id=selection) @@ -633,6 +651,9 @@ def theme(app: Shell, args: str): console.print(f"[red]Failed to save config: {exc}[/red]") return + from kimi_cli.telemetry import track + + track("theme_switch", theme=arg) console.print(f"[green]Switched to {arg} theme. 
Reloading...[/green]") raise Reload(session_id=soul.runtime.session.id) @@ -640,6 +661,9 @@ def theme(app: Shell, args: str): @registry.command def web(app: Shell, args: str): """Open Kimi Code Web UI in browser""" + from kimi_cli.telemetry import track + + track("web_opened") soul = ensure_kimi_soul(app) session_id = soul.runtime.session.id if soul else None raise SwitchToWeb(session_id=session_id) @@ -648,6 +672,9 @@ def web(app: Shell, args: str): @registry.command def vis(app: Shell, args: str): """Open Kimi Agent Tracing Visualizer in browser""" + from kimi_cli.telemetry import track + + track("vis_opened") soul = ensure_kimi_soul(app) session_id = soul.runtime.session.id if soul else None raise SwitchToVis(session_id=session_id) @@ -785,6 +812,9 @@ async def undo(app: Shell, args: str): source_title=session.title, ) + from kimi_cli.telemetry import track + + track("undo") console.print(f"[green]Forked at turn {turn_index}. Switching to new session...[/green]") raise Reload(session_id=new_session_id, prefill_text=user_text) @@ -807,6 +837,9 @@ async def fork(app: Shell, args: str): source_title=session.title, ) + from kimi_cli.telemetry import track + + track("session_fork") console.print("[green]Session forked. Switching to new session...[/green]") raise Reload(session_id=new_session_id) diff --git a/src/kimi_cli/ui/shell/visualize/_interactive.py b/src/kimi_cli/ui/shell/visualize/_interactive.py index 980b63cc7..d2278ae85 100644 --- a/src/kimi_cli/ui/shell/visualize/_interactive.py +++ b/src/kimi_cli/ui/shell/visualize/_interactive.py @@ -293,6 +293,9 @@ def handle_local_input(self, user_input: UserInput) -> None: ) return self._queued_messages.append(user_input) + from kimi_cli.telemetry import track + + track("input_queue") # Invalidate directly — _flush_prompt_refresh() is gated by # _need_recompose which may be False between wire events. 
self._prompt_session.invalidate() @@ -335,6 +338,9 @@ def handle_immediate_steer(self, user_input: UserInput) -> None: return # Print permanently in conversation flow (shows placeholder for pasted text) console.print(render_user_echo_text(user_input.command)) + from kimi_cli.telemetry import track + + track("input_steer") # Track that we originated this steer locally (FIFO counter for dedup) self._pending_local_steer_count += 1 self._steer(user_input.content) diff --git a/src/kimi_cli/ui/shell/visualize/_live_view.py b/src/kimi_cli/ui/shell/visualize/_live_view.py index 1c6324bc1..53b180c7d 100644 --- a/src/kimi_cli/ui/shell/visualize/_live_view.py +++ b/src/kimi_cli/ui/shell/visualize/_live_view.py @@ -167,6 +167,9 @@ async def keyboard_handler(listener: KeyboardListener, event: KeyEvent) -> None: # Handle Ctrl+E specially - pause Live while the pager is active if event == KeyEvent.CTRL_E: if self.has_expandable_panel(): + from kimi_cli.telemetry import track + + track("shortcut_expand") await listener.pause() live.stop() try: @@ -472,13 +475,16 @@ def dispatch_wire_message(self, msg: WireMessage) -> None: case _: pass - def _try_submit_question(self) -> None: + def _try_submit_question(self, method: str = "enter") -> None: """Submit the current question answer; if all done, resolve and advance.""" panel = self._current_question_panel if panel is None: return all_done = panel.submit() if all_done: + from kimi_cli.telemetry import track + + track("question_answered", method=method) panel.request.resolve(panel.get_answers()) self.show_next_question_request() @@ -498,11 +504,14 @@ def dispatch_keyboard_event(self, event: KeyEvent) -> None: if self._current_question_panel.is_multi_select: self._current_question_panel.toggle_select() else: - self._try_submit_question() + self._try_submit_question(method="space") case KeyEvent.ENTER: # "Other" is handled in keyboard_handler (async context) - self._try_submit_question() + self._try_submit_question(method="enter") case 
KeyEvent.ESCAPE: + from kimi_cli.telemetry import track + + track("question_dismissed") self._current_question_panel.request.resolve({}) self.show_next_question_request() case ( @@ -529,7 +538,7 @@ def dispatch_keyboard_event(self, event: KeyEvent) -> None: panel.toggle_select() elif not panel.is_other_selected: # Auto-submit for single-select (unless "Other") - self._try_submit_question() + self._try_submit_question(method="number_key") case _: pass self.refresh_soon() @@ -537,6 +546,9 @@ def dispatch_keyboard_event(self, event: KeyEvent) -> None: # handle ESC key to cancel the run if event == KeyEvent.ESCAPE and self._cancel_event is not None: + from kimi_cli.telemetry import track + + track("cancel") self._cancel_event.set() return diff --git a/src/kimi_cli/web/runner/worker.py b/src/kimi_cli/web/runner/worker.py index 2686fd9d9..807095bc9 100644 --- a/src/kimi_cli/web/runner/worker.py +++ b/src/kimi_cli/web/runner/worker.py @@ -51,14 +51,16 @@ async def run_worker(session_id: UUID) -> None: # Create KimiCLI instance with MCP configuration try: - kimi_cli = await KimiCLI.create(session, mcp_configs=mcp_configs or None, resumed=resumed) + kimi_cli = await KimiCLI.create( + session, mcp_configs=mcp_configs or None, resumed=resumed, ui_mode="wire" + ) except MCPConfigError as exc: logger.warning( "Invalid MCP config in {path}: {error}. 
Starting without MCP.", path=default_mcp_file, error=exc, ) - kimi_cli = await KimiCLI.create(session, mcp_configs=None, resumed=resumed) + kimi_cli = await KimiCLI.create(session, mcp_configs=None, resumed=resumed, ui_mode="wire") # Run in wire stdio mode await kimi_cli.run_wire_stdio() diff --git a/src/kimi_cli/wire/server.py b/src/kimi_cli/wire/server.py index 9fdce1a97..aa87e8602 100644 --- a/src/kimi_cli/wire/server.py +++ b/src/kimi_cli/wire/server.py @@ -601,6 +601,11 @@ def _sync_plan_mode_tool_visibility(self, toolset: KimiToolset) -> None: ) def _apply_wire_client_info(self, client: ClientInfo | None) -> None: + if client is not None: + from kimi_cli.telemetry import set_client_info + + set_client_info(name=client.name, version=client.version) + if not isinstance(self._soul, KimiSoul): return llm = self._soul.runtime.llm diff --git a/tests/core/test_config.py b/tests/core/test_config.py index ce0fff56c..a5c9e54bc 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -57,6 +57,7 @@ def test_default_config_dump(): "mcp": {"client": {"tool_call_timeout_ms": 60000}}, "hooks": [], "merge_all_available_skills": False, + "telemetry": True, } ) diff --git a/tests/hooks/test_engine.py b/tests/hooks/test_engine.py index ef6e161c1..a651a04e0 100644 --- a/tests/hooks/test_engine.py +++ b/tests/hooks/test_engine.py @@ -1,3 +1,5 @@ +from unittest.mock import patch + import pytest from kimi_cli.hooks.config import HookDef @@ -68,3 +70,15 @@ async def test_invalid_regex_skips_hook(): # Should not raise, just skip the hook with invalid regex results = await engine.trigger("PreToolUse", matcher_value="Shell", input_data={}) assert len(results) == 0 + + +@pytest.mark.asyncio +async def test_telemetry_failure_does_not_discard_block_result(engine): + """Safety-critical: a telemetry failure MUST NOT cause the hook engine + to fail open. For PreToolUse block, dropping results to [] silently + bypasses the block — that's exactly what this guard prevents. 
+ """ + with patch("kimi_cli.telemetry.track", side_effect=RuntimeError("telemetry broken")): + results = await engine.trigger("PreToolUse", matcher_value="ReadFile", input_data={}) + assert len(results) == 1 + assert results[0].action == "block" diff --git a/tests/telemetry/__init__.py b/tests/telemetry/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/telemetry/test_crash.py b/tests/telemetry/test_crash.py new file mode 100644 index 000000000..4a6b5aae6 --- /dev/null +++ b/tests/telemetry/test_crash.py @@ -0,0 +1,270 @@ +"""Tests for crash telemetry handlers (sys.excepthook + asyncio).""" + +from __future__ import annotations + +import asyncio +import sys +from typing import Any +from unittest.mock import MagicMock + +import click +import pytest + +import kimi_cli.telemetry as telemetry_mod +import kimi_cli.telemetry.crash as crash_mod +from kimi_cli.telemetry import attach_sink +from kimi_cli.telemetry.crash import ( + install_asyncio_handler, + install_crash_handlers, + set_phase, +) +from kimi_cli.telemetry.sink import EventSink + + +@pytest.fixture(autouse=True) +def _reset_state(): + """Reset telemetry + crash module state around each test.""" + # telemetry module + telemetry_mod._event_queue.clear() + telemetry_mod._device_id = None + telemetry_mod._session_id = None + telemetry_mod._client_info = None + telemetry_mod._sink = None + telemetry_mod._disabled = False + # crash module + original_excepthook = sys.excepthook + original_crash_excepthook_backup = crash_mod._original_excepthook + original_crash_asyncio_backup = crash_mod._original_asyncio_handler + original_phase = crash_mod._phase + yield + # restore + sys.excepthook = original_excepthook + crash_mod._original_excepthook = original_crash_excepthook_backup + crash_mod._original_asyncio_handler = original_crash_asyncio_backup + crash_mod._phase = original_phase + telemetry_mod._event_queue.clear() + telemetry_mod._device_id = None + telemetry_mod._session_id = None + 
telemetry_mod._client_info = None + telemetry_mod._sink = None + telemetry_mod._disabled = False + + +def _invoke_excepthook(exc: BaseException) -> None: + """Simulate Python calling sys.excepthook with a raised exception.""" + try: + raise exc + except BaseException as e: + sys.excepthook(type(e), e, e.__traceback__) + + +class TestExcepthook: + def test_runtime_error_produces_crash_event(self): + """RuntimeError triggers a crash event with correct fields.""" + install_crash_handlers() + set_phase("runtime") + + # Swallow the original handler's traceback output + crash_mod._original_excepthook = lambda *a, **kw: None + + _invoke_excepthook(RuntimeError("boom")) + + assert len(telemetry_mod._event_queue) == 1 + event = telemetry_mod._event_queue[0] + assert event["event"] == "crash" + assert event["properties"]["error_type"] == "RuntimeError" + assert event["properties"]["where"] == "runtime" + assert event["properties"]["source"] == "excepthook" + + def test_keyboard_interrupt_ignored(self): + """KeyboardInterrupt does not produce a crash event.""" + install_crash_handlers() + crash_mod._original_excepthook = lambda *a, **kw: None + + _invoke_excepthook(KeyboardInterrupt()) + + assert len(telemetry_mod._event_queue) == 0 + + def test_system_exit_ignored(self): + """SystemExit does not produce a crash event.""" + install_crash_handlers() + crash_mod._original_excepthook = lambda *a, **kw: None + + _invoke_excepthook(SystemExit(1)) + + assert len(telemetry_mod._event_queue) == 0 + + def test_click_usage_error_ignored(self): + """click.UsageError (user input error) does not produce a crash event.""" + install_crash_handlers() + crash_mod._original_excepthook = lambda *a, **kw: None + + _invoke_excepthook(click.UsageError("invalid option")) + + assert len(telemetry_mod._event_queue) == 0 + + def test_original_handler_preserved(self): + """Original excepthook is called even when we record the crash.""" + original_called: dict[str, Any] = {"called": False, "args": None} 
+ + def fake_original(exc_type, exc, tb): + original_called["called"] = True + original_called["args"] = (exc_type, exc, tb) + + sys.excepthook = fake_original + install_crash_handlers() + + _invoke_excepthook(ValueError("boom")) + + assert original_called["called"] is True + assert original_called["args"] is not None + exc_type, exc, _tb = original_called["args"] + assert exc_type is ValueError + assert str(exc) == "boom" + + def test_original_handler_called_for_ignored_exceptions(self): + """Ignored exceptions still get passed to the original handler (no event).""" + calls: list[type[BaseException]] = [] + sys.excepthook = lambda et, e, tb: calls.append(et) + install_crash_handlers() + + _invoke_excepthook(KeyboardInterrupt()) + _invoke_excepthook(SystemExit(1)) + + assert calls == [KeyboardInterrupt, SystemExit] + assert len(telemetry_mod._event_queue) == 0 + + def test_phase_reflected_in_event(self): + """The current phase is recorded in the crash event's `where` field.""" + install_crash_handlers() + crash_mod._original_excepthook = lambda *a, **kw: None + + set_phase("startup") + _invoke_excepthook(RuntimeError("early boom")) + + assert telemetry_mod._event_queue[0]["properties"]["where"] == "startup" + + +class TestAsyncioHandler: + @pytest.mark.asyncio + async def test_task_exception_produces_crash_event(self): + """Unhandled task exception is recorded with source=asyncio_task.""" + loop = asyncio.get_running_loop() + # Stub the default handler so pytest-asyncio doesn't fail the test + original_default = loop.default_exception_handler + loop.default_exception_handler = lambda ctx: None # type: ignore[method-assign] + try: + install_asyncio_handler(loop) + set_phase("runtime") + + loop.call_exception_handler( + {"message": "task failed", "exception": RuntimeError("async boom")} + ) + + assert len(telemetry_mod._event_queue) == 1 + event = telemetry_mod._event_queue[0] + assert event["event"] == "crash" + assert event["properties"]["error_type"] == 
"RuntimeError" + assert event["properties"]["where"] == "runtime" + assert event["properties"]["source"] == "asyncio_task" + finally: + loop.default_exception_handler = original_default # type: ignore[method-assign] + loop.set_exception_handler(None) + + @pytest.mark.asyncio + async def test_cancelled_error_ignored(self): + """asyncio.CancelledError does not produce a crash event.""" + loop = asyncio.get_running_loop() + original_default = loop.default_exception_handler + loop.default_exception_handler = lambda ctx: None # type: ignore[method-assign] + try: + install_asyncio_handler(loop) + + loop.call_exception_handler( + {"message": "cancelled", "exception": asyncio.CancelledError()} + ) + + assert len(telemetry_mod._event_queue) == 0 + finally: + loop.default_exception_handler = original_default # type: ignore[method-assign] + loop.set_exception_handler(None) + + @pytest.mark.asyncio + async def test_original_handler_preserved(self): + """An existing custom asyncio exception handler is still invoked.""" + captured: list[dict[str, Any]] = [] + + def custom_handler(loop: asyncio.AbstractEventLoop, ctx: dict[str, Any]) -> None: + captured.append(ctx) + + loop = asyncio.get_running_loop() + loop.set_exception_handler(custom_handler) + install_asyncio_handler(loop) + + try: + loop.call_exception_handler( + {"message": "task failed", "exception": RuntimeError("boom")} + ) + + assert len(captured) == 1 + assert isinstance(captured[0]["exception"], RuntimeError) + assert len(telemetry_mod._event_queue) == 1 + finally: + loop.set_exception_handler(None) + + @pytest.mark.asyncio + async def test_default_handler_used_when_no_original(self): + """When no prior handler was set, the default handler is invoked.""" + loop = asyncio.get_running_loop() + calls: list[dict[str, Any]] = [] + loop.default_exception_handler = lambda ctx: calls.append(ctx) # type: ignore[method-assign] + install_asyncio_handler(loop) + + try: + loop.call_exception_handler( + {"message": "task failed", 
"exception": RuntimeError("boom")} + ) + assert len(calls) == 1 + finally: + loop.set_exception_handler(None) + + +class TestCrashEventViaSink: + def test_crash_event_routed_through_sink_when_attached(self): + """A crash event recorded after sink attach goes to the sink.""" + mock_sink = MagicMock(spec=EventSink) + attach_sink(mock_sink) + install_crash_handlers() + crash_mod._original_excepthook = lambda *a, **kw: None + + _invoke_excepthook(RuntimeError("boom")) + + mock_sink.accept.assert_called_once() + event = mock_sink.accept.call_args[0][0] + assert event["event"] == "crash" + assert event["properties"]["source"] == "excepthook" + + +class TestPhase: + def test_set_phase_updates_value(self): + set_phase("runtime") + assert crash_mod._phase == "runtime" + set_phase("shutdown") + assert crash_mod._phase == "shutdown" + + +class TestIdempotentInstall: + def test_install_crash_handlers_is_idempotent(self): + """Calling install_crash_handlers twice does not double-wrap itself.""" + install_crash_handlers() + first = sys.excepthook + saved_original = crash_mod._original_excepthook + + install_crash_handlers() + + # Excepthook pointer unchanged + assert sys.excepthook is first + # And critically: _original_excepthook was NOT overwritten with + # our own hook (which would cause infinite recursion when invoked). + assert crash_mod._original_excepthook is saved_original + assert crash_mod._original_excepthook is not crash_mod._excepthook diff --git a/tests/telemetry/test_instrumentation.py b/tests/telemetry/test_instrumentation.py new file mode 100644 index 000000000..959821428 --- /dev/null +++ b/tests/telemetry/test_instrumentation.py @@ -0,0 +1,960 @@ +"""Tests for telemetry event behavior and schema correctness. + +These tests exercise the telemetry API directly and verify that calls to +track() and related helpers produce the expected event names, properties, +queue entries, and sink-forwarded payloads under the correct conditions. 
+They do NOT verify that specific production UI/soul call sites are still +instrumented — that coverage belongs in integration tests. +Transport/infrastructure tests are in test_telemetry.py. +""" + +from __future__ import annotations + +import time +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +import kimi_cli.telemetry as telemetry_mod +from kimi_cli.telemetry import attach_sink, disable, set_context, track +from kimi_cli.telemetry.sink import EventSink +from kimi_cli.telemetry.transport import AsyncTransport + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _reset_telemetry_state(): + """Reset telemetry module state before each test.""" + telemetry_mod._event_queue.clear() + telemetry_mod._device_id = None + telemetry_mod._session_id = None + telemetry_mod._client_info = None + telemetry_mod._sink = None + telemetry_mod._disabled = False + yield + telemetry_mod._event_queue.clear() + telemetry_mod._device_id = None + telemetry_mod._session_id = None + telemetry_mod._client_info = None + telemetry_mod._sink = None + telemetry_mod._disabled = False + + +def _collect_events() -> list[dict[str, Any]]: + """Return a snapshot of queued events.""" + return list(telemetry_mod._event_queue) + + +def _collect_sink_events(sink_mock: MagicMock) -> list[dict[str, Any]]: + """Extract events forwarded to a mock sink.""" + return [call[0][0] for call in sink_mock.accept.call_args_list] + + +# --------------------------------------------------------------------------- +# 1. 
Slash command counting correctness +# --------------------------------------------------------------------------- + + +class TestSlashCommandCounting: + """Verify that slash commands emit exactly one input_command event.""" + + def test_shell_slash_command_tracks_once(self): + """A shell-level slash command emits input_command with the command name.""" + # Simulate what _run_slash_command does: one track call + track("input_command", command="model") + events = _collect_events() + matching = [e for e in events if e["event"] == "input_command"] + assert len(matching) == 1 + assert matching[0]["properties"]["command"] == "model" + + def test_soul_slash_command_tracks_once(self): + """A soul-level slash command emits input_command (not double-counted).""" + # Soul-level commands are tracked at the shell layer before dispatch + track("input_command", command="compact") + events = _collect_events() + matching = [e for e in events if e["event"] == "input_command"] + assert len(matching) == 1 + assert matching[0]["properties"]["command"] == "compact" + + def test_invalid_command_tracks_separate_event(self): + """Invalid slash commands emit input_command_invalid, not input_command.""" + track("input_command_invalid") + events = _collect_events() + assert any(e["event"] == "input_command_invalid" for e in events) + assert not any(e["event"] == "input_command" for e in events) + + def test_no_double_counting_shell_and_soul(self): + """Shell and soul layers must not both emit for the same command invocation.""" + # Simulate: only one track call per command execution path + track("input_command", command="yolo") + events = _collect_events() + cmd_events = [e for e in events if e["event"] == "input_command"] + assert len(cmd_events) == 1 + + def test_command_property_is_string_enum(self): + """Command property must be a string (enum-like), not an int or bool.""" + track("input_command", command="clear") + event = _collect_events()[-1] + assert 
isinstance(event["properties"]["command"], str) + + +# --------------------------------------------------------------------------- +# 2. Tool approval path completeness +# --------------------------------------------------------------------------- + + +class TestToolApprovalPaths: + """Every approval path must emit exactly one of the two tool tracking events.""" + + def test_manual_approve(self): + """User clicking approve emits tool_approved with approval_mode=manual.""" + track("tool_approved", tool_name="Bash", approval_mode="manual") + events = _collect_events() + assert events[-1]["event"] == "tool_approved" + assert events[-1]["properties"]["tool_name"] == "Bash" + assert events[-1]["properties"]["approval_mode"] == "manual" + + def test_approve_for_session(self): + """'Approve for session' emits tool_approved with approval_mode=manual.""" + track("tool_approved", tool_name="WriteFile", approval_mode="manual") + events = _collect_events() + assert events[-1]["event"] == "tool_approved" + assert events[-1]["properties"]["approval_mode"] == "manual" + + def test_yolo_approve(self): + """Yolo auto-approval emits tool_approved with approval_mode=yolo.""" + track("tool_approved", tool_name="Bash", approval_mode="yolo") + event = _collect_events()[-1] + assert event["properties"]["approval_mode"] == "yolo" + + def test_auto_session_approve(self): + """Session-cached auto-approval emits approval_mode=auto_session.""" + track("tool_approved", tool_name="ReadFile", approval_mode="auto_session") + event = _collect_events()[-1] + assert event["properties"]["approval_mode"] == "auto_session" + + def test_manual_reject(self): + """User clicking reject emits tool_rejected with approval_mode=manual.""" + track("tool_rejected", tool_name="Bash", approval_mode="manual") + events = _collect_events() + assert events[-1]["event"] == "tool_rejected" + assert events[-1]["properties"]["tool_name"] == "Bash" + assert events[-1]["properties"]["approval_mode"] == "manual" + + def 
test_cancelled_approval(self): + """ApprovalCancelledError (e.g. Esc) emits tool_rejected with approval_mode=cancelled.""" + track("tool_rejected", tool_name="Bash", approval_mode="cancelled") + events = _collect_events() + assert events[-1]["event"] == "tool_rejected" + assert events[-1]["properties"]["approval_mode"] == "cancelled" + + def test_approval_events_are_mutually_exclusive(self): + """Each approval path emits exactly one event — they never overlap.""" + track("tool_approved", tool_name="Bash") + events = _collect_events() + approval_events = [e for e in events if e["event"] in ("tool_approved", "tool_rejected")] + assert len(approval_events) == 1 + + def test_tool_name_always_present(self): + """All tool approval events include tool_name.""" + for event_name in ("tool_approved", "tool_rejected"): + telemetry_mod._event_queue.clear() + track(event_name, tool_name="SomeTool") + event = _collect_events()[-1] + assert "tool_name" in event["properties"], f"{event_name} missing tool_name" + + +# --------------------------------------------------------------------------- +# 3. API error classification +# --------------------------------------------------------------------------- + + +class TestAPIErrorClassification: + """Verify the error_type mapping in api_error events. + + Tests call the real classifier function, so any drift in the production + mapping shows up here. 
+ """ + + def _mk_status_error(self, status: int, message: str = ""): + from kosong.chat_provider import APIStatusError + + exc = APIStatusError.__new__(APIStatusError) + exc.status_code = status + exc.args = (message,) if message else () + return exc + + def test_429_maps_to_rate_limit(self): + from kimi_cli.soul.kimisoul import classify_api_error + + et, sc = classify_api_error(self._mk_status_error(429)) + assert et == "rate_limit" + assert sc == 429 + + def test_401_maps_to_auth(self): + from kimi_cli.soul.kimisoul import classify_api_error + + et, sc = classify_api_error(self._mk_status_error(401)) + assert et == "auth" + assert sc == 401 + + def test_403_maps_to_auth(self): + from kimi_cli.soul.kimisoul import classify_api_error + + et, _ = classify_api_error(self._mk_status_error(403)) + assert et == "auth" + + def test_500_maps_to_5xx_server(self): + from kimi_cli.soul.kimisoul import classify_api_error + + et, sc = classify_api_error(self._mk_status_error(500)) + assert et == "5xx_server" + assert sc == 500 + + def test_502_maps_to_5xx_server(self): + from kimi_cli.soul.kimisoul import classify_api_error + + et, _ = classify_api_error(self._mk_status_error(502)) + assert et == "5xx_server" + + def test_400_maps_to_4xx_client(self): + from kimi_cli.soul.kimisoul import classify_api_error + + et, sc = classify_api_error(self._mk_status_error(400)) + assert et == "4xx_client" + assert sc == 400 + + def test_422_maps_to_4xx_client(self): + from kimi_cli.soul.kimisoul import classify_api_error + + et, _ = classify_api_error(self._mk_status_error(422)) + assert et == "4xx_client" + + def test_400_with_context_length_maps_to_context_overflow(self): + from kimi_cli.soul.kimisoul import classify_api_error + + et, _ = classify_api_error(self._mk_status_error(400, "Context length exceeded")) + assert et == "context_overflow" + + def test_400_with_max_tokens_maps_to_context_overflow(self): + from kimi_cli.soul.kimisoul import classify_api_error + + et, _ = 
classify_api_error(self._mk_status_error(400, "Exceeded max tokens")) + assert et == "context_overflow" + + def test_400_with_maximum_context_maps_to_context_overflow(self): + from kimi_cli.soul.kimisoul import classify_api_error + + et, _ = classify_api_error(self._mk_status_error(422, "Maximum context window exceeded")) + assert et == "context_overflow" + + def test_connection_error_maps_to_network(self): + from kosong.chat_provider import APIConnectionError + + from kimi_cli.soul.kimisoul import classify_api_error + + et, sc = classify_api_error(APIConnectionError.__new__(APIConnectionError)) + assert et == "network" + assert sc is None + + def test_api_timeout_maps_to_timeout(self): + from kosong.chat_provider import APITimeoutError + + from kimi_cli.soul.kimisoul import classify_api_error + + et, _ = classify_api_error(APITimeoutError.__new__(APITimeoutError)) + assert et == "timeout" + + def test_builtin_timeout_maps_to_timeout(self): + from kimi_cli.soul.kimisoul import classify_api_error + + et, _ = classify_api_error(TimeoutError("timed out")) + assert et == "timeout" + + def test_empty_response_maps_to_empty_response(self): + from kosong.chat_provider import APIEmptyResponseError + + from kimi_cli.soul.kimisoul import classify_api_error + + et, sc = classify_api_error(APIEmptyResponseError.__new__(APIEmptyResponseError)) + assert et == "empty_response" + assert sc is None + + def test_generic_exception_maps_to_other(self): + from kimi_cli.soul.kimisoul import classify_api_error + + et, sc = classify_api_error(RuntimeError("unexpected")) + assert et == "other" + assert sc is None + + def test_status_code_is_none_for_non_http_errors(self): + """Only APIStatusError should produce a non-None status_code.""" + from kimi_cli.soul.kimisoul import classify_api_error + + _, sc = classify_api_error(RuntimeError("other")) + assert sc is None + + def test_classification_emits_correct_track_call(self): + """The classified error_type is passed as a string property.""" 
+ track("api_error", error_type="rate_limit") + event = _collect_events()[-1] + assert event["event"] == "api_error" + assert event["properties"]["error_type"] == "rate_limit" + assert isinstance(event["properties"]["error_type"], str) + + def test_api_error_with_status_code_field(self): + """When status_code is available it is included in the event properties.""" + track("api_error", error_type="5xx_server", status_code=503) + event = _collect_events()[-1] + assert event["properties"]["status_code"] == 503 + assert isinstance(event["properties"]["status_code"], int) + + +# --------------------------------------------------------------------------- +# 4. Cancel / interrupt correctness +# --------------------------------------------------------------------------- + + +class TestCancelInterrupt: + """Verify cancel and interrupt events.""" + + def test_esc_emits_cancel(self): + """Pressing Esc during streaming emits cancel.""" + track("cancel") + events = _collect_events() + assert events[-1]["event"] == "cancel" + + def test_esc_in_question_panel_emits_dismissed(self): + """Pressing Esc on question panel emits question_dismissed, not cancel.""" + track("question_dismissed") + events = _collect_events() + assert events[-1]["event"] == "question_dismissed" + assert not any(e["event"] == "cancel" for e in events) + + def test_run_cancelled_emits_turn_interrupted(self): + """RunCancelled exception emits turn_interrupted with at_step.""" + track("turn_interrupted", at_step=3) + event = _collect_events()[-1] + assert event["event"] == "turn_interrupted" + assert event["properties"]["at_step"] == 3 + + def test_turn_interrupted_at_step_is_int(self): + """at_step property must be an integer.""" + track("turn_interrupted", at_step=0) + event = _collect_events()[-1] + assert isinstance(event["properties"]["at_step"], int) + + def test_cancel_and_dismissed_are_distinct(self): + """cancel and question_dismissed are different events.""" + track("cancel") + 
track("question_dismissed") + events = _collect_events() + event_names = [e["event"] for e in events] + assert "cancel" in event_names + assert "question_dismissed" in event_names + + +# --------------------------------------------------------------------------- +# 5. Core infrastructure edge cases +# --------------------------------------------------------------------------- + + +class TestInfrastructureEdgeCases: + """Tests for telemetry infrastructure behavior under edge conditions.""" + + def test_disabled_track_is_noop(self): + """After disable(), track() is a silent no-op.""" + disable() + track("should_be_dropped") + assert len(telemetry_mod._event_queue) == 0 + + def test_disabled_with_sink_clears_buffer(self): + """disable() clears both queue and sink buffer.""" + mock_sink = MagicMock(spec=EventSink) + attach_sink(mock_sink) + track("event_before") + disable() + mock_sink.clear_buffer.assert_called_once() + + def test_flush_sync_empty_buffer_is_noop(self): + """flush_sync with empty buffer does not call transport.""" + transport = MagicMock(spec=AsyncTransport) + sink = EventSink(transport, version="1.0.0") + sink.flush_sync() + transport.save_to_disk.assert_not_called() + + def test_flush_sync_writes_to_disk(self): + """flush_sync (atexit) saves events to disk, not HTTP.""" + transport = MagicMock(spec=AsyncTransport) + sink = EventSink(transport, version="1.0.0") + sink.accept({"event": "test", "timestamp": 1.0, "properties": {}}) + sink.flush_sync() + transport.save_to_disk.assert_called_once() + events = transport.save_to_disk.call_args[0][0] + assert len(events) == 1 + + @pytest.mark.asyncio + async def test_transport_send_falls_back_to_disk_on_transient_error(self): + """Transient HTTP errors trigger disk fallback via send().""" + from kimi_cli.telemetry.transport import _TransientError + + transport = AsyncTransport(endpoint="https://mock.test/events", retry_backoffs_s=()) + with ( + patch.object( + transport, "_send_http", new_callable=AsyncMock, 
side_effect=_TransientError("503") + ), + patch.object(transport, "save_to_disk") as mock_save, + ): + await transport.send([{"event": "test", "timestamp": 1.0}]) + mock_save.assert_called_once() + saved_events = mock_save.call_args[0][0] + assert len(saved_events) == 1 + assert saved_events[0]["event"] == "test" + + def test_queue_overflow_preserves_newest(self): + """When queue overflows, oldest events are dropped, newest kept.""" + for i in range(telemetry_mod._MAX_QUEUE_SIZE + 50): + track(f"evt_{i}") + events = _collect_events() + assert len(events) == telemetry_mod._MAX_QUEUE_SIZE + # Newest event should be last + assert events[-1]["event"] == f"evt_{telemetry_mod._MAX_QUEUE_SIZE + 49}" + # Oldest surviving event + assert events[0]["event"] == "evt_50" + + @pytest.mark.asyncio + async def test_disk_file_expiry(self, tmp_path: Path): + """Files older than 7 days are deleted without retry.""" + import os + + failed_file = tmp_path / "failed_old.jsonl" + failed_file.write_text('{"event":"old","timestamp":1.0}\n') + old_time = time.time() - 8 * 24 * 3600 + os.utime(failed_file, (old_time, old_time)) + + transport = AsyncTransport(endpoint="https://mock.test/events") + with ( + patch("kimi_cli.telemetry.transport._telemetry_dir", return_value=tmp_path), + patch.object(transport, "_send_http", new_callable=AsyncMock) as mock_send, + ): + await transport.retry_disk_events() + mock_send.assert_not_called() + assert not failed_file.exists() + + +# --------------------------------------------------------------------------- +# 6. 
Specific event property correctness +# --------------------------------------------------------------------------- + + +class TestEventPropertyCorrectness: + """Verify specific events carry the right property types and values.""" + + def test_yolo_toggle_enabled_bool(self): + """yolo_toggle.enabled is a bool.""" + track("yolo_toggle", enabled=True) + event = _collect_events()[-1] + assert isinstance(event["properties"]["enabled"], bool) + assert event["properties"]["enabled"] is True + + telemetry_mod._event_queue.clear() + track("yolo_toggle", enabled=False) + event = _collect_events()[-1] + assert event["properties"]["enabled"] is False + + def test_shortcut_mode_switch_to_mode(self): + """shortcut_mode_switch.to_mode is a string enum.""" + track("shortcut_mode_switch", to_mode="agent") + event = _collect_events()[-1] + assert event["properties"]["to_mode"] == "agent" + assert isinstance(event["properties"]["to_mode"], str) + + def test_question_answered_method_enum(self): + """question_answered.method is a string enum.""" + for method in ("number_key", "enter", "escape"): + telemetry_mod._event_queue.clear() + track("question_answered", method=method) + event = _collect_events()[-1] + assert event["properties"]["method"] == method + + def test_tool_error_has_tool_name_and_error_type(self): + """tool_error includes tool_name and error_type (Python exception class name).""" + track("tool_error", tool_name="Bash", error_type="RuntimeError") + event = _collect_events()[-1] + assert event["event"] == "tool_error" + assert event["properties"]["tool_name"] == "Bash" + assert event["properties"]["error_type"] == "RuntimeError" + + def test_tool_call_success_has_no_error_type(self): + """tool_call success path: tool_name + success=True + duration_ms, no error_type.""" + track("tool_call", tool_name="ReadFile", success=True, duration_ms=123) + event = _collect_events()[-1] + assert event["event"] == "tool_call" + assert event["properties"]["tool_name"] == "ReadFile" + 
assert event["properties"]["success"] is True + assert event["properties"]["duration_ms"] == 123 + assert isinstance(event["properties"]["duration_ms"], int) + assert "error_type" not in event["properties"] + + def test_tool_call_failure_has_error_type(self): + """tool_call failure path includes error_type from Python exception name.""" + track( + "tool_call", + tool_name="Bash", + success=False, + duration_ms=42, + error_type="TimeoutError", + ) + event = _collect_events()[-1] + assert event["properties"]["success"] is False + assert event["properties"]["error_type"] == "TimeoutError" + + def test_oauth_refresh_success_has_no_reason(self): + """oauth_refresh success: only success=True, no reason field.""" + track("oauth_refresh", success=True) + event = _collect_events()[-1] + assert event["properties"]["success"] is True + assert "reason" not in event["properties"] + + def test_oauth_refresh_unauthorized_has_reason(self): + """OAuthUnauthorized path: success=False + reason=unauthorized.""" + track("oauth_refresh", success=False, reason="unauthorized") + event = _collect_events()[-1] + assert event["properties"]["success"] is False + assert event["properties"]["reason"] == "unauthorized" + + def test_oauth_refresh_generic_failure_has_reason(self): + """Generic Exception path: success=False + reason=network_or_other.""" + track("oauth_refresh", success=False, reason="network_or_other") + event = _collect_events()[-1] + assert event["properties"]["reason"] == "network_or_other" + + def test_mcp_connected_has_total_count(self): + """mcp_connected has server_count and total_count.""" + track("mcp_connected", server_count=2, total_count=3) + event = _collect_events()[-1] + assert event["properties"]["server_count"] == 2 + assert event["properties"]["total_count"] == 3 + + def test_mcp_failed_has_failed_count(self): + """mcp_failed has failed_count and total_count.""" + track("mcp_failed", failed_count=1, total_count=3) + event = _collect_events()[-1] + assert 
event["properties"]["failed_count"] == 1 + assert event["properties"]["total_count"] == 3 + + def test_session_load_failed_has_reason(self): + """session_load_failed includes the Python exception class name as reason.""" + track("session_load_failed", reason="JSONDecodeError") + event = _collect_events()[-1] + assert event["event"] == "session_load_failed" + assert event["properties"]["reason"] == "JSONDecodeError" + assert isinstance(event["properties"]["reason"], str) + + def test_exit_event_has_duration(self): + """exit includes duration_s (float).""" + track("exit", duration_s=123.456) + event = _collect_events()[-1] + assert isinstance(event["properties"]["duration_s"], float) + + def test_startup_perf_has_four_phase_timings(self): + """startup_perf has duration_ms + config_ms + init_ms + mcp_ms (all int).""" + track( + "startup_perf", + duration_ms=342, + config_ms=42, + init_ms=100, + mcp_ms=180, + ) + event = _collect_events()[-1] + for field in ("duration_ms", "config_ms", "init_ms", "mcp_ms"): + assert field in event["properties"], f"missing {field}" + assert isinstance(event["properties"][field], int) + + def test_model_switch_has_model_string(self): + """model_switch.model is a string.""" + track("model_switch", model="kimi-k2.5") + event = _collect_events()[-1] + assert event["properties"]["model"] == "kimi-k2.5" + + def test_hook_triggered_properties(self): + """hook_triggered has event_type and action.""" + track("hook_triggered", event_type="PreToolUse", action="block") + event = _collect_events()[-1] + assert event["properties"]["event_type"] == "PreToolUse" + assert event["properties"]["action"] == "block" + + def test_started_event_has_yolo(self): + """started includes resumed (bool) and yolo (bool).""" + track("started", resumed=False, yolo=True) + event = _collect_events()[-1] + assert event["event"] == "started" + assert event["properties"]["resumed"] is False + assert event["properties"]["yolo"] is True + + def 
test_background_task_completed_success_no_reason(self): + """Success path: no `reason` field.""" + track("background_task_completed", success=True, duration_s=45.2) + event = _collect_events()[-1] + assert event["properties"]["success"] is True + assert isinstance(event["properties"]["duration_s"], float) + assert "reason" not in event["properties"] + + def test_background_task_completed_failure_reason_error(self): + """_mark_task_failed emits reason='error'.""" + track( + "background_task_completed", + success=False, + duration_s=10.0, + reason="error", + ) + event = _collect_events()[-1] + assert event["properties"]["reason"] == "error" + + def test_background_task_completed_failure_reason_timeout(self): + """_mark_task_timed_out emits reason='timeout'.""" + track( + "background_task_completed", + success=False, + duration_s=300.0, + reason="timeout", + ) + event = _collect_events()[-1] + assert event["properties"]["reason"] == "timeout" + + def test_background_task_completed_failure_reason_killed(self): + """_mark_task_killed emits reason='killed'.""" + track( + "background_task_completed", + success=False, + duration_s=5.0, + reason="killed", + ) + event = _collect_events()[-1] + assert event["properties"]["reason"] == "killed" + + def test_background_task_no_event_without_start_time(self): + """_mark_task_completed must NOT emit track when started_at is None.""" + from kimi_cli.background.manager import BackgroundTaskManager + from kimi_cli.background.models import TaskRuntime + + runtime = TaskRuntime(status="running", started_at=None) + mock_store = MagicMock() + mock_store.read_runtime.return_value = runtime + + manager = object.__new__(BackgroundTaskManager) + manager._store = mock_store + + with patch("kimi_cli.telemetry.track") as mock_track: + manager._mark_task_completed("task-no-start") + + mock_track.assert_not_called() + + def test_mark_task_killed_emits_completed_event(self): + """_mark_task_killed must emit 
background_task_completed(success=False).""" + from kimi_cli.background.manager import BackgroundTaskManager + from kimi_cli.background.models import TaskRuntime + + runtime = TaskRuntime(status="running", started_at=1000.0) + + mock_store = MagicMock() + mock_store.read_runtime.return_value = runtime + + manager = object.__new__(BackgroundTaskManager) + manager._store = mock_store + + with patch("kimi_cli.telemetry.track") as mock_track: + manager._mark_task_killed("task-123", "Killed by user") + + mock_track.assert_called_once() + call_args = mock_track.call_args + assert call_args[0][0] == "background_task_completed" + assert call_args[1]["success"] is False + assert "duration_s" in call_args[1] + + def test_mark_task_killed_no_event_without_start_time(self): + """_mark_task_killed must NOT emit track when started_at is None.""" + from kimi_cli.background.manager import BackgroundTaskManager + from kimi_cli.background.models import TaskRuntime + + runtime = TaskRuntime(status="running", started_at=None) + mock_store = MagicMock() + mock_store.read_runtime.return_value = runtime + + manager = object.__new__(BackgroundTaskManager) + manager._store = mock_store + + with patch("kimi_cli.telemetry.track") as mock_track: + manager._mark_task_killed("task-no-start", "Killed by user") + + mock_track.assert_not_called() + + def test_timestamp_is_recent(self): + """All events get a timestamp close to now.""" + before = time.time() + track("test") + after = time.time() + event = _collect_events()[-1] + assert before <= event["timestamp"] <= after + + +# --------------------------------------------------------------------------- +# 7. 
Context enrichment +# --------------------------------------------------------------------------- + + +class TestContextEnrichment: + """Verify EventSink enriches events correctly.""" + + def test_enrichment_adds_version_platform(self): + """Enriched events include version and platform.""" + transport = MagicMock(spec=AsyncTransport) + sink = EventSink(transport, version="2.0.0", model="test-model") + sink.accept({"event": "test", "timestamp": 1.0, "properties": {}}) + sink.flush_sync() + buffered = transport.save_to_disk.call_args[0][0][0] + assert buffered["context"]["version"] == "2.0.0" + assert buffered["context"]["model"] == "test-model" + assert "platform" in buffered["context"] + assert "arch" in buffered["context"] + + def test_enrichment_does_not_mutate_input(self): + """accept() must not mutate the caller's dict.""" + transport = MagicMock(spec=AsyncTransport) + sink = EventSink(transport, version="1.0.0") + original = {"event": "test", "timestamp": 1.0, "properties": {}} + sink.accept(original) + assert "context" not in original + + def test_model_set_at_init(self): + """Model passed at init appears in enriched context.""" + transport = MagicMock(spec=AsyncTransport) + sink = EventSink(transport, version="1.0.0", model="test-model") + sink.accept({"event": "test", "timestamp": 1.0, "properties": {}}) + sink.flush_sync() + buffered = transport.save_to_disk.call_args[0][0][0] + assert buffered["context"]["model"] == "test-model" + + def test_device_and_session_ids_propagate(self): + """device_id and session_id set via set_context() appear in events.""" + set_context(device_id="dev-abc", session_id="sess-xyz") + track("test_event") + event = _collect_events()[-1] + assert event["device_id"] == "dev-abc" + assert event["session_id"] == "sess-xyz" + + def test_enrichment_adds_runtime_python(self): + """context.runtime is always 'python' for the Python CLI.""" + transport = MagicMock(spec=AsyncTransport) + sink = EventSink(transport, version="1.0.0") + 
sink.accept({"event": "test", "timestamp": 1.0, "properties": {}}) + sink.flush_sync() + buffered = transport.save_to_disk.call_args[0][0][0] + assert buffered["context"]["runtime"] == "python" + + +# --------------------------------------------------------------------------- +# 7b. Client info (wire/acp) +# --------------------------------------------------------------------------- + + +class TestClientInfo: + """Verify set_client_info and its propagation through sink enrichment.""" + + def _make_sink(self) -> tuple[EventSink, MagicMock]: + transport = MagicMock(spec=AsyncTransport) + sink = EventSink(transport, version="1.0.0") + return sink, transport + + def _enrich(self, sink: EventSink, transport: MagicMock) -> dict[str, Any]: + sink.accept({"event": "t", "timestamp": 1.0, "properties": {}}) + sink.flush_sync() + return transport.save_to_disk.call_args[0][0][0] + + def test_no_client_info_means_fields_absent(self): + """If set_client_info is never called, context has neither field.""" + sink, transport = self._make_sink() + enriched = self._enrich(sink, transport) + assert "client_name" not in enriched["context"] + assert "client_version" not in enriched["context"] + + def test_set_client_info_populates_both_fields(self): + """set_client_info with name+version injects both into context.""" + from kimi_cli.telemetry import set_client_info + + set_client_info(name="vscode", version="1.90.0") + sink, transport = self._make_sink() + enriched = self._enrich(sink, transport) + assert enriched["context"]["client_name"] == "vscode" + assert enriched["context"]["client_version"] == "1.90.0" + + def test_set_client_info_name_only_omits_version(self): + """When version is None, client_version key is not added.""" + from kimi_cli.telemetry import set_client_info + + set_client_info(name="zed") + sink, transport = self._make_sink() + enriched = self._enrich(sink, transport) + assert enriched["context"]["client_name"] == "zed" + assert "client_version" not in 
enriched["context"] + + def test_set_client_info_empty_name_is_ignored(self): + """Empty string name must not overwrite any previously set info.""" + from kimi_cli.telemetry import set_client_info + + set_client_info(name="cursor", version="0.40.0") + set_client_info(name="", version="anything") + sink, transport = self._make_sink() + enriched = self._enrich(sink, transport) + # Previous value preserved + assert enriched["context"]["client_name"] == "cursor" + assert enriched["context"]["client_version"] == "0.40.0" + + def test_set_client_info_overwrites_previous(self): + """Non-empty set_client_info replaces the tuple atomically.""" + from kimi_cli.telemetry import set_client_info + + set_client_info(name="vscode", version="1.90.0") + set_client_info(name="zed", version="0.180.0") + sink, transport = self._make_sink() + enriched = self._enrich(sink, transport) + assert enriched["context"]["client_name"] == "zed" + assert enriched["context"]["client_version"] == "0.180.0" + + def test_client_info_stored_as_tuple(self): + """_client_info is stored as a tuple so readers never see a half-update.""" + from kimi_cli.telemetry import set_client_info + + set_client_info(name="kimi-web", version="2.0.0") + assert telemetry_mod._client_info == ("kimi-web", "2.0.0") + + def test_values_pass_through_verbatim(self): + """No sanitization: values should reach the backend unchanged.""" + from kimi_cli.telemetry import set_client_info + + weird_name = "VS Code\n(Insiders)" + weird_version = "1.90.0-beta\ttest" + set_client_info(name=weird_name, version=weird_version) + sink, transport = self._make_sink() + enriched = self._enrich(sink, transport) + assert enriched["context"]["client_name"] == weird_name + assert enriched["context"]["client_version"] == weird_version + + +# --------------------------------------------------------------------------- +# 7c. 
Compaction tracking (exercises real compact_context branches) +# --------------------------------------------------------------------------- + + +class TestCompactionTracking: + """compaction_triggered must fire on both success and failure paths.""" + + def _make_soul(self, *, before_tokens: int, estimated_after: int) -> Any: + """Construct a minimal KimiSoul stub bypassing __init__.""" + from kimi_cli.soul.kimisoul import KimiSoul + + soul = object.__new__(KimiSoul) + + runtime = MagicMock() + runtime.llm = MagicMock() # non-None so LLMNotSet is not raised + runtime.session.id = "test-session" + runtime.role = "non-root" # skip active-task-snapshot branch + runtime.background_tasks = MagicMock() + soul._runtime = runtime + + ctx = MagicMock() + ctx.token_count = before_tokens + ctx.history = [] + ctx.clear = AsyncMock() + ctx.write_system_prompt = AsyncMock() + ctx.append_message = AsyncMock() + ctx.update_token_count = AsyncMock() + soul._context = ctx + + soul._hook_engine = MagicMock() + soul._hook_engine.trigger = AsyncMock() + + soul._compaction = MagicMock() + + soul._agent = MagicMock() + soul._agent.system_prompt = "sys" + + loop_control = MagicMock() + loop_control.max_retries_per_step = 1 + soul._loop_control = loop_control + + soul._checkpoint = AsyncMock() + + # _run_with_connection_recovery returns a value with .messages and + # .estimated_token_count — shape it with MagicMock to avoid depending + # on the internal NamedTuple layout. 
+ fake_result = MagicMock() + fake_result.messages = [] + fake_result.estimated_token_count = estimated_after + soul._run_with_connection_recovery = AsyncMock(return_value=fake_result) + return soul + + @pytest.mark.asyncio + async def test_auto_compaction_success_emits_event(self): + """Auto-triggered success: track has trigger_type=auto + after_tokens + success=True.""" + soul = self._make_soul(before_tokens=12000, estimated_after=3000) + + with ( + patch("kimi_cli.soul.kimisoul.wire_send"), + patch("kimi_cli.telemetry.track") as mock_track, + ): + await soul.compact_context() + + # Filter to the compaction event — other events (hook triggers etc.) + # shouldn't go through telemetry.track. + calls = [c for c in mock_track.call_args_list if c[0][0] == "compaction_triggered"] + assert len(calls) == 1 + args, kwargs = calls[0] + assert args[0] == "compaction_triggered" + assert kwargs["trigger_type"] == "auto" + assert kwargs["before_tokens"] == 12000 + assert kwargs["after_tokens"] == 3000 + assert kwargs["success"] is True + + @pytest.mark.asyncio + async def test_manual_compaction_success_emits_event(self): + """/compact with instruction yields trigger_type=manual.""" + soul = self._make_soul(before_tokens=8000, estimated_after=2000) + + with ( + patch("kimi_cli.soul.kimisoul.wire_send"), + patch("kimi_cli.telemetry.track") as mock_track, + ): + await soul.compact_context(custom_instruction="focus on auth") + + calls = [c for c in mock_track.call_args_list if c[0][0] == "compaction_triggered"] + assert len(calls) == 1 + assert calls[0][1]["trigger_type"] == "manual" + assert calls[0][1]["success"] is True + + @pytest.mark.asyncio + async def test_compaction_failure_emits_event_then_reraises(self): + """On compaction failure: track success=False (no after_tokens), then re-raise.""" + soul = self._make_soul(before_tokens=50000, estimated_after=0) + # Force the compaction to fail with a non-retryable error + soul._run_with_connection_recovery = 
AsyncMock(side_effect=RuntimeError("compaction boom")) + + with ( + patch("kimi_cli.soul.kimisoul.wire_send"), + patch("kimi_cli.telemetry.track") as mock_track, + pytest.raises(RuntimeError, match="compaction boom"), + ): + await soul.compact_context() + + calls = [c for c in mock_track.call_args_list if c[0][0] == "compaction_triggered"] + assert len(calls) == 1 + kwargs = calls[0][1] + assert kwargs["trigger_type"] == "auto" + assert kwargs["before_tokens"] == 50000 + assert kwargs["success"] is False + assert "after_tokens" not in kwargs diff --git a/tests/telemetry/test_telemetry.py b/tests/telemetry/test_telemetry.py new file mode 100644 index 000000000..bf50da0ec --- /dev/null +++ b/tests/telemetry/test_telemetry.py @@ -0,0 +1,1009 @@ +"""Tests for the telemetry system.""" + +from __future__ import annotations + +import asyncio +import json +import time +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +import kimi_cli.telemetry as telemetry_mod +from kimi_cli.telemetry import attach_sink, disable, set_context, track +from kimi_cli.telemetry.sink import EventSink +from kimi_cli.telemetry.transport import AsyncTransport + + +@pytest.fixture(autouse=True) +def _reset_telemetry_state(): + """Reset telemetry module state before each test.""" + telemetry_mod._event_queue.clear() + telemetry_mod._device_id = None + telemetry_mod._session_id = None + telemetry_mod._client_info = None + telemetry_mod._sink = None + telemetry_mod._disabled = False + yield + telemetry_mod._event_queue.clear() + telemetry_mod._device_id = None + telemetry_mod._session_id = None + telemetry_mod._client_info = None + telemetry_mod._sink = None + telemetry_mod._disabled = False + + +class TestTrack: + def test_track_queues_event_before_sink(self): + """Events are queued in memory before sink is attached.""" + track("test_event", foo=True, bar=42) + assert len(telemetry_mod._event_queue) == 1 + event = 
telemetry_mod._event_queue[0] + assert event["event"] == "test_event" + assert event["properties"] == {"foo": True, "bar": 42} + assert event["timestamp"] > 0 + + def test_track_includes_context_ids(self): + """Events include device_id and session_id.""" + set_context(device_id="dev123", session_id="sess456") + track("test_event") + event = telemetry_mod._event_queue[0] + assert event["device_id"] == "dev123" + assert event["session_id"] == "sess456" + + def test_track_forwards_to_sink(self): + """Events are forwarded to sink when attached.""" + mock_sink = MagicMock(spec=EventSink) + attach_sink(mock_sink) + track("test_event", key=1) + mock_sink.accept.assert_called_once() + event = mock_sink.accept.call_args[0][0] + assert event["event"] == "test_event" + assert event["properties"] == {"key": 1} + + def test_track_disabled_drops_events(self): + """Events are silently dropped when disabled.""" + disable() + track("test_event") + assert len(telemetry_mod._event_queue) == 0 + + def test_attach_sink_drains_queue(self): + """Attaching sink drains queued events.""" + track("event1") + track("event2") + assert len(telemetry_mod._event_queue) == 2 + + mock_sink = MagicMock(spec=EventSink) + attach_sink(mock_sink) + assert len(telemetry_mod._event_queue) == 0 + assert mock_sink.accept.call_count == 2 + + def test_track_empty_properties(self): + """Events with no properties have empty dict.""" + track("test_event") + event = telemetry_mod._event_queue[0] + assert event["properties"] == {} + + def test_track_string_properties(self): + """String properties are allowed for enum-like values.""" + track("test_event", command="model", mode="agent") + event = telemetry_mod._event_queue[0] + assert event["properties"]["command"] == "model" + assert event["properties"]["mode"] == "agent" + + def test_queue_max_size(self): + """Queue drops oldest events when exceeding MAX_QUEUE_SIZE.""" + for i in range(telemetry_mod._MAX_QUEUE_SIZE + 100): + track(f"event_{i}") + assert 
len(telemetry_mod._event_queue) == telemetry_mod._MAX_QUEUE_SIZE + # Oldest events should be dropped; newest should remain + assert ( + telemetry_mod._event_queue[-1]["event"] == f"event_{telemetry_mod._MAX_QUEUE_SIZE + 99}" + ) + assert telemetry_mod._event_queue[0]["event"] == "event_100" + + def test_disable_clears_sink_buffer(self): + """Disabling telemetry clears the sink buffer.""" + mock_sink = MagicMock(spec=EventSink) + attach_sink(mock_sink) + track("event_before_disable") + disable() + mock_sink.clear_buffer.assert_called_once() + # Further events should be dropped + track("event_after_disable") + # accept should have been called once (before disable), not twice + assert mock_sink.accept.call_count == 1 + + def test_attach_sink_flushes_previous_sink(self): + """Replacing the global sink (e.g. multi-session ACP) must flush the + previous sink so its buffered events aren't silently orphaned. + """ + first_sink = MagicMock(spec=EventSink) + attach_sink(first_sink) + second_sink = MagicMock(spec=EventSink) + attach_sink(second_sink) + first_sink.flush_sync.assert_called_once() + # Second attach does not re-flush itself + second_sink.flush_sync.assert_not_called() + + def test_attach_same_sink_twice_does_not_flush(self): + """Re-attaching the same sink is a no-op (no self-flush).""" + sink = MagicMock(spec=EventSink) + attach_sink(sink) + attach_sink(sink) + sink.flush_sync.assert_not_called() + + def test_event_id_is_hex_string(self): + """Every event has a unique event_id (hex string).""" + track("test_event") + event = telemetry_mod._event_queue[0] + assert "event_id" in event + assert isinstance(event["event_id"], str) + assert len(event["event_id"]) == 32 # uuid4 hex + + def test_event_ids_are_unique(self): + """Each event gets a distinct event_id.""" + track("event_a") + track("event_b") + ids = [e["event_id"] for e in telemetry_mod._event_queue] + assert ids[0] != ids[1] + + def test_backfill_device_and_session_id_on_attach(self): + """Events tracked 
before set_context() get backfilled on attach_sink().""" + # Track before context is set — device_id/session_id are None + track("early_event") + assert telemetry_mod._event_queue[0]["device_id"] is None + assert telemetry_mod._event_queue[0]["session_id"] is None + + # Now set context and attach sink + set_context(device_id="dev-backfill", session_id="sess-backfill") + mock_sink = MagicMock(spec=EventSink) + attach_sink(mock_sink) + + # The event forwarded to sink should have backfilled ids + event = mock_sink.accept.call_args[0][0] + assert event["device_id"] == "dev-backfill" + assert event["session_id"] == "sess-backfill" + + +class TestEventSink: + def test_accept_enriches_context(self): + """Events are enriched with version/platform context.""" + transport = MagicMock(spec=AsyncTransport) + sink = EventSink(transport, version="1.0.0", model="kimi-k2.5") + event: dict[str, Any] = { + "event": "test", + "timestamp": time.time(), + "properties": {}, + } + sink.accept(event) + # accept() should not mutate the original event dict + assert "context" not in event + # The enriched copy should be in the buffer + sink.flush_sync() + buffered = transport.save_to_disk.call_args[0][0][0] + assert buffered["context"]["version"] == "1.0.0" + assert buffered["context"]["model"] == "kimi-k2.5" + assert "platform" in buffered["context"] + assert "ui_mode" in buffered["context"] + assert "python_version" in buffered["context"] + assert "os_version" in buffered["context"] + assert isinstance(buffered["context"]["ci"], bool) + assert "locale" in buffered["context"] + assert "terminal" in buffered["context"] + + def test_flush_sync_saves_to_disk(self): + """Sync flush saves events to disk via transport.""" + transport = MagicMock(spec=AsyncTransport) + sink = EventSink(transport, version="1.0.0") + sink.accept({"event": "test", "timestamp": 1.0, "properties": {}}) + sink.flush_sync() + transport.save_to_disk.assert_called_once() + events = transport.save_to_disk.call_args[0][0] + 
assert len(events) == 1 + + def test_flush_sync_noop_when_empty(self): + """Sync flush is a no-op when buffer is empty.""" + transport = MagicMock(spec=AsyncTransport) + sink = EventSink(transport, version="1.0.0") + sink.flush_sync() + transport.save_to_disk.assert_not_called() + + def test_accept_includes_ui_mode(self): + """Events are enriched with ui_mode in context.""" + transport = MagicMock(spec=AsyncTransport) + sink = EventSink(transport, version="1.0.0", ui_mode="print") + sink.accept({"event": "test", "timestamp": 1.0, "properties": {}}) + sink.flush_sync() + buffered = transport.save_to_disk.call_args[0][0][0] + assert buffered["context"]["ui_mode"] == "print" + + def test_accept_default_ui_mode_is_shell(self): + """Default ui_mode is 'shell'.""" + transport = MagicMock(spec=AsyncTransport) + sink = EventSink(transport, version="1.0.0") + sink.accept({"event": "test", "timestamp": 1.0, "properties": {}}) + sink.flush_sync() + buffered = transport.save_to_disk.call_args[0][0][0] + assert buffered["context"]["ui_mode"] == "shell" + + +class TestAsyncTransport: + def test_save_to_disk(self, tmp_path: Path): + """Events are saved as JSONL files.""" + with patch("kimi_cli.telemetry.transport._telemetry_dir", return_value=tmp_path): + transport = AsyncTransport() + events = [ + {"event": "e1", "timestamp": 1.0}, + {"event": "e2", "timestamp": 2.0}, + ] + transport.save_to_disk(events) + + files = list(tmp_path.glob("failed_*.jsonl")) + assert len(files) == 1 + lines = files[0].read_text().strip().split("\n") + assert len(lines) == 2 + assert json.loads(lines[0])["event"] == "e1" + assert json.loads(lines[1])["event"] == "e2" + + def test_save_to_disk_empty(self, tmp_path: Path): + """No file is created for empty event list.""" + with patch("kimi_cli.telemetry.transport._telemetry_dir", return_value=tmp_path): + transport = AsyncTransport() + transport.save_to_disk([]) + + files = list(tmp_path.glob("failed_*.jsonl")) + assert len(files) == 0 + + 
@pytest.mark.asyncio + async def test_send_falls_back_on_error(self): + """HTTP errors trigger disk fallback after retries are exhausted.""" + transport = AsyncTransport(endpoint="https://mock.test/events", retry_backoffs_s=()) + + # Make _send_http raise a transient error + from kimi_cli.telemetry.transport import _TransientError + + with ( + patch.object( + transport, "_send_http", new_callable=AsyncMock, side_effect=_TransientError("500") + ), + patch.object(transport, "save_to_disk") as mock_save, + ): + await transport.send([{"event": "test", "timestamp": 1.0}]) + mock_save.assert_called_once() + + def test_default_retry_schedule(self): + """Lock down the production backoff schedule so it isn't silently changed.""" + from kimi_cli.telemetry.transport import RETRY_BACKOFFS_S + + assert RETRY_BACKOFFS_S == (1.0, 4.0, 16.0) + + def test_server_prefix_constant(self): + """Lock down the production server prefix so it isn't silently changed.""" + from kimi_cli.telemetry.transport import SERVER_EVENT_PREFIX + + assert SERVER_EVENT_PREFIX == "kfc_" + + def test_apply_server_prefix_one_does_not_mutate_input(self): + """_apply_server_prefix_one builds a new dict with prefix, input untouched.""" + from kimi_cli.telemetry.transport import _apply_server_prefix_one + + event = {"event": "started", "timestamp": 1.0, "properties": {"a": 1}} + snapshot = dict(event) + out = _apply_server_prefix_one(event) + assert event == snapshot + assert out["event"] == "kfc_started" + # Shallow-shared sub-dicts are fine (not mutated downstream). 
+ assert out["properties"] is event["properties"] + + def test_apply_server_prefix_one_idempotent(self): + """Events already carrying the prefix pass through unchanged (no copy).""" + from kimi_cli.telemetry.transport import _apply_server_prefix_one + + event = {"event": "kfc_already", "timestamp": 2.0, "properties": {}} + out = _apply_server_prefix_one(event) + assert out is event + assert out["event"] == "kfc_already" + + def test_apply_server_prefix_one_passthrough_edge_cases(self): + """Missing / empty / non-string event values pass through unchanged.""" + from kimi_cli.telemetry.transport import _apply_server_prefix_one + + missing = {"timestamp": 1.0} + empty = {"event": "", "timestamp": 2.0} + non_str = {"event": 42, "timestamp": 3.0} + + assert _apply_server_prefix_one(missing) is missing + assert _apply_server_prefix_one(empty) is empty + assert _apply_server_prefix_one(non_str) is non_str + + @pytest.mark.asyncio + async def test_send_adds_server_prefix_to_event_names(self): + """Outbound payload carries kfc_ prefix and is flattened; in-memory events stay bare + nested.""" + transport = AsyncTransport( + device_id="dev-xyz", + endpoint="https://mock.test/events", + retry_backoffs_s=(), + ) + + captured: dict[str, Any] = {} + + async def capture(payload: dict[str, Any]) -> None: + captured.update(payload) + + events_in = [ + {"event": "started", "timestamp": 1.0, "properties": {}}, + {"event": "tool_call", "timestamp": 2.0, "properties": {"success": True}}, + ] + with patch.object(transport, "_send_http", new=capture): + await transport.send(events_in) + + # Payload-level user_id + assert captured["user_id"] == "kfc_device_id_dev-xyz" + + outbound = captured["events"] + assert [e["event"] for e in outbound] == ["kfc_started", "kfc_tool_call"] + # Original events list untouched (save_to_disk would keep bare + nested shape) + assert [e["event"] for e in events_in] == ["started", "tool_call"] + assert events_in[1]["properties"] == {"success": True} + # 
Properties flattened; timestamp preserved + assert outbound[1]["property_success"] is True + assert outbound[1]["timestamp"] == 2.0 + # Outbound events should not carry the nested sub-dicts anymore + assert "properties" not in outbound[1] + assert "context" not in outbound[1] + + @pytest.mark.asyncio + async def test_send_is_idempotent_on_already_prefixed_events(self): + """Events already carrying the prefix are not double-prefixed.""" + transport = AsyncTransport(endpoint="https://mock.test/events", retry_backoffs_s=()) + + captured: dict[str, Any] = {} + + async def capture(payload: dict[str, Any]) -> None: + captured.update(payload) + + events_in = [{"event": "kfc_legacy", "timestamp": 1.0, "properties": {}}] + with patch.object(transport, "_send_http", new=capture): + await transport.send(events_in) + + assert captured["events"][0]["event"] == "kfc_legacy" # not "kfc_kfc_legacy" + + @pytest.mark.asyncio + async def test_disk_fallback_keeps_bare_names(self): + """Transient failure saves events to disk with the bare (unprefixed) name.""" + transport = AsyncTransport(endpoint="https://mock.test/events", retry_backoffs_s=()) + from kimi_cli.telemetry.transport import _TransientError + + events_in = [{"event": "exit", "timestamp": 1.0, "properties": {}}] + with ( + patch.object( + transport, "_send_http", new_callable=AsyncMock, side_effect=_TransientError("503") + ), + patch.object(transport, "save_to_disk") as mock_save, + ): + await transport.send(events_in) + saved = mock_save.call_args[0][0] + assert saved[0]["event"] == "exit" # bare name for disk retry + + @pytest.mark.asyncio + async def test_send_retries_transient_then_falls_back(self): + """send() retries transient errors (without sleeping) then falls back to disk.""" + transport = AsyncTransport( + endpoint="https://mock.test/events", + # 3 attempts total: initial + 2 retries (zero sleep for test speed) + retry_backoffs_s=(0.0, 0.0), + ) + from kimi_cli.telemetry.transport import _TransientError + + 
send_mock = AsyncMock(side_effect=_TransientError("503")) + with ( + patch.object(transport, "_send_http", send_mock), + patch.object(transport, "save_to_disk") as mock_save, + ): + await transport.send([{"event": "test", "timestamp": 1.0}]) + assert send_mock.await_count == 3 + mock_save.assert_called_once() + + @pytest.mark.asyncio + async def test_send_retry_succeeds_no_fallback(self): + """send() retries transient errors and succeeds without hitting disk.""" + transport = AsyncTransport( + endpoint="https://mock.test/events", + retry_backoffs_s=(0.0, 0.0), + ) + from kimi_cli.telemetry.transport import _TransientError + + # Fail once, succeed on second attempt + send_mock = AsyncMock(side_effect=[_TransientError("503"), None]) + with ( + patch.object(transport, "_send_http", send_mock), + patch.object(transport, "save_to_disk") as mock_save, + ): + await transport.send([{"event": "test", "timestamp": 1.0}]) + assert send_mock.await_count == 2 + mock_save.assert_not_called() + + @pytest.mark.asyncio + async def test_send_cancelled_during_backoff_saves_to_disk(self): + """If the send task is cancelled mid-backoff, events must be persisted.""" + transport = AsyncTransport( + endpoint="https://mock.test/events", + # Non-zero backoff so there's a real sleep point to cancel + retry_backoffs_s=(60.0,), + ) + from kimi_cli.telemetry.transport import _TransientError + + # _send_http always raises _TransientError; first attempt fails, + # then asyncio.sleep(60) gives us a window to cancel the task. 
+ send_mock = AsyncMock(side_effect=_TransientError("503")) + + with ( + patch.object(transport, "_send_http", send_mock), + patch.object(transport, "save_to_disk") as mock_save, + ): + task = asyncio.create_task(transport.send([{"event": "test", "timestamp": 1.0}])) + # Let the first attempt fail and enter the backoff sleep + await asyncio.sleep(0.05) + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + # Events must have been persisted to disk before the cancel propagated + mock_save.assert_called_once() + saved_events = mock_save.call_args[0][0] + assert len(saved_events) == 1 + assert saved_events[0]["event"] == "test" + + @pytest.mark.asyncio + async def test_send_success_no_fallback(self): + """Successful send does not fall back to disk.""" + transport = AsyncTransport(endpoint="https://mock.test/events") + + with ( + patch.object(transport, "_send_http", new_callable=AsyncMock), + patch.object(transport, "save_to_disk") as mock_save, + ): + await transport.send([{"event": "test", "timestamp": 1.0}]) + mock_save.assert_not_called() + + @pytest.mark.asyncio + async def test_retry_disk_events_success(self, tmp_path: Path): + """Disk events are retried through the outbound pipeline, file deleted.""" + # Mix of bare names (new format) and already-prefixed (legacy format). 
+ failed_file = tmp_path / "failed_abc123.jsonl" + failed_file.write_text( + '{"event":"old","timestamp":1.0}\n{"event":"kfc_legacy","timestamp":2.0}\n' + ) + + transport = AsyncTransport(device_id="dev-retry", endpoint="https://mock.test/events") + + with ( + patch("kimi_cli.telemetry.transport._telemetry_dir", return_value=tmp_path), + patch.object(transport, "_send_http", new_callable=AsyncMock) as mock_send, + ): + await transport.retry_disk_events() + mock_send.assert_called_once() + payload = mock_send.call_args[0][0] + # Same outbound pipeline: user_id at top + prefixed, flat events + assert payload["user_id"] == "kfc_device_id_dev-retry" + assert [e["event"] for e in payload["events"]] == ["kfc_old", "kfc_legacy"] + # File should be deleted after successful retry + assert not failed_file.exists() + + @pytest.mark.asyncio + async def test_retry_disk_events_expired_file(self, tmp_path: Path): + """Expired disk event files are deleted without retry.""" + import os + + failed_file = tmp_path / "failed_expired.jsonl" + failed_file.write_text('{"event":"old","timestamp":1.0}\n') + # Set mtime to 8 days ago + old_time = time.time() - 8 * 24 * 3600 + os.utime(failed_file, (old_time, old_time)) + + transport = AsyncTransport(endpoint="https://mock.test/events") + + with ( + patch("kimi_cli.telemetry.transport._telemetry_dir", return_value=tmp_path), + patch.object(transport, "_send_http", new_callable=AsyncMock) as mock_send, + ): + await transport.retry_disk_events() + mock_send.assert_not_called() + assert not failed_file.exists() + + @pytest.mark.asyncio + async def test_retry_disk_events_keeps_file_on_unexpected_error(self, tmp_path: Path): + """Unexpected errors during retry should keep the file for next startup.""" + failed_file = tmp_path / "failed_keep.jsonl" + failed_file.write_text('{"event":"ok","timestamp":1.0}\n') + + transport = AsyncTransport(endpoint="https://mock.test/events") + + with ( + patch("kimi_cli.telemetry.transport._telemetry_dir", 
return_value=tmp_path), + patch.object( + transport, + "_send_http", + new_callable=AsyncMock, + side_effect=RuntimeError("SSL error"), + ), + ): + await transport.retry_disk_events() + # File should be preserved for next retry + assert failed_file.exists() + + @pytest.mark.asyncio + async def test_retry_disk_events_deletes_corrupted_file(self, tmp_path: Path): + """Corrupted (non-JSON) files are deleted.""" + failed_file = tmp_path / "failed_corrupt.jsonl" + failed_file.write_text("this is not json\n") + + transport = AsyncTransport(endpoint="https://mock.test/events") + + with ( + patch("kimi_cli.telemetry.transport._telemetry_dir", return_value=tmp_path), + patch.object(transport, "_send_http", new_callable=AsyncMock) as mock_send, + ): + await transport.retry_disk_events() + mock_send.assert_not_called() + assert not failed_file.exists() + + @pytest.mark.asyncio + async def test_send_401_no_token_drops(self, tmp_path: Path): + """401 when no token is present is treated as a non-recoverable client + error: drop events, do not spool to disk. Retrying would just replay + the same token-less request and hit 401 again until the 7-day expiry. 
+ """ + transport = AsyncTransport( + get_access_token=lambda: None, # no token + endpoint="https://mock.test/events", + retry_backoffs_s=(), + ) + + mock_resp = MagicMock() + mock_resp.status = 401 + mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) + mock_resp.__aexit__ = AsyncMock(return_value=False) + + mock_session = MagicMock() + mock_session.post.return_value = mock_resp + mock_session.__aenter__ = AsyncMock(return_value=mock_session) + mock_session.__aexit__ = AsyncMock(return_value=False) + + with ( + patch("kimi_cli.telemetry.transport._telemetry_dir", return_value=tmp_path), + patch("kimi_cli.utils.aiohttp.new_client_session", return_value=mock_session), + ): + await transport.send([{"event": "test", "timestamp": 1.0}]) + + saved_files = list(tmp_path.glob("failed_*.jsonl")) + assert len(saved_files) == 0 + + @pytest.mark.asyncio + async def test_anonymous_retry_4xx_drops_events(self): + """401 with token → anonymous retry returns 4xx → events dropped, no disk fallback.""" + transport = AsyncTransport( + get_access_token=lambda: "valid-token", + endpoint="https://mock.test/events", + ) + + # First response: 401 (triggers anonymous retry) + resp_401 = MagicMock() + resp_401.status = 401 + resp_401.__aenter__ = AsyncMock(return_value=resp_401) + resp_401.__aexit__ = AsyncMock(return_value=False) + + # Second response: 403 (client error on anonymous retry) + resp_403 = MagicMock() + resp_403.status = 403 + resp_403.__aenter__ = AsyncMock(return_value=resp_403) + resp_403.__aexit__ = AsyncMock(return_value=False) + + mock_session = MagicMock() + mock_session.post = MagicMock(side_effect=[resp_401, resp_403]) + mock_session.__aenter__ = AsyncMock(return_value=mock_session) + mock_session.__aexit__ = AsyncMock(return_value=False) + + with ( + patch("kimi_cli.utils.aiohttp.new_client_session", return_value=mock_session), + patch.object(transport, "save_to_disk") as mock_save, + ): + await transport.send([{"event": "test", "timestamp": 1.0}]) + 
mock_save.assert_not_called() + + @pytest.mark.asyncio + async def test_401_with_token_anonymous_retry_success(self): + """401 with token → anonymous retry returns 200 → success, no disk fallback.""" + transport = AsyncTransport( + get_access_token=lambda: "valid-token", + endpoint="https://mock.test/events", + ) + + resp_401 = MagicMock() + resp_401.status = 401 + resp_401.__aenter__ = AsyncMock(return_value=resp_401) + resp_401.__aexit__ = AsyncMock(return_value=False) + + resp_200 = MagicMock() + resp_200.status = 200 + resp_200.__aenter__ = AsyncMock(return_value=resp_200) + resp_200.__aexit__ = AsyncMock(return_value=False) + + mock_session = MagicMock() + mock_session.post = MagicMock(side_effect=[resp_401, resp_200]) + mock_session.__aenter__ = AsyncMock(return_value=mock_session) + mock_session.__aexit__ = AsyncMock(return_value=False) + + with ( + patch("kimi_cli.utils.aiohttp.new_client_session", return_value=mock_session), + patch.object(transport, "save_to_disk") as mock_save, + ): + await transport.send([{"event": "test", "timestamp": 1.0}]) + mock_save.assert_not_called() + + @pytest.mark.asyncio + async def test_401_with_token_anonymous_retry_5xx(self, tmp_path: Path): + """401 with token → anonymous retry returns 500 → disk fallback.""" + transport = AsyncTransport( + get_access_token=lambda: "valid-token", + endpoint="https://mock.test/events", + retry_backoffs_s=(), + ) + + resp_401 = MagicMock() + resp_401.status = 401 + resp_401.__aenter__ = AsyncMock(return_value=resp_401) + resp_401.__aexit__ = AsyncMock(return_value=False) + + resp_500 = MagicMock() + resp_500.status = 500 + resp_500.__aenter__ = AsyncMock(return_value=resp_500) + resp_500.__aexit__ = AsyncMock(return_value=False) + + mock_session = MagicMock() + mock_session.post = MagicMock(side_effect=[resp_401, resp_500]) + mock_session.__aenter__ = AsyncMock(return_value=mock_session) + mock_session.__aexit__ = AsyncMock(return_value=False) + + with ( + 
patch("kimi_cli.utils.aiohttp.new_client_session", return_value=mock_session), + patch("kimi_cli.telemetry.transport._telemetry_dir", return_value=tmp_path), + ): + await transport.send([{"event": "test", "timestamp": 1.0}]) + + saved_files = list(tmp_path.glob("failed_*.jsonl")) + assert len(saved_files) == 1 + + @pytest.mark.asyncio + @pytest.mark.parametrize("status_code", [400, 403, 404, 422]) + async def test_send_4xx_drops_without_disk_fallback(self, tmp_path: Path, status_code: int): + """Non-429 4xx client errors are dropped, never spooled to disk. + + Retrying an un-acked schema error / auth error would just waste + disk and network on every subsequent startup. + """ + transport = AsyncTransport( + get_access_token=lambda: "valid-token", + endpoint="https://mock.test/events", + retry_backoffs_s=(), + ) + mock_resp = MagicMock() + mock_resp.status = status_code + mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) + mock_resp.__aexit__ = AsyncMock(return_value=False) + mock_session = MagicMock() + mock_session.post.return_value = mock_resp + mock_session.__aenter__ = AsyncMock(return_value=mock_session) + mock_session.__aexit__ = AsyncMock(return_value=False) + + with ( + patch("kimi_cli.telemetry.transport._telemetry_dir", return_value=tmp_path), + patch("kimi_cli.utils.aiohttp.new_client_session", return_value=mock_session), + ): + await transport.send([{"event": "test", "timestamp": 1.0}]) + + assert list(tmp_path.glob("failed_*.jsonl")) == [] + + @pytest.mark.asyncio + async def test_send_429_treated_as_transient(self, tmp_path: Path): + """429 Too Many Requests is transient (server-imposed backoff), so + events should be spooled to disk after in-process retries exhaust. 
+ """ + transport = AsyncTransport( + get_access_token=lambda: None, + endpoint="https://mock.test/events", + retry_backoffs_s=(), + ) + mock_resp = MagicMock() + mock_resp.status = 429 + mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) + mock_resp.__aexit__ = AsyncMock(return_value=False) + mock_session = MagicMock() + mock_session.post.return_value = mock_resp + mock_session.__aenter__ = AsyncMock(return_value=mock_session) + mock_session.__aexit__ = AsyncMock(return_value=False) + + with ( + patch("kimi_cli.telemetry.transport._telemetry_dir", return_value=tmp_path), + patch("kimi_cli.utils.aiohttp.new_client_session", return_value=mock_session), + ): + await transport.send([{"event": "test", "timestamp": 1.0}]) + + assert len(list(tmp_path.glob("failed_*.jsonl"))) == 1 + + @pytest.mark.asyncio + async def test_send_unexpected_exception_falls_back_to_disk(self, tmp_path: Path): + """Unexpected exception during send triggers disk fallback.""" + transport = AsyncTransport(endpoint="https://mock.test/events") + + with ( + patch.object( + transport, + "_send_http", + new_callable=AsyncMock, + side_effect=RuntimeError("unexpected"), + ), + patch("kimi_cli.telemetry.transport._telemetry_dir", return_value=tmp_path), + ): + await transport.send([{"event": "test", "timestamp": 1.0}]) + + saved_files = list(tmp_path.glob("failed_*.jsonl")) + assert len(saved_files) == 1 + + +class TestPayloadAssembly: + """Unit tests for the outbound payload pipeline: + _build_user_id / _flatten_event / _build_payload.""" + + def test_user_id_prefix_constant(self): + """Lock down the production user_id prefix so it isn't silently changed.""" + from kimi_cli.telemetry.transport import USER_ID_PREFIX + + assert USER_ID_PREFIX == "kfc_device_id_" + + def test_build_user_id(self): + from kimi_cli.telemetry.transport import _build_user_id + + assert _build_user_id("abc123") == "kfc_device_id_abc123" + + def test_build_user_id_empty_device_id(self): + """Empty device_id still returns 
the prefix (no crash).""" + from kimi_cli.telemetry.transport import _build_user_id + + assert _build_user_id("") == "kfc_device_id_" + + def test_flatten_event_properties_prefix(self): + from kimi_cli.telemetry.transport import _flatten_event + + out = _flatten_event( + { + "event": "tool_call", + "timestamp": 1.0, + "properties": {"tool_name": "bash", "approved": True}, + } + ) + assert out["property_tool_name"] == "bash" + assert out["property_approved"] is True + assert "properties" not in out + + def test_flatten_event_context_prefix(self): + from kimi_cli.telemetry.transport import _flatten_event + + out = _flatten_event( + { + "event": "tool_call", + "timestamp": 1.0, + "context": {"version": "1.0", "platform": "darwin", "ci": False}, + } + ) + assert out["context_version"] == "1.0" + assert out["context_platform"] == "darwin" + assert out["context_ci"] is False + assert "context" not in out + + def test_flatten_event_preserves_top_level(self): + """event_id / event / timestamp / device_id / session_id pass through.""" + from kimi_cli.telemetry.transport import _flatten_event + + event = { + "event_id": "eid", + "event": "started", + "timestamp": 1.5, + "device_id": "d", + "session_id": "s", + "properties": {}, + "context": {}, + } + out = _flatten_event(event) + assert out["event_id"] == "eid" + assert out["event"] == "started" + assert out["timestamp"] == 1.5 + assert out["device_id"] == "d" + assert out["session_id"] == "s" + + def test_flatten_event_does_not_mutate_input(self): + from kimi_cli.telemetry.transport import _flatten_event + + event = { + "event": "tool_call", + "timestamp": 1.0, + "properties": {"a": 1}, + "context": {"v": "x"}, + } + snapshot = { + "event": "tool_call", + "timestamp": 1.0, + "properties": {"a": 1}, + "context": {"v": "x"}, + } + _flatten_event(event) + assert event == snapshot + + def test_flatten_event_allows_none_values(self): + from kimi_cli.telemetry.transport import _flatten_event + + out = _flatten_event({"event": 
"x", "timestamp": 1.0, "properties": {"reason": None}}) + assert out["property_reason"] is None + + def test_flatten_event_empty_properties_and_context(self): + """Empty or missing sub-dicts produce no property_/context_ keys.""" + from kimi_cli.telemetry.transport import _flatten_event + + out = _flatten_event({"event": "x", "timestamp": 1.0, "properties": {}, "context": {}}) + assert all(not k.startswith("property_") for k in out) + assert all(not k.startswith("context_") for k in out) + + # Missing entirely + out2 = _flatten_event({"event": "x", "timestamp": 1.0}) + assert all(not k.startswith("property_") for k in out2) + assert all(not k.startswith("context_") for k in out2) + + def test_flatten_event_raises_on_nested_dict_in_properties(self): + from kimi_cli.telemetry.transport import _flatten_event + + with pytest.raises(TypeError, match="property.nested"): + _flatten_event( + { + "event": "x", + "timestamp": 1.0, + "properties": {"nested": {"inner": 1}}, + } + ) + + def test_flatten_event_raises_on_list_in_properties(self): + from kimi_cli.telemetry.transport import _flatten_event + + with pytest.raises(TypeError, match="property.items"): + _flatten_event({"event": "x", "timestamp": 1.0, "properties": {"items": [1, 2, 3]}}) + + def test_flatten_event_raises_on_nested_dict_in_context(self): + from kimi_cli.telemetry.transport import _flatten_event + + with pytest.raises(TypeError, match="context.meta"): + _flatten_event( + { + "event": "x", + "timestamp": 1.0, + "context": {"meta": {"nested": True}}, + } + ) + + def test_build_payload_user_id_at_top(self): + from kimi_cli.telemetry.transport import _build_payload + + payload = _build_payload( + [{"event": "started", "timestamp": 1.0, "properties": {}}], + device_id="dev-1", + ) + assert payload["user_id"] == "kfc_device_id_dev-1" + assert "events" in payload + + def test_build_payload_events_are_flat_and_prefixed(self): + from kimi_cli.telemetry.transport import _build_payload + + payload = _build_payload( + 
[ + { + "event_id": "e1", + "event": "tool_call", + "timestamp": 1.0, + "device_id": "dev-1", + "session_id": "sess-1", + "properties": {"tool_name": "bash", "approved": True}, + "context": {"version": "1.0", "platform": "darwin"}, + } + ], + device_id="dev-1", + ) + event = payload["events"][0] + assert event["event"] == "kfc_tool_call" + assert event["event_id"] == "e1" + assert event["device_id"] == "dev-1" + assert event["session_id"] == "sess-1" + assert event["property_tool_name"] == "bash" + assert event["property_approved"] is True + assert event["context_version"] == "1.0" + assert event["context_platform"] == "darwin" + assert "properties" not in event + assert "context" not in event + + def test_build_payload_does_not_mutate_input(self): + from kimi_cli.telemetry.transport import _build_payload + + events = [{"event": "started", "timestamp": 1.0, "properties": {"x": 1}}] + _build_payload(events, device_id="dev-1") + assert events[0]["event"] == "started" + assert events[0]["properties"] == {"x": 1} + + @pytest.mark.asyncio + async def test_send_drops_events_on_schema_violation(self, tmp_path: Path): + """A non-primitive value in properties must drop the batch (not loop on disk). + + Retrying would hit the same TypeError on every reload, so falling + back to disk would create a permanently stuck file. + """ + transport = AsyncTransport( + device_id="dev-bad", + endpoint="https://mock.test/events", + retry_backoffs_s=(), + ) + # properties value is a dict — violates _assert_primitive. + events_in = [ + {"event": "bad", "timestamp": 1.0, "properties": {"nested": {"x": 1}}}, + ] + + sent = AsyncMock() + with ( + patch("kimi_cli.telemetry.transport._telemetry_dir", return_value=tmp_path), + patch.object(transport, "_send_http", new=sent), + ): + # Must not raise — schema error is caught and events dropped. 
+ await transport.send(events_in) + + # No HTTP attempt + sent.assert_not_awaited() + # No disk fallback (would loop forever) + assert list(tmp_path.glob("failed_*.jsonl")) == [] + + @pytest.mark.asyncio + async def test_send_persists_nested_shape_on_failure(self, tmp_path: Path): + """save_to_disk must write the original nested events, not the flat payload.""" + from kimi_cli.telemetry.transport import _TransientError + + transport = AsyncTransport( + device_id="dev-disk", + endpoint="https://mock.test/events", + retry_backoffs_s=(), + ) + events_in = [ + { + "event": "tool_call", + "timestamp": 1.0, + "properties": {"tool_name": "bash"}, + "context": {"version": "1.0"}, + } + ] + + with ( + patch("kimi_cli.telemetry.transport._telemetry_dir", return_value=tmp_path), + patch.object( + transport, + "_send_http", + new_callable=AsyncMock, + side_effect=_TransientError("503"), + ), + ): + await transport.send(events_in) + + saved_files = list(tmp_path.glob("failed_*.jsonl")) + assert len(saved_files) == 1 + persisted = json.loads(saved_files[0].read_text().strip()) + # Bare name + nested shape preserved — no prefix, no flattening. 
+ assert persisted["event"] == "tool_call" + assert persisted["properties"] == {"tool_name": "bash"} + assert persisted["context"] == {"version": "1.0"} + assert "property_tool_name" not in persisted + assert "user_id" not in persisted diff --git a/tests/tools/test_read_file.py b/tests/tools/test_read_file.py index 26b0772f0..7476b9d6f 100644 --- a/tests/tools/test_read_file.py +++ b/tests/tools/test_read_file.py @@ -382,7 +382,7 @@ async def test_read_with_tilde_path_expansion(read_file_tool: ReadFile, temp_wor # Create a test file in temp_work_dir and use ~ to reference it # We simulate by creating a file and checking that ~ expands correctly home = Path.home() - test_file = home / ".kimi_test_expanduser_temp" + test_file = home / ".test_expanduser_temp" test_content = "Test content for tilde expansion" try: @@ -390,7 +390,7 @@ async def test_read_with_tilde_path_expansion(read_file_tool: ReadFile, temp_wor test_file.write_text(test_content) # Read using ~ path - result = await read_file_tool(Params(path="~/.kimi_test_expanduser_temp")) + result = await read_file_tool(Params(path="~/.test_expanduser_temp")) assert not result.is_error assert "Test content for tilde expansion" in result.output