4 changes: 3 additions & 1 deletion docs/environment-variables.md
@@ -9,7 +9,8 @@
| --- | --- | --- |
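| `ASTRBOT_BACKEND_URL` | Backend base URL | Default `http://127.0.0.1:6185/` |
| `ASTRBOT_BACKEND_AUTO_START` | Whether to start the backend automatically | Default `1` (enabled) |
| `ASTRBOT_BACKEND_TIMEOUT_MS` | Backend readiness wait timeout | Development mode defaults to `20000`; packaged mode falls back to `300000` |
| `ASTRBOT_BACKEND_TIMEOUT_MS` | Backend readiness wait timeout | Development mode defaults to `20000`; packaged mode falls back to `900000` |
| `ASTRBOT_BACKEND_STARTUP_IDLE_TIMEOUT_MS` | Idle timeout for the backend startup heartbeat | Default `60000`, range `5000~900000` |
| `ASTRBOT_BACKEND_READY_HTTP_PATH` | Readiness probe HTTP path | Default `/api/stat/start-time` |
| `ASTRBOT_BACKEND_READY_PROBE_TIMEOUT_MS` | Timeout for a single readiness probe | Falls back to `ASTRBOT_BACKEND_PING_TIMEOUT_MS` by default |
| `ASTRBOT_BACKEND_READY_POLL_INTERVAL_MS` | Readiness polling interval | Default `300`, clamped to its bounds |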
@@ -53,6 +54,7 @@
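| Variable | Purpose | Default/behavior |
| --- | --- | --- |
| `ASTRBOT_DESKTOP_CLIENT` | Marks the desktop client environment | Set to `1` when the backend is started in packaged mode |
| `ASTRBOT_BACKEND_STARTUP_HEARTBEAT_PATH` | Heartbeat file path that the desktop app passes to the backend launcher | In packaged mode, defaults to `ASTRBOT_ROOT/data/backend-startup-heartbeat.json` |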
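
As a rough illustration of how the idle timeout and the heartbeat file could fit together, here is a minimal Python sketch. It only uses the default and range documented for `ASTRBOT_BACKEND_STARTUP_IDLE_TIMEOUT_MS` and the `updated_at_ms` field written by the launcher. The function names, the fallback to the default on invalid input, and the staleness rule are assumptions made for this example; the actual desktop-side logic lives in the Rust code and is not shown in this diff.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional

# Default and range as documented in the table; names below are hypothetical.
DEFAULT_IDLE_TIMEOUT_MS = 60_000
MIN_IDLE_TIMEOUT_MS = 5_000
MAX_IDLE_TIMEOUT_MS = 900_000


def resolve_idle_timeout_ms(raw: Optional[str]) -> int:
    """Parse the env value and clamp it to the documented range.

    Assumption: unset or non-numeric values fall back to the default.
    """
    try:
        value = int((raw or "").strip())
    except ValueError:
        return DEFAULT_IDLE_TIMEOUT_MS
    return max(MIN_IDLE_TIMEOUT_MS, min(MAX_IDLE_TIMEOUT_MS, value))


def heartbeat_age_ms(path: Path, now_ms: int) -> Optional[int]:
    """Return the heartbeat age in milliseconds, or None if it cannot be read."""
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
        updated_at_ms = int(payload["updated_at_ms"])
    except (OSError, ValueError, KeyError, TypeError):
        return None
    return max(0, now_ms - updated_at_ms)


with tempfile.TemporaryDirectory() as root_dir:
    heartbeat_path = Path(root_dir) / "data" / "backend-startup-heartbeat.json"
    heartbeat_path.parent.mkdir(parents=True)
    heartbeat_path.write_text(
        json.dumps({"pid": 1, "state": "starting", "updated_at_ms": 1_000}),
        encoding="utf-8",
    )

    idle_timeout_ms = resolve_idle_timeout_ms(None)
    age_ms = heartbeat_age_ms(heartbeat_path, now_ms=61_001)
    # Only a readable heartbeat can be judged stale here; what to do when the
    # file is missing is left to the caller.
    is_stale = age_ms is not None and age_ms > idle_timeout_ms
    missing_age_ms = heartbeat_age_ms(
        heartbeat_path.with_name("missing.json"), now_ms=61_001
    )
```

Passing `now_ms` explicitly keeps the check deterministic in tests; a real caller would supply the current wall-clock time in milliseconds.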

## 4. Release/CI (GitHub Actions)

131 changes: 122 additions & 9 deletions scripts/backend/templates/launch_backend.py
@@ -1,14 +1,22 @@
from __future__ import annotations

import atexit
import ctypes
import json
import os
import runpy
import sys
import threading
import time
from pathlib import Path

BACKEND_DIR = Path(__file__).resolve().parent
APP_DIR = BACKEND_DIR / "app"
_WINDOWS_DLL_DIRECTORY_HANDLES: list[object] = []
# Keep this in sync with BACKEND_STARTUP_HEARTBEAT_PATH_ENV in src-tauri/src/app_constants.rs.
STARTUP_HEARTBEAT_ENV = "ASTRBOT_BACKEND_STARTUP_HEARTBEAT_PATH"
STARTUP_HEARTBEAT_INTERVAL_SECONDS = 2.0
STARTUP_HEARTBEAT_STOP_JOIN_TIMEOUT_SECONDS = 1.0


def configure_stdio_utf8() -> None:
@@ -113,15 +121,120 @@ def preload_windows_runtime_dlls() -> None:
continue


configure_stdio_utf8()
configure_windows_dll_search_path()
preload_windows_runtime_dlls()
def resolve_startup_heartbeat_path() -> Path | None:
    raw = os.environ.get(STARTUP_HEARTBEAT_ENV, "").strip()
    if not raw:
        return None
    return Path(raw)

sys.path.insert(0, str(APP_DIR))

main_file = APP_DIR / "main.py"
if not main_file.is_file():
    raise FileNotFoundError(f"Backend entrypoint not found: {main_file}")
def build_heartbeat_payload(state: str) -> dict[str, object]:

issue (complexity): Consider inlining the heartbeat JSON construction and atomic write logic directly into write_startup_heartbeat to avoid unnecessary helper indirection.

You can reduce indirection without changing behavior by inlining the tightly-coupled helpers into write_startup_heartbeat.

Right now:

def build_heartbeat_payload(state: str) -> dict[str, object]:
    return {
        "pid": os.getpid(),
        "state": state,
        "updated_at_ms": int(time.time() * 1000),
    }


def atomic_write_json(path: Path, payload: dict[str, object]) -> None:
    temp_path = path.with_name(f"{path.name}.tmp")
    temp_path.write_text(
        json.dumps(payload, separators=(",", ":")),
        encoding="utf-8",
    )
    temp_path.replace(path)


def write_startup_heartbeat(
    path: Path, state: str, *, warn_on_error: bool = False
) -> bool:
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        atomic_write_json(path, build_heartbeat_payload(state))
        return True
    except Exception as exc:
        ...

These helpers are only used together in a single call path and don’t provide reuse elsewhere, so they add mental hops without buying much.

You can keep the atomic write semantics and JSON format but collapse this into a single function:

def write_startup_heartbeat(
    path: Path, state: str, *, warn_on_error: bool = False
) -> bool:
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        temp_path = path.with_name(f"{path.name}.tmp")
        payload = {
            "pid": os.getpid(),
            "state": state,
            "updated_at_ms": int(time.time() * 1000),
        }
        temp_path.write_text(
            json.dumps(payload, separators=(",", ":")),
            encoding="utf-8",
        )
        temp_path.replace(path)
        return True
    except Exception as exc:
        if warn_on_error:
            print(
                f"[startup-heartbeat] failed to write heartbeat to {path}: "
                f"{exc.__class__.__name__}: {exc}",
                file=sys.stderr,
            )
        return False

Then you can remove build_heartbeat_payload and atomic_write_json entirely. This makes the heartbeat write path self-contained and easier to scan, while preserving all existing behavior (atomic replace, payload shape, logging).
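
To make the write-temp-then-replace pattern discussed above concrete, here is a minimal, self-contained sketch. The helper names `write_heartbeat_atomically` and `read_heartbeat` are hypothetical and not part of this PR; the payload shape follows the snippet in the comment. It writes two heartbeats in a row and checks that a reader sees complete JSON each time and that no `.tmp` file is left behind.

```python
import json
import os
import tempfile
import time
from pathlib import Path


def write_heartbeat_atomically(path: Path, state: str) -> None:
    """Write the payload to a sibling temp file, then swap it into place."""
    payload = {
        "pid": os.getpid(),
        "state": state,
        "updated_at_ms": int(time.time() * 1000),
    }
    temp_path = path.with_name(f"{path.name}.tmp")
    temp_path.write_text(
        json.dumps(payload, separators=(",", ":")),
        encoding="utf-8",
    )
    # Path.replace wraps os.replace, which overwrites an existing destination.
    # The rename is atomic on POSIX when both paths are on the same filesystem.
    temp_path.replace(path)


def read_heartbeat(path: Path) -> dict:
    """Read the heartbeat back the way a consumer might (hypothetical helper)."""
    return json.loads(path.read_text(encoding="utf-8"))


with tempfile.TemporaryDirectory() as temp_dir:
    heartbeat_path = Path(temp_dir) / "heartbeat.json"

    write_heartbeat_atomically(heartbeat_path, "starting")
    first = read_heartbeat(heartbeat_path)

    write_heartbeat_atomically(heartbeat_path, "stopping")
    second = read_heartbeat(heartbeat_path)

    leftover_temp_files = sorted(p.name for p in Path(temp_dir).glob("*.tmp"))
```

Because the temp file sits next to the target, the replace stays within one directory, which is what lets a reader observe either the old or the new file rather than a partially written one.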


issue (complexity): Consider simplifying the heartbeat implementation by inlining helpers, reducing state in the loop, and centralizing logging policy to make the code easier to follow.

You can simplify the heartbeat implementation without changing behavior in any meaningful way for this feature.

1. Flatten build_heartbeat_payload and atomic_write_json into write_startup_heartbeat

For this internal heartbeat file, the extra helper layers don’t add much clarity, but they do add mental overhead. You can keep the atomic-write behavior while reducing the helper depth:

def write_startup_heartbeat(
    path: Path, state: str, *, warn_on_error: bool = False
) -> bool:
    payload = {
        "pid": os.getpid(),
        "state": state,
        "updated_at_ms": int(time.time() * 1000),
    }
    temp_path = path.with_name(f"{path.name}.tmp")

    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        temp_path.write_text(
            json.dumps(payload, separators=(",", ":")),
            encoding="utf-8",
        )
        temp_path.replace(path)
        return True
    except Exception as exc:
        # Best-effort cleanup; ignore errors.
        try:
            temp_path.unlink(missing_ok=True)
        except Exception:
            pass

        if warn_on_error:
            print(
                f"[startup-heartbeat] failed to write heartbeat to {path}: "
                f"{exc.__class__.__name__}: {exc}",
                file=sys.stderr,
            )
        return False

This removes two helpers (build_heartbeat_payload, atomic_write_json) while preserving the atomic update behavior and logging.

2. Simplify heartbeat_loop logging policy

The dual-boolean state (had_successful_write, warning_emitted_since_last_success) plus should_warn is a lot of logic for a logging policy. A time-based throttle keeps visibility and is easier to follow.

For example, log at most once every N seconds:

LOG_THROTTLE_SECONDS = 30.0  # or whatever is reasonable

def heartbeat_loop(
    path: Path, interval_seconds: float, stop_event: threading.Event
) -> None:
    last_log_time = 0.0

    while not stop_event.wait(interval_seconds):
        now = time.time()
        warn_now = (now - last_log_time) >= LOG_THROTTLE_SECONDS

        ok = write_startup_heartbeat(path, "starting", warn_on_error=warn_now)
        if not ok and warn_now:
            last_log_time = now

If you still want “every failure before first success, then throttled”, you can keep a single had_successful_write flag instead of two booleans plus a closure:

def heartbeat_loop(
    path: Path, interval_seconds: float, stop_event: threading.Event
) -> None:
    had_successful_write = False
    last_log_time = 0.0
    LOG_THROTTLE_SECONDS = 30.0

    while not stop_event.wait(interval_seconds):
        now = time.time()
        warn_now = (not had_successful_write) or (
            (now - last_log_time) >= LOG_THROTTLE_SECONDS
        )

        ok = write_startup_heartbeat(path, "starting", warn_on_error=warn_now)
        if ok:
            had_successful_write = True
        elif warn_now:
            last_log_time = now

This preserves the “pre-first-success: log every failure; post-first-success: throttled” semantics with much simpler state.

3. Consider removing warn_on_error + bool return if logging is unified

If you adopt a simple throttle inside the loop, you can also simplify write_startup_heartbeat to only raise or only log, and keep the logging policy in one place. For example:

def write_startup_heartbeat(path: Path, state: str) -> None:
    # same body as above but raise instead of returning bool / warn_on_error
    ...

Then in heartbeat_loop:

try:
    write_startup_heartbeat(path, "starting")
except Exception as exc:
    if warn_now:
        print(
            f"[startup-heartbeat] failed to write heartbeat to {path}: "
            f"{exc.__class__.__name__}: {exc}",
            file=sys.stderr,
        )

This removes the warn_on_error flag and boolean return, and concentrates the logging policy in the loop.
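
One way to compare the suggested policy with the current one is to replay a sequence of write outcomes against a simulated clock. The sketch below is an illustration only: `simulate_warnings` is a hypothetical helper, the 30-second window is the placeholder value from the comment, and the clock is derived from the loop index instead of `time.time()`.

```python
LOG_THROTTLE_SECONDS = 30.0  # placeholder value taken from the comment above


def simulate_warnings(write_results, interval_seconds,
                      throttle_seconds=LOG_THROTTLE_SECONDS):
    """Return the warn_on_error flag that each write would receive.

    Policy being modelled: warn on every failure before the first success;
    afterwards warn at most once per throttle window.
    """
    had_successful_write = False
    last_log_time = float("-inf")  # no warning has been logged yet
    flags = []

    for index, ok in enumerate(write_results):
        now = index * interval_seconds  # simulated clock, not time.time()
        warn_now = (not had_successful_write) or (
            (now - last_log_time) >= throttle_seconds
        )
        flags.append(warn_now)

        if ok:
            had_successful_write = True
        elif warn_now:
            last_log_time = now

    return flags


# Two failures, one success, then three more failures at a 2-second interval.
early_failures = simulate_warnings(
    [False, False, True, False, False, False], interval_seconds=2.0
)

# One success, then failures at a 20-second interval.
late_failures = simulate_warnings(
    [True, False, False, False], interval_seconds=20.0
)
```

In the first run the flags come out as `[True, True, True, False, False, False]`: the failures right after the first success are suppressed because a warning was already logged within the window. That differs from the current rule, which warns on the first failure of each consecutive failure run, so the trade-off may be worth deciding explicitly before adopting the throttle.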

    return {
        "pid": os.getpid(),
        "state": state,
        "updated_at_ms": int(time.time() * 1000),
    }

sys.argv[0] = str(main_file)
runpy.run_path(str(main_file), run_name="__main__")

def atomic_write_json(path: Path, payload: dict[str, object]) -> None:
    temp_path = path.with_name(f"{path.name}.tmp")
    temp_path.write_text(
        json.dumps(payload, separators=(",", ":")),
        encoding="utf-8",
    )
    try:
        temp_path.replace(path)
    except Exception:
        try:
            temp_path.unlink(missing_ok=True)
        except Exception:
            pass
        raise


def write_startup_heartbeat(
    path: Path, state: str, *, warn_on_error: bool = False
) -> bool:
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        atomic_write_json(path, build_heartbeat_payload(state))
        return True
    except Exception as exc:
        if warn_on_error:
            print(
                f"[startup-heartbeat] failed to write heartbeat to {path}: {exc.__class__.__name__}: {exc}",
                file=sys.stderr,
            )
        return False


def heartbeat_loop(

issue (complexity): Consider simplifying the heartbeat warning logic and inlining the payload builder to make the heartbeat flow easier to follow and maintain.

You can reduce complexity in two small, targeted ways without changing the overall behavior (periodic atomic JSON writes + “stopping” heartbeat).

1. Simplify heartbeat loop warning logic

The current heartbeat_loop has two flags plus a nested should_warn helper. You can replace that small state machine with a simple time-based throttling scheme, which is easier to read and reason about.

For example, throttle warnings to at most once every 10 seconds:

WARNING_THROTTLE_SECONDS = 10.0


def heartbeat_loop(
    path: Path, interval_seconds: float, stop_event: threading.Event
) -> None:
    last_warning_time: float | None = None

    while not stop_event.wait(interval_seconds):
        now = time.time()
        warn_now = (
            last_warning_time is None
            or (now - last_warning_time) >= WARNING_THROTTLE_SECONDS
        )

        ok = write_startup_heartbeat(path, "starting", warn_on_error=warn_now)

        if warn_now and not ok:
            last_warning_time = now

This keeps log volume bounded while removing had_successful_write, warning_emitted_since_last_success, and the nested function.

2. Inline build_heartbeat_payload into write_startup_heartbeat

build_heartbeat_payload only wraps a small dict and is only used from write_startup_heartbeat. Inlining it removes one helper from the call path (heartbeat_loop → write_startup_heartbeat → build_heartbeat_payload and atomic_write_json), making the flow easier to follow.

def write_startup_heartbeat(
    path: Path, state: str, *, warn_on_error: bool = False
) -> bool:
    payload = {
        "pid": os.getpid(),
        "state": state,
        "updated_at_ms": int(time.time() * 1000),
    }

    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        atomic_write_json(path, payload)
        return True
    except Exception as exc:
        if warn_on_error:
            print(
                f"[startup-heartbeat] failed to write heartbeat to {path}: "
                f"{exc.__class__.__name__}: {exc}",
                file=sys.stderr,
            )
        return False

atomic_write_json remains a focused helper for the critical “write temp → replace” behavior, but the heartbeat logic is now more localized and straightforward.
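
All of the loop variants above rely on `while not stop_event.wait(interval_seconds)`, which acts as an interruptible sleep: `threading.Event.wait` returns `False` when the timeout elapses and `True` once the event is set. The sketch below isolates that mechanism with a hypothetical `run_ticker` function and a short interval chosen only so the example finishes quickly.

```python
import threading
import time


def run_ticker(interval_seconds: float, stop_event: threading.Event,
               ticks: list) -> None:
    # The body runs once per elapsed interval and the loop exits promptly
    # after stop_event.set(), without waiting out the current interval.
    while not stop_event.wait(interval_seconds):
        ticks.append(time.monotonic())


stop_event = threading.Event()
ticks: list = []
worker = threading.Thread(
    target=run_ticker,
    args=(0.01, stop_event, ticks),
    name="example-ticker",
    daemon=True,
)
worker.start()

# Wait until a few ticks were recorded, with a generous safety deadline.
deadline = time.monotonic() + 2.0
while len(ticks) < 3 and time.monotonic() < deadline:
    time.sleep(0.005)

stop_event.set()
worker.join(timeout=1.0)
stopped_cleanly = not worker.is_alive()
```

This is also why the tests in this PR can drive `heartbeat_loop` deterministically by replacing the event with a mock whose `wait` returns a scripted sequence of `False` and `True` values.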

    path: Path, interval_seconds: float, stop_event: threading.Event
) -> None:
    # At least one successful write has happened.
    had_successful_write = False
    # A warning has already been emitted since the last successful write.
    warning_emitted_since_last_success = False

    def should_warn() -> bool:
        # Before the first successful heartbeat we want every failure to surface so startup
        # path/permission issues stay visible. After a success, only warn on the first failure in
        # each consecutive failure run to avoid log spam.
        return (not had_successful_write) or (not warning_emitted_since_last_success)

    ok = write_startup_heartbeat(path, "starting", warn_on_error=True)
    if ok:
        had_successful_write = True
    else:
        warning_emitted_since_last_success = True

    while not stop_event.wait(interval_seconds):
        warn_now = should_warn()
        ok = write_startup_heartbeat(path, "starting", warn_on_error=warn_now)
        if ok:
            had_successful_write = True
            warning_emitted_since_last_success = False
        elif warn_now:
            warning_emitted_since_last_success = True


def start_startup_heartbeat() -> None:
    heartbeat_path = resolve_startup_heartbeat_path()
    if heartbeat_path is None:
        return

    stop_event = threading.Event()
    thread = threading.Thread(
        target=heartbeat_loop,
        args=(heartbeat_path, STARTUP_HEARTBEAT_INTERVAL_SECONDS, stop_event),
        name="astrbot-startup-heartbeat",
        daemon=True,
    )

    def on_exit() -> None:
        stop_event.set()
        thread.join(timeout=STARTUP_HEARTBEAT_STOP_JOIN_TIMEOUT_SECONDS)
        write_startup_heartbeat(heartbeat_path, "stopping", warn_on_error=True)

    thread.start()
    atexit.register(on_exit)


def main() -> None:
    configure_stdio_utf8()
    configure_windows_dll_search_path()
    preload_windows_runtime_dlls()
    start_startup_heartbeat()

    sys.path.insert(0, str(APP_DIR))

    main_file = APP_DIR / "main.py"
    if not main_file.is_file():
        raise FileNotFoundError(f"Backend entrypoint not found: {main_file}")

    sys.argv[0] = str(main_file)
    runpy.run_path(str(main_file), run_name="__main__")


if __name__ == "__main__":
    main()
136 changes: 136 additions & 0 deletions scripts/backend/templates/test_launch_backend.py
@@ -0,0 +1,136 @@
import importlib.util
import tempfile
import unittest
from pathlib import Path
from unittest import mock


MODULE_PATH = Path(__file__).with_name("launch_backend.py")
SPEC = importlib.util.spec_from_file_location("launch_backend_under_test", MODULE_PATH)
if SPEC is None or SPEC.loader is None:
    raise RuntimeError(f"Cannot load launch_backend module from {MODULE_PATH}")
launch_backend = importlib.util.module_from_spec(SPEC)
SPEC.loader.exec_module(launch_backend)


class StartupHeartbeatTests(unittest.TestCase):
    def test_atomic_write_json_cleans_up_temp_file_when_replace_fails(self) -> None:
        with tempfile.TemporaryDirectory() as temp_dir:
            heartbeat_path = Path(temp_dir) / "heartbeat.json"
            temp_path = heartbeat_path.with_name(f"{heartbeat_path.name}.tmp")

            with mock.patch.object(
                Path,
                "replace",
                autospec=True,
                side_effect=OSError("replace failed"),
            ):
                with self.assertRaises(OSError):
                    launch_backend.atomic_write_json(
                        heartbeat_path,
                        {"pid": 42, "state": "starting", "updated_at_ms": 5000},
                    )

            self.assertFalse(temp_path.exists())

    def test_repeated_failures_warn_before_first_success(self) -> None:
        stop_event = mock.Mock()
        stop_event.wait.side_effect = [False, True]

        with mock.patch.object(
            launch_backend,
            "write_startup_heartbeat",
            side_effect=[False, False],
        ) as write_mock:
            launch_backend.heartbeat_loop(Path("/tmp/heartbeat.json"), 2.0, stop_event)

        self.assertEqual(
            [call.kwargs["warn_on_error"] for call in write_mock.call_args_list],
            [True, True],
        )

    def test_repeated_failures_after_success_are_suppressed(self) -> None:
        stop_event = mock.Mock()
        stop_event.wait.side_effect = [False, False, True]

        with mock.patch.object(
            launch_backend,
            "write_startup_heartbeat",
            side_effect=[True, False, False],
        ) as write_mock:
            launch_backend.heartbeat_loop(Path("/tmp/heartbeat.json"), 2.0, stop_event)

        self.assertEqual(
            [call.kwargs["warn_on_error"] for call in write_mock.call_args_list],
            [True, True, False],
        )

    def test_stop_failure_still_warns_after_earlier_failure(self) -> None:
        stop_event = mock.Mock()
        thread = mock.Mock()
        register = mock.Mock()

        with mock.patch.object(
            launch_backend,
            "write_startup_heartbeat",
            return_value=False,
        ) as write_mock:
            with mock.patch.object(
                launch_backend,
                "resolve_startup_heartbeat_path",
                return_value=Path("/tmp/heartbeat.json"),
            ):
                with mock.patch.object(
                    launch_backend.threading, "Event", return_value=stop_event
                ):
                    with mock.patch.object(
                        launch_backend.threading, "Thread", return_value=thread
                    ):
                        with mock.patch.object(
                            launch_backend.atexit, "register", register
                        ):
                            launch_backend.start_startup_heartbeat()
                            thread.join.assert_not_called()
                            on_exit = register.call_args.args[0]
                            on_exit()

        thread.join.assert_called_once_with(
            timeout=launch_backend.STARTUP_HEARTBEAT_STOP_JOIN_TIMEOUT_SECONDS
        )
        self.assertEqual(
            [call.args[1] for call in write_mock.call_args_list],
            ["stopping"],
        )
        self.assertEqual(
            [call.kwargs["warn_on_error"] for call in write_mock.call_args_list],
            [True],
        )

    def test_start_startup_heartbeat_does_not_register_exit_handler_when_thread_start_fails(
        self,
    ) -> None:
        stop_event = mock.Mock()
        thread = mock.Mock()
        thread.start.side_effect = RuntimeError("thread start failed")
        register = mock.Mock()

        with mock.patch.object(
            launch_backend,
            "resolve_startup_heartbeat_path",
            return_value=Path("/tmp/heartbeat.json"),
        ):
            with mock.patch.object(
                launch_backend.threading, "Event", return_value=stop_event
            ):
                with mock.patch.object(
                    launch_backend.threading, "Thread", return_value=thread
                ):
                    with mock.patch.object(launch_backend.atexit, "register", register):
                        with self.assertRaises(RuntimeError):
                            launch_backend.start_startup_heartbeat()

        register.assert_not_called()


if __name__ == "__main__":
    unittest.main()
12 changes: 11 additions & 1 deletion src-tauri/src/app_constants.rs
@@ -1,8 +1,9 @@
use std::time::Duration;

pub(crate) const DEFAULT_BACKEND_URL: &str = "http://127.0.0.1:6185/";
pub(crate) const ASTRBOT_ROOT_ENV: &str = "ASTRBOT_ROOT";
pub(crate) const BACKEND_TIMEOUT_ENV: &str = "ASTRBOT_BACKEND_TIMEOUT_MS";
pub(crate) const PACKAGED_BACKEND_TIMEOUT_FALLBACK_MS: u64 = 5 * 60 * 1000;
pub(crate) const PACKAGED_BACKEND_TIMEOUT_FALLBACK_MS: u64 = 15 * 60 * 1000;
pub(crate) const GRACEFUL_RESTART_REQUEST_TIMEOUT_MS: u64 = 2_500;
pub(crate) const GRACEFUL_RESTART_START_TIME_TIMEOUT_MS: u64 = 1_800;
pub(crate) const GRACEFUL_RESTART_POLL_INTERVAL_MS: u64 = 350;
@@ -17,6 +18,15 @@ pub(crate) const BACKEND_READY_PROBE_TIMEOUT_ENV: &str = "ASTRBOT_BACKEND_READY_
pub(crate) const BACKEND_READY_PROBE_TIMEOUT_MIN_MS: u64 = 100;
pub(crate) const BACKEND_READY_PROBE_TIMEOUT_MAX_MS: u64 = 30_000;
pub(crate) const BACKEND_READY_TCP_PROBE_TIMEOUT_MAX_MS: u64 = 1_000;
pub(crate) const BACKEND_STARTUP_IDLE_TIMEOUT_ENV: &str = "ASTRBOT_BACKEND_STARTUP_IDLE_TIMEOUT_MS";
pub(crate) const DEFAULT_BACKEND_STARTUP_IDLE_TIMEOUT_MS: u64 = 60 * 1000;
pub(crate) const BACKEND_STARTUP_IDLE_TIMEOUT_MIN_MS: u64 = 5_000;
pub(crate) const BACKEND_STARTUP_IDLE_TIMEOUT_MAX_MS: u64 = 15 * 60 * 1000;
// Keep this in sync with STARTUP_HEARTBEAT_ENV in scripts/backend/templates/launch_backend.py.
pub(crate) const BACKEND_STARTUP_HEARTBEAT_PATH_ENV: &str =
"ASTRBOT_BACKEND_STARTUP_HEARTBEAT_PATH";
pub(crate) const DEFAULT_BACKEND_STARTUP_HEARTBEAT_RELATIVE_PATH: &str =
"data/backend-startup-heartbeat.json";
pub(crate) const DEFAULT_BACKEND_PING_TIMEOUT_MS: u64 = 800;
pub(crate) const BACKEND_PING_TIMEOUT_MIN_MS: u64 = 50;
pub(crate) const BACKEND_PING_TIMEOUT_MAX_MS: u64 = 30_000;
1 change: 1 addition & 0 deletions src-tauri/src/app_helpers.rs
@@ -79,6 +79,7 @@ mod tests {
cwd: PathBuf::from("."),
root_dir: None,
webui_dir: None,
startup_heartbeat_path: None,
packaged_mode: false,
};

1 change: 1 addition & 0 deletions src-tauri/src/app_types.rs
@@ -33,6 +33,7 @@ pub(crate) struct LaunchPlan {
pub(crate) cwd: PathBuf,
pub(crate) root_dir: Option<PathBuf>,
pub(crate) webui_dir: Option<PathBuf>,
pub(crate) startup_heartbeat_path: Option<PathBuf>,
pub(crate) packaged_mode: bool,
}
