diff --git a/CHANGELOG.md b/CHANGELOG.md index 4324cf0eb..14b2e5d32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Only write entries that are worth mentioning to users. ## Unreleased +- Print: Wait for background tasks before exiting — in one-shot `--print` mode, the process now waits for running background agents to finish and lets the model process their results, instead of exiting and killing them - Core: Fix agent loop silently stopping when model response contains only thinking content — detect think-only responses (reasoning content with no text or tool calls) as an incomplete response error and retry automatically - Core: Fix crash on streaming mid-flight network disconnection — when the OpenAI SDK raises a base `APIError` (instead of `APIConnectionError`) during long-running streams, the error is now correctly classified as retryable, enabling automatic retry and connection recovery instead of an unrecoverable crash - Shell: Exclude empty current session from `/sessions` picker — completely empty sessions (no conversation history and no custom title) are no longer shown in the session list; sessions with a custom title are still displayed diff --git a/docs/en/release-notes/changelog.md b/docs/en/release-notes/changelog.md index 7bf369374..2e1a5d218 100644 --- a/docs/en/release-notes/changelog.md +++ b/docs/en/release-notes/changelog.md @@ -4,6 +4,7 @@ This page documents the changes in each Kimi Code CLI release. 
## Unreleased +- Print: Wait for background tasks before exiting — in one-shot `--print` mode, the process now waits for running background agents to finish and lets the model process their results, instead of exiting and killing them - Core: Fix agent loop silently stopping when model response contains only thinking content — detect think-only responses (reasoning content with no text or tool calls) as an incomplete response error and retry automatically - Core: Fix crash on streaming mid-flight network disconnection — when the OpenAI SDK raises a base `APIError` (instead of `APIConnectionError`) during long-running streams, the error is now correctly classified as retryable, enabling automatic retry and connection recovery instead of an unrecoverable crash - Shell: Exclude empty current session from `/sessions` picker — completely empty sessions (no conversation history and no custom title) are no longer shown in the session list; sessions with a custom title are still displayed diff --git a/docs/zh/release-notes/changelog.md b/docs/zh/release-notes/changelog.md index 84286bdb9..303347b01 100644 --- a/docs/zh/release-notes/changelog.md +++ b/docs/zh/release-notes/changelog.md @@ -4,6 +4,7 @@ ## 未发布 +- Print:退出前等待后台任务完成——在单次 `--print` 模式下,进程现在会等待仍在运行的后台 Agent 完成并让模型处理它们的结果,而不是直接退出并杀死它们 - Core:修复模型响应仅包含思考内容时 agent loop 静默停止的问题——将仅含思考内容(无文本或工具调用)的响应检测为不完整响应错误并自动重试 - Core:修复长时间 streaming 过程中网络断连导致崩溃的问题——当 OpenAI SDK 在流式传输中途抛出基类 `APIError`(而非 `APIConnectionError`)时,现在能正确识别为可重试错误,自动触发重试和连接恢复,而不再直接崩溃退出 - Shell:从 `/sessions` 选择器中排除空的当前会话——完全为空的会话(既无对话记录也无自定义标题)不再显示在会话列表中;有自定义标题的会话仍然正常显示 diff --git a/src/kimi_cli/background/manager.py b/src/kimi_cli/background/manager.py index fc6399f15..eead26598 100644 --- a/src/kimi_cli/background/manager.py +++ b/src/kimi_cli/background/manager.py @@ -93,6 +93,14 @@ def _active_task_count(self) -> int: 1 for view in self._store.list_views() if not is_terminal_status(view.runtime.status) ) + def has_active_tasks(self) -> bool: + 
"""Return True if any background tasks are in a non-terminal status. + + This includes ``running``, ``awaiting_approval``, and any other + non-terminal state — not just actively executing tasks. + """ + return self._active_task_count() > 0 + def _worker_command(self, task_dir: Path) -> list[str]: if getattr(sys, "frozen", False): return [ diff --git a/src/kimi_cli/soul/kimisoul.py b/src/kimi_cli/soul/kimisoul.py index 9365c11b9..81b954ae9 100644 --- a/src/kimi_cli/soul/kimisoul.py +++ b/src/kimi_cli/soul/kimisoul.py @@ -758,6 +758,7 @@ async def _agent_loop(self) -> TurnOutcome: has_steers = await self._consume_pending_steers() if has_steers: continue # steers injected, force another LLM step + final_message = ( step_outcome.assistant_message if step_outcome.stop_reason == "no_tool_calls" diff --git a/src/kimi_cli/ui/print/__init__.py b/src/kimi_cli/ui/print/__init__.py index 6242c33af..8e483c4a0 100644 --- a/src/kimi_cli/ui/print/__init__.py +++ b/src/kimi_cli/ui/print/__init__.py @@ -96,6 +96,74 @@ def _handler(): runtime.session.wire_file if runtime else None, runtime, ) + + # In one-shot text mode the process exits after this + # function returns, which would kill still-running + # background agents. Poll until they finish, calling + # reconcile() each iteration (the notification pump + # inside run_soul is no longer running, so we must + # drive reconcile ourselves to recover lost workers + # and publish terminal notifications). Only re-enter + # the soul when there are pending LLM notifications. + # + # stream-json mode is multi-turn: background tasks + # from one command must not block the next command. + # + # keep_alive_on_exit opts into "fire and forget" + # semantics: background tasks are meant to outlive + # the CLI process, so Print must not block waiting + # for them. 
+ if ( + runtime + and runtime.role == "root" + and self.input_format == "text" + and not runtime.config.background.keep_alive_on_exit + ): + manager = runtime.background_tasks + notifications = runtime.notifications + while not cancel_event.is_set(): + # Drive reconcile() ourselves: the notification + # pump inside run_soul is no longer running, so + # we must recover lost workers and publish + # terminal notifications here. + manager.reconcile() + if notifications.has_pending_for_sink("llm"): + # Re-enter soul so the LLM can process the + # completion notification. Do this even if + # other tasks are still active — progress on + # completed tasks should not wait on siblings. + bg_prompt = ( + "<system-reminder>" + "Background tasks have completed." + " Process their results." + "</system-reminder>" + ) + await run_soul( + self.soul, + bg_prompt, + partial(visualize, self.output_format, self.final_only), + cancel_event, + runtime.session.wire_file, + runtime, + ) + continue + if not manager.has_active_tasks(): + # Re-check once after noticing no active + # tasks: a worker may have finished between + # the reconcile above and this snapshot, + # leaving a terminal state on disk that we + # haven't published yet. Without this + # second reconcile+pending check, that + # final completion notification would be + # lost when the process exits. + manager.reconcile() + if notifications.has_pending_for_sink("llm"): + continue + break + # Still waiting for tasks to finish. + await asyncio.sleep(1.0) + if cancel_event.is_set(): + raise RunCancelled else: logger.info("Empty command, skipping") diff --git a/tests/core/test_kimisoul_background_wait.py b/tests/core/test_kimisoul_background_wait.py new file mode 100644 index 000000000..2daacbb92 --- /dev/null +++ b/tests/core/test_kimisoul_background_wait.py @@ -0,0 +1,406 @@ +"""Tests for Print mode background task waiting behavior. 
+ +When background agents are still running after ``run_soul()`` completes a turn, +**text** (one-shot) print mode should: + +- drive ``reconcile()`` each iteration (the notification pump inside ``run_soul`` + is no longer running, so we must recover lost workers and publish terminal + notifications ourselves); +- re-enter the soul whenever ``has_pending_for_sink("llm")`` is True — even if + other tasks are still active — so per-task progress is not blocked by + long-running siblings; +- keep polling until both ``has_active_tasks()`` and ``has_pending_for_sink`` + are False; +- skip the wait loop entirely in ``stream-json`` mode (multi-turn) so + background tasks from one command do not block the next command; +- raise ``RunCancelled`` when ``cancel_event`` is set. +""" + +from __future__ import annotations + +import asyncio +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from kimi_cli.cli import ExitCode, InputFormat +from kimi_cli.soul.kimisoul import KimiSoul +from kimi_cli.ui.print import Print + + +class _FakeState: + """Mutable state that drives has_active_tasks / has_pending_for_sink.""" + + def __init__(self, *, active: bool = False, pending: bool = False): + self.active = active + self.pending = pending + self.reconcile_count = 0 + + +def _wire_manager(state: _FakeState) -> tuple[MagicMock, MagicMock]: + manager = MagicMock() + manager.has_active_tasks = MagicMock(side_effect=lambda: state.active) + + def _reconcile(): + state.reconcile_count += 1 + + manager.reconcile = MagicMock(side_effect=_reconcile) + + notifications = MagicMock() + notifications.has_pending_for_sink = MagicMock(side_effect=lambda sink: state.pending) + return manager, notifications + + +def _make_print_with_runtime( + tmp_path: Path, + manager: MagicMock, + notifications: MagicMock, + *, + input_format: InputFormat = "text", + keep_alive_on_exit: bool = False, +) -> tuple[Print, AsyncMock]: + soul = AsyncMock(spec=KimiSoul) + 
soul.runtime = MagicMock() + soul.runtime.role = "root" + soul.runtime.background_tasks = manager + soul.runtime.notifications = notifications + soul.runtime.config.background.keep_alive_on_exit = keep_alive_on_exit + soul.runtime.session.wire_file = tmp_path / "wire.jsonl" + + p = Print( + soul=soul, + input_format=input_format, + output_format="text", + context_file=tmp_path / "context.json", + ) + return p, soul + + +# --------------------------------------------------------------------------- +# Core: wait → pending → re-enter soul +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_print_reruns_soul_on_pending_notification(tmp_path: Path) -> None: + """After run_soul, if tasks complete and create pending LLM notifications, + Print should re-enter run_soul with a system-reminder prompt.""" + state = _FakeState(active=True, pending=False) + manager, notifications = _wire_manager(state) + + p, _ = _make_print_with_runtime(tmp_path, manager, notifications) + run_soul_calls: list[str] = [] + + async def fake_run_soul(soul_arg, user_input, *args, **kwargs): + run_soul_calls.append(user_input) + if len(run_soul_calls) == 1: + # Simulate a worker finishing + reconcile publishing a notification + state.active = False + state.pending = True + else: + # Re-entry drains the pending notification (like real deliver_pending) + state.pending = False + + with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul): + code = await p.run(command="do work") + + assert code == ExitCode.SUCCESS + assert len(run_soul_calls) == 2 + assert run_soul_calls[0] == "do work" + assert "<system-reminder>" in run_soul_calls[1] + assert state.reconcile_count >= 1 + + +# --------------------------------------------------------------------------- +# reconcile() is called on every poll iteration +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def 
test_print_calls_reconcile_each_poll_iteration(tmp_path: Path) -> None: + """reconcile() must be called on every poll iteration.""" + state = _FakeState(active=True, pending=False) + manager, notifications = _wire_manager(state) + + p, _ = _make_print_with_runtime(tmp_path, manager, notifications) + call_count = 0 + + async def fake_run_soul(soul_arg, user_input, *args, **kwargs): + nonlocal call_count + call_count += 1 + + # Patch sleep to also decrement a poll counter so the test finishes fast + poll_counter = {"n": 0} + real_sleep = asyncio.sleep + + async def fake_sleep(duration): + poll_counter["n"] += 1 + if poll_counter["n"] >= 3: + state.active = False + await real_sleep(0) + + with ( + patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul), + patch("kimi_cli.ui.print.asyncio.sleep", side_effect=fake_sleep), + ): + await p.run(command="test") + + # Before each sleep there is a reconcile call (and one final reconcile + # after the last sleep). Expect at least 3 reconciles. 
+ assert state.reconcile_count >= 3 + + +# --------------------------------------------------------------------------- +# No re-entry when no notifications are pending +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_print_skips_reentry_when_no_pending_notifications(tmp_path: Path) -> None: + """If tasks complete but there are no pending LLM notifications, the soul + should NOT be re-entered.""" + state = _FakeState(active=True, pending=False) + manager, notifications = _wire_manager(state) + + p, _ = _make_print_with_runtime(tmp_path, manager, notifications) + run_soul_calls: list[str] = [] + + async def fake_run_soul(soul_arg, user_input, *args, **kwargs): + run_soul_calls.append(user_input) + + real_sleep = asyncio.sleep + + async def fake_sleep(duration): + state.active = False + await real_sleep(0) + + with ( + patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul), + patch("kimi_cli.ui.print.asyncio.sleep", side_effect=fake_sleep), + ): + code = await p.run(command="hello") + + assert code == ExitCode.SUCCESS + assert len(run_soul_calls) == 1 + + +# --------------------------------------------------------------------------- +# Pre-existing pending notifications: tasks already done before first check +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_print_reruns_soul_when_tasks_done_but_notifications_pending( + tmp_path: Path, +) -> None: + """If all tasks finished before the first check and reconcile publishes + notifications, the soul should still be re-entered to drain them.""" + state = _FakeState(active=False, pending=False) + manager, notifications = _wire_manager(state) + + p, _ = _make_print_with_runtime(tmp_path, manager, notifications) + run_soul_calls: list[str] = [] + reconcile_original = manager.reconcile.side_effect + + def reconcile_then_publish(): + reconcile_original() + # First reconcile: 
publish a pending notification + if state.reconcile_count == 1: + state.pending = True + + manager.reconcile.side_effect = reconcile_then_publish + + async def fake_run_soul(soul_arg, user_input, *args, **kwargs): + run_soul_calls.append(user_input) + if len(run_soul_calls) > 1: + state.pending = False + + with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul): + code = await p.run(command="trigger") + + assert code == ExitCode.SUCCESS + assert len(run_soul_calls) == 2 + assert "<system-reminder>" in run_soul_calls[1] + + +# --------------------------------------------------------------------------- +# Empty: no tasks, no pending → no wait, exit immediately +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_print_exits_normally_when_no_background_work(tmp_path: Path) -> None: + """No active tasks and no pending notifications → exit without waiting.""" + state = _FakeState(active=False, pending=False) + manager, notifications = _wire_manager(state) + + p, _ = _make_print_with_runtime(tmp_path, manager, notifications) + run_soul_calls: list[str] = [] + + async def fake_run_soul(soul_arg, user_input, *args, **kwargs): + run_soul_calls.append(user_input) + + with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul): + code = await p.run(command="hello") + + assert code == ExitCode.SUCCESS + assert len(run_soul_calls) == 1 + # Two reconciles: one at the top of the loop, one final double-check + # before break (to catch workers that finish between the two snapshots). 
+ assert state.reconcile_count == 2 + + +# --------------------------------------------------------------------------- +# stream-json mode: must NOT block between commands +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_print_stream_json_does_not_wait_for_background_tasks( + tmp_path: Path, +) -> None: + """In stream-json mode the wait loop must be skipped entirely.""" + state = _FakeState(active=True, pending=True) + manager, notifications = _wire_manager(state) + + p, _ = _make_print_with_runtime(tmp_path, manager, notifications, input_format="stream-json") + run_soul_calls: list[str] = [] + read_count = 0 + + def fake_read_next_command(): + nonlocal read_count + read_count += 1 + if read_count == 1: + return "second command" + return None + + p._read_next_command = fake_read_next_command + + async def fake_run_soul(soul_arg, user_input, *args, **kwargs): + run_soul_calls.append(user_input) + + with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul): + code = await p.run(command="first command") + + assert code == ExitCode.SUCCESS + assert run_soul_calls == ["first command", "second command"] + # reconcile must NOT be called in stream-json mode + assert state.reconcile_count == 0 + + +# --------------------------------------------------------------------------- +# keep_alive_on_exit: wait loop is skipped entirely +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_print_skips_wait_when_keep_alive_on_exit_enabled(tmp_path: Path) -> None: + """When ``background.keep_alive_on_exit`` is True, background tasks are + supposed to outlive the CLI exit — so Print must not block waiting for + them to finish. 
Verify the wait loop is skipped entirely (reconcile is + not called, no re-entry happens) even when active tasks and pending LLM + notifications are both True.""" + state = _FakeState(active=True, pending=True) + manager, notifications = _wire_manager(state) + + p, _ = _make_print_with_runtime(tmp_path, manager, notifications, keep_alive_on_exit=True) + run_soul_calls: list[str] = [] + + async def fake_run_soul(soul_arg, user_input, *args, **kwargs): + run_soul_calls.append(user_input) + + with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul): + code = await p.run(command="fire and forget") + + assert code == ExitCode.SUCCESS + # Only the original command was processed — no wait, no re-entry. + assert len(run_soul_calls) == 1 + assert run_soul_calls[0] == "fire and forget" + # The wait loop was never entered — reconcile must not be called. + assert state.reconcile_count == 0 + + +# --------------------------------------------------------------------------- +# Cancellation → FAILURE, not SUCCESS +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_print_background_wait_cancel_returns_failure(tmp_path: Path) -> None: + """Ctrl+C during background wait should exit and return FAILURE.""" + state = _FakeState(active=True, pending=False) + manager, notifications = _wire_manager(state) + + p, _ = _make_print_with_runtime(tmp_path, manager, notifications) + + async def fake_run_soul(soul_arg, user_input, *args, **kwargs): + pass + + with ( + patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul), + patch("kimi_cli.ui.print.install_sigint_handler") as mock_sigint, + ): + cancel_handler = None + + def capture_handler(loop, handler): + nonlocal cancel_handler + cancel_handler = handler + return lambda: None + + mock_sigint.side_effect = capture_handler + + async def run_with_cancel(): + task = asyncio.create_task(p.run(command="test")) + await asyncio.sleep(0.05) + if cancel_handler: + 
cancel_handler() + return await asyncio.wait_for(task, timeout=5.0) + + code = await run_with_cancel() + + assert code == ExitCode.FAILURE + + +# --------------------------------------------------------------------------- +# Re-entry with sibling tasks still running (P1 scenario 2) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_print_reruns_soul_even_with_active_sibling_tasks( + tmp_path: Path, +) -> None: + """When one task finishes and publishes a notification while another is + still active, the re-entry must happen immediately — completed-task + progress must not wait on siblings.""" + state = _FakeState(active=True, pending=False) + manager, notifications = _wire_manager(state) + + p, _ = _make_print_with_runtime(tmp_path, manager, notifications) + run_soul_calls: list[str] = [] + + reconcile_original = manager.reconcile.side_effect + + def reconcile_then_publish(): + reconcile_original() + # First reconcile: publish notification for completed sibling, + # other task still running. 
+ if state.reconcile_count == 1: + state.pending = True + + manager.reconcile.side_effect = reconcile_then_publish + + async def fake_run_soul(soul_arg, user_input, *args, **kwargs): + run_soul_calls.append(user_input) + if len(run_soul_calls) == 2: + # Re-entry: ack the pending notification and finish the sibling + state.pending = False + state.active = False + + with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul): + code = await p.run(command="siblings") + + assert code == ExitCode.SUCCESS + # Re-entry happened even though active=True at that moment + assert len(run_soul_calls) == 2 diff --git a/tests/core/test_print_background_integration.py b/tests/core/test_print_background_integration.py new file mode 100644 index 000000000..5cec7899b --- /dev/null +++ b/tests/core/test_print_background_integration.py @@ -0,0 +1,331 @@ +"""Integration test: Print mode background wait with real task/notification stores. + +Unlike the unit tests in test_kimisoul_background_wait.py (which mock +has_active_tasks/reconcile/has_pending_for_sink independently), this test +exercises the **real** reconcile → publish_terminal_notifications → +has_pending_for_sink chain with file-backed stores. It verifies that the +Print wait loop correctly detects background task completions and re-enters +the soul to process completion notifications. 
+""" + +from __future__ import annotations + +import asyncio +import time +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from kimi_cli.background.manager import BackgroundTaskManager +from kimi_cli.background.models import TaskRuntime, TaskSpec +from kimi_cli.cli import ExitCode +from kimi_cli.config import BackgroundConfig, NotificationConfig +from kimi_cli.notifications.manager import NotificationManager +from kimi_cli.soul.kimisoul import KimiSoul +from kimi_cli.ui.print import Print +from kimi_cli.wire.file import WireFile + + +def _make_session(tmp_path: Path) -> MagicMock: + """Create a minimal mock Session pointing at tmp_path for file stores.""" + session = MagicMock() + session.id = "integration-test" + session.context_file = tmp_path / "context.jsonl" + session.wire_file = WireFile(path=tmp_path / "wire.jsonl") + # Ensure parent dirs exist + session.context_file.parent.mkdir(parents=True, exist_ok=True) + return session + + +def _create_running_task(manager: BackgroundTaskManager, task_id: str) -> None: + """Create a bash-kind task in the store and mark it as running. + + We deliberately use ``kind="bash"`` (not ``"agent"``) because + ``recover()`` marks any running ``agent`` task that is not in + ``_live_agent_tasks`` as ``lost`` immediately. ``bash`` tasks are + kept alive while their ``heartbeat_at`` is fresh — perfect for + simulating an in-progress worker without actually spawning one. 
+ """ + now = time.time() + spec = TaskSpec( + id=task_id, + kind="bash", + session_id="integration-test", + description=f"Test task {task_id}", + tool_call_id=f"call-{task_id}", + owner_role="root", + command="true", + shell_name="bash", + shell_path="/bin/bash", + cwd="/tmp", + timeout_s=60, + ) + manager.store.create_task(spec) + runtime = TaskRuntime( + status="running", + started_at=now, + heartbeat_at=now, # fresh heartbeat → recover() will not mark as lost + updated_at=now, + ) + manager.store.write_runtime(task_id, runtime) + + +def _complete_task(manager: BackgroundTaskManager, task_id: str) -> None: + """Mark a task as completed by writing terminal runtime status to disk.""" + now = time.time() + runtime = TaskRuntime( + status="completed", + exit_code=0, + finished_at=now, + updated_at=now, + ) + manager.store.write_runtime(task_id, runtime) + + +@pytest.mark.asyncio +async def test_real_reconcile_publishes_notification_and_triggers_reentry( + tmp_path: Path, +) -> None: + """End-to-end integration: a running task completes on disk, reconcile() + publishes a terminal notification, has_pending_for_sink("llm") returns + True, and Print re-enters run_soul.""" + session = _make_session(tmp_path) + bg_config = BackgroundConfig() + notif_config = NotificationConfig() + + notifications = NotificationManager(tmp_path / "notifications", notif_config) + manager = BackgroundTaskManager(session, bg_config, notifications=notifications) + + # Create a task that is "running" + _create_running_task(manager, "b-int-00001") + + # Verify preconditions + assert manager.has_active_tasks() + assert not notifications.has_pending_for_sink("llm") + + # Build a mock soul whose .runtime exposes the real manager/notifications + soul = AsyncMock(spec=KimiSoul) + soul.runtime = MagicMock() + soul.runtime.role = "root" + soul.runtime.config.background.keep_alive_on_exit = False + soul.runtime.background_tasks = manager + soul.runtime.notifications = notifications + 
soul.runtime.session = session + + p = Print( + soul=soul, + input_format="text", + output_format="text", + context_file=tmp_path / "ctx.json", + ) + + run_soul_calls: list[str] = [] + + async def fake_run_soul(soul_arg, user_input, *args, **kwargs): + run_soul_calls.append(user_input) + if len(run_soul_calls) == 1: + # After the first run_soul, simulate the background worker + # completing by writing terminal status to disk. The wait loop's + # reconcile() will pick this up. + _complete_task(manager, "b-int-00001") + else: + # On re-entry, simulate the real soul draining pending + # "llm" notifications (like deliver_pending would). + for view in notifications.claim_for_sink("llm"): + notifications.ack("llm", view.event.id) + + with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul): + code = await asyncio.wait_for(p.run(command="do work"), timeout=10.0) + + assert code == ExitCode.SUCCESS + + # The real chain was exercised: + # 1. run_soul returns, _complete_task wrote terminal status + # 2. pre-loop reconcile() → publish_terminal_notifications() publishes + # a "task.completed" notification targeting the "llm" sink + # 3. while: has_active_tasks()=False, has_pending_for_sink("llm")=True + # → enter body + # 4. body: has_active_tasks()=False → re-enter run_soul + # 5. re-entry mock acks the notification + # 6. 
while: both conditions False → exit + assert len(run_soul_calls) == 2 + assert run_soul_calls[0] == "do work" + assert "<system-reminder>" in run_soul_calls[1] + assert not notifications.has_pending_for_sink("llm") + + +@pytest.mark.asyncio +async def test_real_reconcile_no_reentry_when_task_completes_without_notification( + tmp_path: Path, +) -> None: + """If a task completes and reconcile() publishes a notification, but then + the notification is acked (drained) by run_soul, no further re-entry + should happen.""" + session = _make_session(tmp_path) + notifications = NotificationManager(tmp_path / "notifications", NotificationConfig()) + manager = BackgroundTaskManager(session, BackgroundConfig(), notifications=notifications) + + _create_running_task(manager, "b-int-00002") + + soul = AsyncMock(spec=KimiSoul) + soul.runtime = MagicMock() + soul.runtime.role = "root" + soul.runtime.config.background.keep_alive_on_exit = False + soul.runtime.background_tasks = manager + soul.runtime.notifications = notifications + soul.runtime.session = session + + p = Print( + soul=soul, + input_format="text", + output_format="text", + context_file=tmp_path / "ctx.json", + ) + + run_soul_calls: list[str] = [] + + async def fake_run_soul(soul_arg, user_input, *args, **kwargs): + run_soul_calls.append(user_input) + if len(run_soul_calls) == 1: + _complete_task(manager, "b-int-00002") + elif len(run_soul_calls) == 2: + # Simulate the soul draining all pending "llm" notifications + # during this re-entry (like real deliver_pending would). + for view in notifications.claim_for_sink("llm"): + notifications.ack("llm", view.event.id) + + with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul): + code = await asyncio.wait_for(p.run(command="drain test"), timeout=10.0) + + assert code == ExitCode.SUCCESS + # First call: original command; second: bg-task follow-up. + # No third call because the re-entry acked all notifications. 
+ assert len(run_soul_calls) == 2 + assert not notifications.has_pending_for_sink("llm") + + +@pytest.mark.asyncio +async def test_real_reconcile_multiple_tasks( + tmp_path: Path, +) -> None: + """With two tasks, the first completing triggers reconcile + notification; + the second still keeps the loop active until it also completes.""" + session = _make_session(tmp_path) + notifications = NotificationManager(tmp_path / "notifications", NotificationConfig()) + manager = BackgroundTaskManager(session, BackgroundConfig(), notifications=notifications) + + _create_running_task(manager, "b-int-00003") + _create_running_task(manager, "b-int-00004") + + soul = AsyncMock(spec=KimiSoul) + soul.runtime = MagicMock() + soul.runtime.role = "root" + soul.runtime.config.background.keep_alive_on_exit = False + soul.runtime.background_tasks = manager + soul.runtime.notifications = notifications + soul.runtime.session = session + + p = Print( + soul=soul, + input_format="text", + output_format="text", + context_file=tmp_path / "ctx.json", + ) + + run_soul_calls: list[str] = [] + + async def fake_run_soul(soul_arg, user_input, *args, **kwargs): + run_soul_calls.append(user_input) + # Ack any pending LLM notifications, like real deliver_pending would + for view in notifications.claim_for_sink("llm"): + notifications.ack("llm", view.event.id) + if len(run_soul_calls) == 1: + # First run: complete task 003, leave 004 running + _complete_task(manager, "b-int-00003") + elif len(run_soul_calls) == 2: + # Re-entry after task 003's notification: complete task 004 + _complete_task(manager, "b-int-00004") + + with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul): + code = await asyncio.wait_for(p.run(command="two tasks"), timeout=15.0) + + assert code == ExitCode.SUCCESS + # 1 original + 1 re-entry (task 003 done) + 1 re-entry (task 004 done) + assert len(run_soul_calls) == 3 + assert not notifications.has_pending_for_sink("llm") + + +@pytest.mark.asyncio +async def 
test_race_window_worker_finishes_between_reconcile_and_active_check( + tmp_path: Path, +) -> None: + """Guard against the race where a worker writes terminal status between + the top-of-loop reconcile() and the has_active_tasks() check. Without + a final double-check reconcile() before break, the completion + notification would be lost and the process would exit without + informing the LLM. + + We simulate the race by wrapping has_active_tasks(): on the call that + sees no active tasks, we first flip the underlying task to completed + on disk. Then the "final reconcile" inside the break branch must + still publish the notification and the loop must re-enter the soul. + """ + session = _make_session(tmp_path) + notifications = NotificationManager(tmp_path / "notifications", NotificationConfig()) + manager = BackgroundTaskManager(session, BackgroundConfig(), notifications=notifications) + + _create_running_task(manager, "b-int-race1") + + # Wrap has_active_tasks so that right before it reports "no active + # tasks", we simulate the worker flipping the runtime to terminal on + # disk. The top-of-loop reconcile already ran before this point, so + # the only way the notification gets published is via a SECOND + # reconcile after the active check — that's the P1 fix under test. + real_has_active = manager.has_active_tasks + task_completed_on_disk = {"done": False} + + def racy_has_active(): + if not task_completed_on_disk["done"]: + # First call in this iteration: tasks are "running"; but + # simulate the worker completing right before the next + # snapshot by flipping it here. The next reconcile() must + # still catch it. 
+ _complete_task(manager, "b-int-race1") + task_completed_on_disk["done"] = True + return False + return real_has_active() + + manager.has_active_tasks = racy_has_active # type: ignore[method-assign] + + soul = AsyncMock(spec=KimiSoul) + soul.runtime = MagicMock() + soul.runtime.role = "root" + soul.runtime.config.background.keep_alive_on_exit = False + soul.runtime.background_tasks = manager + soul.runtime.notifications = notifications + soul.runtime.session = session + + p = Print( + soul=soul, + input_format="text", + output_format="text", + context_file=tmp_path / "ctx.json", + ) + + run_soul_calls: list[str] = [] + + async def fake_run_soul(soul_arg, user_input, *args, **kwargs): + run_soul_calls.append(user_input) + if len(run_soul_calls) > 1: + for view in notifications.claim_for_sink("llm"): + notifications.ack("llm", view.event.id) + + with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul): + code = await asyncio.wait_for(p.run(command="race"), timeout=10.0) + + assert code == ExitCode.SUCCESS + # Without the double-check reconcile, this would be 1 (the race loses + # the completion notification). With the fix, it is 2. + assert len(run_soul_calls) == 2 + assert "<system-reminder>" in run_soul_calls[1]