diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4324cf0eb..14b2e5d32 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@ Only write entries that are worth mentioning to users.
## Unreleased
+- Print: Wait for background tasks before exiting — in one-shot `--print` mode, the process now waits for running background agents to finish and lets the model process their results, instead of exiting and killing them
- Core: Fix agent loop silently stopping when model response contains only thinking content — detect think-only responses (reasoning content with no text or tool calls) as an incomplete response error and retry automatically
- Core: Fix crash on streaming mid-flight network disconnection — when the OpenAI SDK raises a base `APIError` (instead of `APIConnectionError`) during long-running streams, the error is now correctly classified as retryable, enabling automatic retry and connection recovery instead of an unrecoverable crash
- Shell: Exclude empty current session from `/sessions` picker — completely empty sessions (no conversation history and no custom title) are no longer shown in the session list; sessions with a custom title are still displayed
diff --git a/docs/en/release-notes/changelog.md b/docs/en/release-notes/changelog.md
index 7bf369374..2e1a5d218 100644
--- a/docs/en/release-notes/changelog.md
+++ b/docs/en/release-notes/changelog.md
@@ -4,6 +4,7 @@ This page documents the changes in each Kimi Code CLI release.
## Unreleased
+- Print: Wait for background tasks before exiting — in one-shot `--print` mode, the process now waits for running background agents to finish and lets the model process their results, instead of exiting and killing them
- Core: Fix agent loop silently stopping when model response contains only thinking content — detect think-only responses (reasoning content with no text or tool calls) as an incomplete response error and retry automatically
- Core: Fix crash on streaming mid-flight network disconnection — when the OpenAI SDK raises a base `APIError` (instead of `APIConnectionError`) during long-running streams, the error is now correctly classified as retryable, enabling automatic retry and connection recovery instead of an unrecoverable crash
- Shell: Exclude empty current session from `/sessions` picker — completely empty sessions (no conversation history and no custom title) are no longer shown in the session list; sessions with a custom title are still displayed
diff --git a/docs/zh/release-notes/changelog.md b/docs/zh/release-notes/changelog.md
index 84286bdb9..303347b01 100644
--- a/docs/zh/release-notes/changelog.md
+++ b/docs/zh/release-notes/changelog.md
@@ -4,6 +4,7 @@
## 未发布
+- Print:退出前等待后台任务完成——在单次 `--print` 模式下,进程现在会等待仍在运行的后台 Agent 完成并让模型处理它们的结果,而不是直接退出并杀死它们
- Core:修复模型响应仅包含思考内容时 agent loop 静默停止的问题——将仅含思考内容(无文本或工具调用)的响应检测为不完整响应错误并自动重试
- Core:修复长时间 streaming 过程中网络断连导致崩溃的问题——当 OpenAI SDK 在流式传输中途抛出基类 `APIError`(而非 `APIConnectionError`)时,现在能正确识别为可重试错误,自动触发重试和连接恢复,而不再直接崩溃退出
- Shell:从 `/sessions` 选择器中排除空的当前会话——完全为空的会话(既无对话记录也无自定义标题)不再显示在会话列表中;有自定义标题的会话仍然正常显示
diff --git a/src/kimi_cli/background/manager.py b/src/kimi_cli/background/manager.py
index fc6399f15..eead26598 100644
--- a/src/kimi_cli/background/manager.py
+++ b/src/kimi_cli/background/manager.py
@@ -93,6 +93,14 @@ def _active_task_count(self) -> int:
1 for view in self._store.list_views() if not is_terminal_status(view.runtime.status)
)
+ def has_active_tasks(self) -> bool:
+ """Return True if any background tasks are in a non-terminal status.
+
+ This includes ``running``, ``awaiting_approval``, and any other
+ non-terminal state — not just actively executing tasks.
+ """
+ return self._active_task_count() > 0
+
def _worker_command(self, task_dir: Path) -> list[str]:
if getattr(sys, "frozen", False):
return [
diff --git a/src/kimi_cli/soul/kimisoul.py b/src/kimi_cli/soul/kimisoul.py
index 9365c11b9..81b954ae9 100644
--- a/src/kimi_cli/soul/kimisoul.py
+++ b/src/kimi_cli/soul/kimisoul.py
@@ -758,6 +758,7 @@ async def _agent_loop(self) -> TurnOutcome:
has_steers = await self._consume_pending_steers()
if has_steers:
continue # steers injected, force another LLM step
+
final_message = (
step_outcome.assistant_message
if step_outcome.stop_reason == "no_tool_calls"
diff --git a/src/kimi_cli/ui/print/__init__.py b/src/kimi_cli/ui/print/__init__.py
index 6242c33af..8e483c4a0 100644
--- a/src/kimi_cli/ui/print/__init__.py
+++ b/src/kimi_cli/ui/print/__init__.py
@@ -96,6 +96,74 @@ def _handler():
runtime.session.wire_file if runtime else None,
runtime,
)
+
+ # In one-shot text mode the process exits after this
+ # function returns, which would kill still-running
+ # background agents. Poll until they finish, calling
+ # reconcile() each iteration (the notification pump
+ # inside run_soul is no longer running, so we must
+ # drive reconcile ourselves to recover lost workers
+ # and publish terminal notifications). Only re-enter
+ # the soul when there are pending LLM notifications.
+ #
+ # stream-json mode is multi-turn: background tasks
+ # from one command must not block the next command.
+ #
+ # keep_alive_on_exit opts into "fire and forget"
+ # semantics: background tasks are meant to outlive
+ # the CLI process, so Print must not block waiting
+ # for them.
+ if (
+ runtime
+ and runtime.role == "root"
+ and self.input_format == "text"
+ and not runtime.config.background.keep_alive_on_exit
+ ):
+ manager = runtime.background_tasks
+ notifications = runtime.notifications
+ while not cancel_event.is_set():
+ # Drive reconcile() ourselves: the notification
+ # pump inside run_soul is no longer running, so
+ # we must recover lost workers and publish
+ # terminal notifications here.
+ manager.reconcile()
+ if notifications.has_pending_for_sink("llm"):
+ # Re-enter soul so the LLM can process the
+ # completion notification. Do this even if
+ # other tasks are still active — progress on
+ # completed tasks should not wait on siblings.
+ bg_prompt = (
+ "<system-reminder>"
+ "Background tasks have completed."
+ " Process their results."
+ "</system-reminder>"
+ )
+ await run_soul(
+ self.soul,
+ bg_prompt,
+ partial(visualize, self.output_format, self.final_only),
+ cancel_event,
+ runtime.session.wire_file,
+ runtime,
+ )
+ continue
+ if not manager.has_active_tasks():
+ # Re-check once after noticing no active
+ # tasks: a worker may have finished between
+ # the reconcile above and this snapshot,
+ # leaving a terminal state on disk that we
+ # haven't published yet. Without this
+ # second reconcile+pending check, that
+ # final completion notification would be
+ # lost when the process exits.
+ manager.reconcile()
+ if notifications.has_pending_for_sink("llm"):
+ continue
+ break
+ # Still waiting for tasks to finish.
+ await asyncio.sleep(1.0)
+ if cancel_event.is_set():
+ raise RunCancelled
else:
logger.info("Empty command, skipping")
diff --git a/tests/core/test_kimisoul_background_wait.py b/tests/core/test_kimisoul_background_wait.py
new file mode 100644
index 000000000..2daacbb92
--- /dev/null
+++ b/tests/core/test_kimisoul_background_wait.py
@@ -0,0 +1,406 @@
+"""Tests for Print mode background task waiting behavior.
+
+When background agents are still running after ``run_soul()`` completes a turn,
+**text** (one-shot) print mode should:
+
+- drive ``reconcile()`` each iteration (the notification pump inside ``run_soul``
+ is no longer running, so we must recover lost workers and publish terminal
+ notifications ourselves);
+- re-enter the soul whenever ``has_pending_for_sink("llm")`` is True — even if
+ other tasks are still active — so per-task progress is not blocked by
+ long-running siblings;
+- keep polling until both ``has_active_tasks()`` and ``has_pending_for_sink``
+ are False;
+- skip the wait loop entirely in ``stream-json`` mode (multi-turn) so
+ background tasks from one command do not block the next command;
+- raise ``RunCancelled`` when ``cancel_event`` is set.
+"""
+
+from __future__ import annotations
+
+import asyncio
+from pathlib import Path
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from kimi_cli.cli import ExitCode, InputFormat
+from kimi_cli.soul.kimisoul import KimiSoul
+from kimi_cli.ui.print import Print
+
+
+class _FakeState:
+ """Mutable state that drives has_active_tasks / has_pending_for_sink."""
+
+ def __init__(self, *, active: bool = False, pending: bool = False):
+ self.active = active
+ self.pending = pending
+ self.reconcile_count = 0
+
+
+def _wire_manager(state: _FakeState) -> tuple[MagicMock, MagicMock]:
+ manager = MagicMock()
+ manager.has_active_tasks = MagicMock(side_effect=lambda: state.active)
+
+ def _reconcile():
+ state.reconcile_count += 1
+
+ manager.reconcile = MagicMock(side_effect=_reconcile)
+
+ notifications = MagicMock()
+ notifications.has_pending_for_sink = MagicMock(side_effect=lambda sink: state.pending)
+ return manager, notifications
+
+
+def _make_print_with_runtime(
+ tmp_path: Path,
+ manager: MagicMock,
+ notifications: MagicMock,
+ *,
+ input_format: InputFormat = "text",
+ keep_alive_on_exit: bool = False,
+) -> tuple[Print, AsyncMock]:
+ soul = AsyncMock(spec=KimiSoul)
+ soul.runtime = MagicMock()
+ soul.runtime.role = "root"
+ soul.runtime.background_tasks = manager
+ soul.runtime.notifications = notifications
+ soul.runtime.config.background.keep_alive_on_exit = keep_alive_on_exit
+ soul.runtime.session.wire_file = tmp_path / "wire.jsonl"
+
+ p = Print(
+ soul=soul,
+ input_format=input_format,
+ output_format="text",
+ context_file=tmp_path / "context.json",
+ )
+ return p, soul
+
+
+# ---------------------------------------------------------------------------
+# Core: wait → pending → re-enter soul
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_print_reruns_soul_on_pending_notification(tmp_path: Path) -> None:
+ """After run_soul, if tasks complete and create pending LLM notifications,
+ Print should re-enter run_soul with a system-reminder prompt."""
+ state = _FakeState(active=True, pending=False)
+ manager, notifications = _wire_manager(state)
+
+ p, _ = _make_print_with_runtime(tmp_path, manager, notifications)
+ run_soul_calls: list[str] = []
+
+ async def fake_run_soul(soul_arg, user_input, *args, **kwargs):
+ run_soul_calls.append(user_input)
+ if len(run_soul_calls) == 1:
+ # Simulate a worker finishing + reconcile publishing a notification
+ state.active = False
+ state.pending = True
+ else:
+ # Re-entry drains the pending notification (like real deliver_pending)
+ state.pending = False
+
+ with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul):
+ code = await p.run(command="do work")
+
+ assert code == ExitCode.SUCCESS
+ assert len(run_soul_calls) == 2
+ assert run_soul_calls[0] == "do work"
+ assert "<system-reminder>" in run_soul_calls[1]
+ assert state.reconcile_count >= 1
+
+
+# ---------------------------------------------------------------------------
+# reconcile() is called on every poll iteration
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_print_calls_reconcile_each_poll_iteration(tmp_path: Path) -> None:
+ """reconcile() must be called on every poll iteration."""
+ state = _FakeState(active=True, pending=False)
+ manager, notifications = _wire_manager(state)
+
+ p, _ = _make_print_with_runtime(tmp_path, manager, notifications)
+ call_count = 0
+
+ async def fake_run_soul(soul_arg, user_input, *args, **kwargs):
+ nonlocal call_count
+ call_count += 1
+
+ # Patch sleep to also decrement a poll counter so the test finishes fast
+ poll_counter = {"n": 0}
+ real_sleep = asyncio.sleep
+
+ async def fake_sleep(duration):
+ poll_counter["n"] += 1
+ if poll_counter["n"] >= 3:
+ state.active = False
+ await real_sleep(0)
+
+ with (
+ patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul),
+ patch("kimi_cli.ui.print.asyncio.sleep", side_effect=fake_sleep),
+ ):
+ await p.run(command="test")
+
+ # Before each sleep there is a reconcile call (and one final reconcile
+ # after the last sleep). Expect at least 3 reconciles.
+ assert state.reconcile_count >= 3
+
+
+# ---------------------------------------------------------------------------
+# No re-entry when no notifications are pending
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_print_skips_reentry_when_no_pending_notifications(tmp_path: Path) -> None:
+ """If tasks complete but there are no pending LLM notifications, the soul
+ should NOT be re-entered."""
+ state = _FakeState(active=True, pending=False)
+ manager, notifications = _wire_manager(state)
+
+ p, _ = _make_print_with_runtime(tmp_path, manager, notifications)
+ run_soul_calls: list[str] = []
+
+ async def fake_run_soul(soul_arg, user_input, *args, **kwargs):
+ run_soul_calls.append(user_input)
+
+ real_sleep = asyncio.sleep
+
+ async def fake_sleep(duration):
+ state.active = False
+ await real_sleep(0)
+
+ with (
+ patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul),
+ patch("kimi_cli.ui.print.asyncio.sleep", side_effect=fake_sleep),
+ ):
+ code = await p.run(command="hello")
+
+ assert code == ExitCode.SUCCESS
+ assert len(run_soul_calls) == 1
+
+
+# ---------------------------------------------------------------------------
+# Pre-existing pending notifications: tasks already done before first check
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_print_reruns_soul_when_tasks_done_but_notifications_pending(
+ tmp_path: Path,
+) -> None:
+ """If all tasks finished before the first check and reconcile publishes
+ notifications, the soul should still be re-entered to drain them."""
+ state = _FakeState(active=False, pending=False)
+ manager, notifications = _wire_manager(state)
+
+ p, _ = _make_print_with_runtime(tmp_path, manager, notifications)
+ run_soul_calls: list[str] = []
+ reconcile_original = manager.reconcile.side_effect
+
+ def reconcile_then_publish():
+ reconcile_original()
+ # First reconcile: publish a pending notification
+ if state.reconcile_count == 1:
+ state.pending = True
+
+ manager.reconcile.side_effect = reconcile_then_publish
+
+ async def fake_run_soul(soul_arg, user_input, *args, **kwargs):
+ run_soul_calls.append(user_input)
+ if len(run_soul_calls) > 1:
+ state.pending = False
+
+ with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul):
+ code = await p.run(command="trigger")
+
+ assert code == ExitCode.SUCCESS
+ assert len(run_soul_calls) == 2
+ assert "<system-reminder>" in run_soul_calls[1]
+
+
+# ---------------------------------------------------------------------------
+# Empty: no tasks, no pending → no wait, exit immediately
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_print_exits_normally_when_no_background_work(tmp_path: Path) -> None:
+ """No active tasks and no pending notifications → exit without waiting."""
+ state = _FakeState(active=False, pending=False)
+ manager, notifications = _wire_manager(state)
+
+ p, _ = _make_print_with_runtime(tmp_path, manager, notifications)
+ run_soul_calls: list[str] = []
+
+ async def fake_run_soul(soul_arg, user_input, *args, **kwargs):
+ run_soul_calls.append(user_input)
+
+ with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul):
+ code = await p.run(command="hello")
+
+ assert code == ExitCode.SUCCESS
+ assert len(run_soul_calls) == 1
+ # Two reconciles: one at the top of the loop, one final double-check
+ # before break (to catch workers that finish between the two snapshots).
+ assert state.reconcile_count == 2
+
+
+# ---------------------------------------------------------------------------
+# stream-json mode: must NOT block between commands
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_print_stream_json_does_not_wait_for_background_tasks(
+ tmp_path: Path,
+) -> None:
+ """In stream-json mode the wait loop must be skipped entirely."""
+ state = _FakeState(active=True, pending=True)
+ manager, notifications = _wire_manager(state)
+
+ p, _ = _make_print_with_runtime(tmp_path, manager, notifications, input_format="stream-json")
+ run_soul_calls: list[str] = []
+ read_count = 0
+
+ def fake_read_next_command():
+ nonlocal read_count
+ read_count += 1
+ if read_count == 1:
+ return "second command"
+ return None
+
+ p._read_next_command = fake_read_next_command
+
+ async def fake_run_soul(soul_arg, user_input, *args, **kwargs):
+ run_soul_calls.append(user_input)
+
+ with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul):
+ code = await p.run(command="first command")
+
+ assert code == ExitCode.SUCCESS
+ assert run_soul_calls == ["first command", "second command"]
+ # reconcile must NOT be called in stream-json mode
+ assert state.reconcile_count == 0
+
+
+# ---------------------------------------------------------------------------
+# keep_alive_on_exit: wait loop is skipped entirely
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_print_skips_wait_when_keep_alive_on_exit_enabled(tmp_path: Path) -> None:
+ """When ``background.keep_alive_on_exit`` is True, background tasks are
+ supposed to outlive the CLI exit — so Print must not block waiting for
+ them to finish. Verify the wait loop is skipped entirely (reconcile is
+ not called, no re-entry happens) even when active tasks and pending LLM
+ notifications are both True."""
+ state = _FakeState(active=True, pending=True)
+ manager, notifications = _wire_manager(state)
+
+ p, _ = _make_print_with_runtime(tmp_path, manager, notifications, keep_alive_on_exit=True)
+ run_soul_calls: list[str] = []
+
+ async def fake_run_soul(soul_arg, user_input, *args, **kwargs):
+ run_soul_calls.append(user_input)
+
+ with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul):
+ code = await p.run(command="fire and forget")
+
+ assert code == ExitCode.SUCCESS
+ # Only the original command was processed — no wait, no re-entry.
+ assert len(run_soul_calls) == 1
+ assert run_soul_calls[0] == "fire and forget"
+ # The wait loop was never entered — reconcile must not be called.
+ assert state.reconcile_count == 0
+
+
+# ---------------------------------------------------------------------------
+# Cancellation → FAILURE, not SUCCESS
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_print_background_wait_cancel_returns_failure(tmp_path: Path) -> None:
+ """Ctrl+C during background wait should exit and return FAILURE."""
+ state = _FakeState(active=True, pending=False)
+ manager, notifications = _wire_manager(state)
+
+ p, _ = _make_print_with_runtime(tmp_path, manager, notifications)
+
+ async def fake_run_soul(soul_arg, user_input, *args, **kwargs):
+ pass
+
+ with (
+ patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul),
+ patch("kimi_cli.ui.print.install_sigint_handler") as mock_sigint,
+ ):
+ cancel_handler = None
+
+ def capture_handler(loop, handler):
+ nonlocal cancel_handler
+ cancel_handler = handler
+ return lambda: None
+
+ mock_sigint.side_effect = capture_handler
+
+ async def run_with_cancel():
+ task = asyncio.create_task(p.run(command="test"))
+ await asyncio.sleep(0.05)
+ if cancel_handler:
+ cancel_handler()
+ return await asyncio.wait_for(task, timeout=5.0)
+
+ code = await run_with_cancel()
+
+ assert code == ExitCode.FAILURE
+
+
+# ---------------------------------------------------------------------------
+# Re-entry with sibling tasks still running (P1 scenario 2)
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_print_reruns_soul_even_with_active_sibling_tasks(
+ tmp_path: Path,
+) -> None:
+ """When one task finishes and publishes a notification while another is
+ still active, the re-entry must happen immediately — completed-task
+ progress must not wait on siblings."""
+ state = _FakeState(active=True, pending=False)
+ manager, notifications = _wire_manager(state)
+
+ p, _ = _make_print_with_runtime(tmp_path, manager, notifications)
+ run_soul_calls: list[str] = []
+
+ reconcile_original = manager.reconcile.side_effect
+
+ def reconcile_then_publish():
+ reconcile_original()
+ # First reconcile: publish notification for completed sibling,
+ # other task still running.
+ if state.reconcile_count == 1:
+ state.pending = True
+
+ manager.reconcile.side_effect = reconcile_then_publish
+
+ async def fake_run_soul(soul_arg, user_input, *args, **kwargs):
+ run_soul_calls.append(user_input)
+ if len(run_soul_calls) == 2:
+ # Re-entry: ack the pending notification and finish the sibling
+ state.pending = False
+ state.active = False
+
+ with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul):
+ code = await p.run(command="siblings")
+
+ assert code == ExitCode.SUCCESS
+ # Re-entry happened even though active=True at that moment
+ assert len(run_soul_calls) == 2
diff --git a/tests/core/test_print_background_integration.py b/tests/core/test_print_background_integration.py
new file mode 100644
index 000000000..5cec7899b
--- /dev/null
+++ b/tests/core/test_print_background_integration.py
@@ -0,0 +1,331 @@
+"""Integration test: Print mode background wait with real task/notification stores.
+
+Unlike the unit tests in test_kimisoul_background_wait.py (which mock
+has_active_tasks/reconcile/has_pending_for_sink independently), this test
+exercises the **real** reconcile → publish_terminal_notifications →
+has_pending_for_sink chain with file-backed stores. It verifies that the
+Print wait loop correctly detects background task completions and re-enters
+the soul to process completion notifications.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import time
+from pathlib import Path
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from kimi_cli.background.manager import BackgroundTaskManager
+from kimi_cli.background.models import TaskRuntime, TaskSpec
+from kimi_cli.cli import ExitCode
+from kimi_cli.config import BackgroundConfig, NotificationConfig
+from kimi_cli.notifications.manager import NotificationManager
+from kimi_cli.soul.kimisoul import KimiSoul
+from kimi_cli.ui.print import Print
+from kimi_cli.wire.file import WireFile
+
+
+def _make_session(tmp_path: Path) -> MagicMock:
+ """Create a minimal mock Session pointing at tmp_path for file stores."""
+ session = MagicMock()
+ session.id = "integration-test"
+ session.context_file = tmp_path / "context.jsonl"
+ session.wire_file = WireFile(path=tmp_path / "wire.jsonl")
+ # Ensure parent dirs exist
+ session.context_file.parent.mkdir(parents=True, exist_ok=True)
+ return session
+
+
+def _create_running_task(manager: BackgroundTaskManager, task_id: str) -> None:
+ """Create a bash-kind task in the store and mark it as running.
+
+ We deliberately use ``kind="bash"`` (not ``"agent"``) because
+ ``recover()`` marks any running ``agent`` task that is not in
+ ``_live_agent_tasks`` as ``lost`` immediately. ``bash`` tasks are
+ kept alive while their ``heartbeat_at`` is fresh — perfect for
+ simulating an in-progress worker without actually spawning one.
+ """
+ now = time.time()
+ spec = TaskSpec(
+ id=task_id,
+ kind="bash",
+ session_id="integration-test",
+ description=f"Test task {task_id}",
+ tool_call_id=f"call-{task_id}",
+ owner_role="root",
+ command="true",
+ shell_name="bash",
+ shell_path="/bin/bash",
+ cwd="/tmp",
+ timeout_s=60,
+ )
+ manager.store.create_task(spec)
+ runtime = TaskRuntime(
+ status="running",
+ started_at=now,
+ heartbeat_at=now, # fresh heartbeat → recover() will not mark as lost
+ updated_at=now,
+ )
+ manager.store.write_runtime(task_id, runtime)
+
+
+def _complete_task(manager: BackgroundTaskManager, task_id: str) -> None:
+ """Mark a task as completed by writing terminal runtime status to disk."""
+ now = time.time()
+ runtime = TaskRuntime(
+ status="completed",
+ exit_code=0,
+ finished_at=now,
+ updated_at=now,
+ )
+ manager.store.write_runtime(task_id, runtime)
+
+
+@pytest.mark.asyncio
+async def test_real_reconcile_publishes_notification_and_triggers_reentry(
+ tmp_path: Path,
+) -> None:
+ """End-to-end integration: a running task completes on disk, reconcile()
+ publishes a terminal notification, has_pending_for_sink("llm") returns
+ True, and Print re-enters run_soul."""
+ session = _make_session(tmp_path)
+ bg_config = BackgroundConfig()
+ notif_config = NotificationConfig()
+
+ notifications = NotificationManager(tmp_path / "notifications", notif_config)
+ manager = BackgroundTaskManager(session, bg_config, notifications=notifications)
+
+ # Create a task that is "running"
+ _create_running_task(manager, "b-int-00001")
+
+ # Verify preconditions
+ assert manager.has_active_tasks()
+ assert not notifications.has_pending_for_sink("llm")
+
+ # Build a mock soul whose .runtime exposes the real manager/notifications
+ soul = AsyncMock(spec=KimiSoul)
+ soul.runtime = MagicMock()
+ soul.runtime.role = "root"
+ soul.runtime.config.background.keep_alive_on_exit = False
+ soul.runtime.background_tasks = manager
+ soul.runtime.notifications = notifications
+ soul.runtime.session = session
+
+ p = Print(
+ soul=soul,
+ input_format="text",
+ output_format="text",
+ context_file=tmp_path / "ctx.json",
+ )
+
+ run_soul_calls: list[str] = []
+
+ async def fake_run_soul(soul_arg, user_input, *args, **kwargs):
+ run_soul_calls.append(user_input)
+ if len(run_soul_calls) == 1:
+ # After the first run_soul, simulate the background worker
+ # completing by writing terminal status to disk. The wait loop's
+ # reconcile() will pick this up.
+ _complete_task(manager, "b-int-00001")
+ else:
+ # On re-entry, simulate the real soul draining pending
+ # "llm" notifications (like deliver_pending would).
+ for view in notifications.claim_for_sink("llm"):
+ notifications.ack("llm", view.event.id)
+
+ with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul):
+ code = await asyncio.wait_for(p.run(command="do work"), timeout=10.0)
+
+ assert code == ExitCode.SUCCESS
+
+ # The real chain was exercised:
+ # 1. run_soul returns, _complete_task wrote terminal status
+ # 2. pre-loop reconcile() → publish_terminal_notifications() publishes
+ # a "task.completed" notification targeting the "llm" sink
+ # 3. while: has_active_tasks()=False, has_pending_for_sink("llm")=True
+ # → enter body
+ # 4. body: has_active_tasks()=False → re-enter run_soul
+ # 5. re-entry mock acks the notification
+ # 6. while: both conditions False → exit
+ assert len(run_soul_calls) == 2
+ assert run_soul_calls[0] == "do work"
+ assert "<system-reminder>" in run_soul_calls[1]
+ assert not notifications.has_pending_for_sink("llm")
+
+
+@pytest.mark.asyncio
+async def test_real_reconcile_no_reentry_when_task_completes_without_notification(
+ tmp_path: Path,
+) -> None:
+ """If a task completes and reconcile() publishes a notification, but then
+ the notification is acked (drained) by run_soul, no further re-entry
+ should happen."""
+ session = _make_session(tmp_path)
+ notifications = NotificationManager(tmp_path / "notifications", NotificationConfig())
+ manager = BackgroundTaskManager(session, BackgroundConfig(), notifications=notifications)
+
+ _create_running_task(manager, "b-int-00002")
+
+ soul = AsyncMock(spec=KimiSoul)
+ soul.runtime = MagicMock()
+ soul.runtime.role = "root"
+ soul.runtime.config.background.keep_alive_on_exit = False
+ soul.runtime.background_tasks = manager
+ soul.runtime.notifications = notifications
+ soul.runtime.session = session
+
+ p = Print(
+ soul=soul,
+ input_format="text",
+ output_format="text",
+ context_file=tmp_path / "ctx.json",
+ )
+
+ run_soul_calls: list[str] = []
+
+ async def fake_run_soul(soul_arg, user_input, *args, **kwargs):
+ run_soul_calls.append(user_input)
+ if len(run_soul_calls) == 1:
+ _complete_task(manager, "b-int-00002")
+ elif len(run_soul_calls) == 2:
+ # Simulate the soul draining all pending "llm" notifications
+ # during this re-entry (like real deliver_pending would).
+ for view in notifications.claim_for_sink("llm"):
+ notifications.ack("llm", view.event.id)
+
+ with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul):
+ code = await asyncio.wait_for(p.run(command="drain test"), timeout=10.0)
+
+ assert code == ExitCode.SUCCESS
+ # First call: original command; second: bg-task follow-up.
+ # No third call because the re-entry acked all notifications.
+ assert len(run_soul_calls) == 2
+ assert not notifications.has_pending_for_sink("llm")
+
+
+@pytest.mark.asyncio
+async def test_real_reconcile_multiple_tasks(
+ tmp_path: Path,
+) -> None:
+ """With two tasks, the first completing triggers reconcile + notification;
+ the second still keeps the loop active until it also completes."""
+ session = _make_session(tmp_path)
+ notifications = NotificationManager(tmp_path / "notifications", NotificationConfig())
+ manager = BackgroundTaskManager(session, BackgroundConfig(), notifications=notifications)
+
+ _create_running_task(manager, "b-int-00003")
+ _create_running_task(manager, "b-int-00004")
+
+ soul = AsyncMock(spec=KimiSoul)
+ soul.runtime = MagicMock()
+ soul.runtime.role = "root"
+ soul.runtime.config.background.keep_alive_on_exit = False
+ soul.runtime.background_tasks = manager
+ soul.runtime.notifications = notifications
+ soul.runtime.session = session
+
+ p = Print(
+ soul=soul,
+ input_format="text",
+ output_format="text",
+ context_file=tmp_path / "ctx.json",
+ )
+
+ run_soul_calls: list[str] = []
+
+ async def fake_run_soul(soul_arg, user_input, *args, **kwargs):
+ run_soul_calls.append(user_input)
+ # Ack any pending LLM notifications, like real deliver_pending would
+ for view in notifications.claim_for_sink("llm"):
+ notifications.ack("llm", view.event.id)
+ if len(run_soul_calls) == 1:
+ # First run: complete task 003, leave 004 running
+ _complete_task(manager, "b-int-00003")
+ elif len(run_soul_calls) == 2:
+ # Re-entry after task 003's notification: complete task 004
+ _complete_task(manager, "b-int-00004")
+
+ with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul):
+ code = await asyncio.wait_for(p.run(command="two tasks"), timeout=15.0)
+
+ assert code == ExitCode.SUCCESS
+ # 1 original + 1 re-entry (task 003 done) + 1 re-entry (task 004 done)
+ assert len(run_soul_calls) == 3
+ assert not notifications.has_pending_for_sink("llm")
+
+
+@pytest.mark.asyncio
+async def test_race_window_worker_finishes_between_reconcile_and_active_check(
+ tmp_path: Path,
+) -> None:
+ """Guard against the race where a worker writes terminal status between
+ the top-of-loop reconcile() and the has_active_tasks() check. Without
+ a final double-check reconcile() before break, the completion
+ notification would be lost and the process would exit without
+ informing the LLM.
+
+ We simulate the race by wrapping has_active_tasks(): on the call that
+ sees no active tasks, we first flip the underlying task to completed
+ on disk. Then the "final reconcile" inside the break branch must
+ still publish the notification and the loop must re-enter the soul.
+ """
+ session = _make_session(tmp_path)
+ notifications = NotificationManager(tmp_path / "notifications", NotificationConfig())
+ manager = BackgroundTaskManager(session, BackgroundConfig(), notifications=notifications)
+
+ _create_running_task(manager, "b-int-race1")
+
+ # Wrap has_active_tasks so that right before it reports "no active
+ # tasks", we simulate the worker flipping the runtime to terminal on
+ # disk. The top-of-loop reconcile already ran before this point, so
+ # the only way the notification gets published is via a SECOND
+ # reconcile after the active check — that's the P1 fix under test.
+ real_has_active = manager.has_active_tasks
+ task_completed_on_disk = {"done": False}
+
+ def racy_has_active():
+ if not task_completed_on_disk["done"]:
+ # First call in this iteration: tasks are "running"; but
+ # simulate the worker completing right before the next
+ # snapshot by flipping it here. The next reconcile() must
+ # still catch it.
+ _complete_task(manager, "b-int-race1")
+ task_completed_on_disk["done"] = True
+ return False
+ return real_has_active()
+
+ manager.has_active_tasks = racy_has_active # type: ignore[method-assign]
+
+ soul = AsyncMock(spec=KimiSoul)
+ soul.runtime = MagicMock()
+ soul.runtime.role = "root"
+ soul.runtime.config.background.keep_alive_on_exit = False
+ soul.runtime.background_tasks = manager
+ soul.runtime.notifications = notifications
+ soul.runtime.session = session
+
+ p = Print(
+ soul=soul,
+ input_format="text",
+ output_format="text",
+ context_file=tmp_path / "ctx.json",
+ )
+
+ run_soul_calls: list[str] = []
+
+ async def fake_run_soul(soul_arg, user_input, *args, **kwargs):
+ run_soul_calls.append(user_input)
+ if len(run_soul_calls) > 1:
+ for view in notifications.claim_for_sink("llm"):
+ notifications.ack("llm", view.event.id)
+
+ with patch("kimi_cli.ui.print.run_soul", side_effect=fake_run_soul):
+ code = await asyncio.wait_for(p.run(command="race"), timeout=10.0)
+
+ assert code == ExitCode.SUCCESS
+ # Without the double-check reconcile, this would be 1 (the race loses
+ # the completion notification). With the fix, it is 2.
+ assert len(run_soul_calls) == 2
+ assert "<system-reminder>" in run_soul_calls[1]