Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 142 additions & 9 deletions codeframe/ui/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,12 @@ def _validate_security_config():
)


def _detect_worker_count() -> int:
"""Best-effort detection of the configured uvicorn/gunicorn worker count.

Reads ``WEB_CONCURRENCY`` (honored by both uvicorn and gunicorn) and
``UVICORN_WORKERS`` (uvicorn's env-var form of ``--workers``) and returns the
largest valid value found, or ``1`` when neither is set or parseable.
def _worker_count_from_env() -> int:
"""Worker count from ``WEB_CONCURRENCY`` / ``UVICORN_WORKERS`` env vars.

Worker count cannot be observed directly from inside a worker process, so
this is a best-effort signal for the startup warning below. A bare
``uvicorn --workers N`` on the command line (no env var) is not detected.
Returns the largest valid value found, or ``1`` when neither is set or
parseable. ``WEB_CONCURRENCY`` is honored by both uvicorn and gunicorn;
``UVICORN_WORKERS`` is uvicorn's env-var form of ``--workers``.
"""
count = 1
for var in ("WEB_CONCURRENCY", "UVICORN_WORKERS"):
Expand All @@ -196,6 +192,143 @@ def _detect_worker_count() -> int:
return count


def _parse_worker_count_from_argv(argv: list[str]) -> int | None:
"""Parse a uvicorn/gunicorn worker count from a command-line argv list.

Recognizes ``--workers N``, ``--workers=N``, ``-w N`` and ``-w=N``. Returns
the positive integer worker count, or ``None`` when no parseable, positive
worker flag is present.
"""
for i, token in enumerate(argv):
value: str | None = None
if token in ("--workers", "-w"):
value = argv[i + 1] if i + 1 < len(argv) else None
elif token.startswith("--workers="):
value = token[len("--workers="):]
elif token.startswith("-w="):
value = token[len("-w="):]
if value is None:
continue
try:
count = int(value.strip())
except (ValueError, AttributeError):
continue
if count > 0:
return count
return None


def _is_asgi_server_cmdline(argv: list[str]) -> bool:
"""True when ``argv`` looks like a uvicorn/gunicorn server invocation.

Matches ``uvicorn``, ``python -m uvicorn``, an absolute ``uvicorn`` path, and
``gunicorn`` (incl. ``-k uvicorn.workers.UvicornWorker``). Used to avoid
reading a ``--workers`` flag off an unrelated ancestor (a wrapper, supervisor,
or test runner) and mistaking it for the server's worker count.

This is a loose substring match on any token, so a path that merely contains
``uvicorn``/``gunicorn`` would also match. That's acceptable for an
advisory-only warning: a false positive only emits an unnecessary WARNING, it
never breaks startup.
"""
return any(("uvicorn" in token or "gunicorn" in token) for token in argv)


def _read_proc_cmdline(pid: int) -> list[str] | None:
"""Read ``/proc/<pid>/cmdline`` as an argv list, or ``None`` if unavailable.

Linux-only; returns ``None`` on any error (missing /proc, permission, race).
"""
try:
with open(f"/proc/{pid}/cmdline", "rb") as f:
raw = f.read()
except OSError:
return None
if not raw:
return None
# /proc cmdline is NUL-separated and usually NUL-terminated.
return [part.decode("utf-8", "replace") for part in raw.split(b"\x00") if part]


def _read_proc_ppid(pid: int) -> int | None:
"""Read the parent PID from ``/proc/<pid>/status``, or ``None`` on error."""
try:
with open(f"/proc/{pid}/status", "r", encoding="ascii") as f:
for line in f:
if line.startswith("PPid:"):
return int(line.split()[1])
except (OSError, ValueError, IndexError):
return None
return None


def _iter_ancestor_cmdlines(max_depth: int = 8) -> list[list[str]]:
"""Best-effort: this process's cmdline plus those of its ancestors.

Walks up the process tree via ``/proc`` (Linux only). Returns ``[]`` on
non-Linux platforms or when nothing is readable. Never raises.
"""
cmdlines: list[list[str]] = []
try:
pid: int | None = os.getpid()
except OSError:
return cmdlines
seen: set[int] = set()
for _ in range(max_depth):
if pid is None or pid <= 0 or pid in seen:
break
seen.add(pid)
cmdline = _read_proc_cmdline(pid)
if cmdline:
cmdlines.append(cmdline)
pid = _read_proc_ppid(pid)
return cmdlines


def _detect_workers_from_process_tree() -> int | None:
"""Best-effort worker count from a ``--workers N`` flag on an ancestor.

Worker subprocesses are spawned, so their own ``sys.argv`` does not carry
``--workers`` — the original flag lives on the supervisor (parent) process.
Returns ``None`` when nothing is found or detection isn't possible. Never
raises.
"""
try:
for argv in _iter_ancestor_cmdlines():
if not _is_asgi_server_cmdline(argv):
continue
count = _parse_worker_count_from_argv(argv)
if count is not None:
return count
except Exception: # best-effort signal; never break startup
return None
return None


def _detect_worker_count() -> int:
"""Best-effort detection of the configured uvicorn/gunicorn worker count.

Combines two signals and returns the largest:

- ``WEB_CONCURRENCY`` / ``UVICORN_WORKERS`` env vars (set by most production
process managers), and
- a ``--workers N`` flag on an ancestor process command line, which covers a
bare ``uvicorn ... --workers N`` invocation that sets no env var (Linux
only, via ``/proc``).

This is a best-effort signal for the startup warning below; detection
failure degrades silently to the env-var result (never raises).
"""
count = _worker_count_from_env()
try:
from_tree = _detect_workers_from_process_tree()
except Exception:
from_tree = None
if from_tree is not None and from_tree > count:
count = from_tree
return count


def _per_worker_rate_limit_warning(
*, enabled: bool, storage: str, worker_count: int
) -> str | None:
Expand Down
176 changes: 176 additions & 0 deletions tests/ui/test_rate_limit_worker_warning.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,179 @@ def test_no_warning_when_rate_limiting_disabled():
)
is None
)


# --- _parse_worker_count_from_argv (issue #680) --------------------------


@pytest.mark.parametrize(
"argv, expected",
[
(["uvicorn", "codeframe.ui.server:app", "--workers", "2"], 2),
(["uvicorn", "app", "--workers=4"], 4),
(["gunicorn", "-w", "3", "app"], 3),
(["gunicorn", "-w=5", "app"], 5),
(["python", "-m", "uvicorn", "app", "--workers", " 6 "], 6),
# Not present / not a worker flag
(["uvicorn", "codeframe.ui.server:app"], None),
(["uvicorn", "app", "--port", "8000"], None),
# Trailing flag with no value
(["uvicorn", "app", "--workers"], None),
# Unparseable / non-positive values are ignored
(["uvicorn", "app", "--workers", "notanumber"], None),
(["uvicorn", "app", "--workers", "0"], None),
(["uvicorn", "app", "--workers", "-2"], None),
([], None),
],
)
def test_parse_worker_count_from_argv(argv, expected):
assert server._parse_worker_count_from_argv(argv) == expected


# --- _detect_workers_from_process_tree (issue #680) ----------------------


def test_process_tree_detection_finds_workers_flag(monkeypatch):
"""A `--workers N` on an ancestor cmdline is detected."""
monkeypatch.setattr(
server,
"_iter_ancestor_cmdlines",
lambda max_depth=8: [
["python", "spawn_main"], # the worker's own bootstrap argv
["uvicorn", "codeframe.ui.server:app", "--workers", "3"], # supervisor
],
)
assert server._detect_workers_from_process_tree() == 3


def test_process_tree_detection_ignores_non_server_workers_flag(monkeypatch):
"""A `--workers` on a non-uvicorn/gunicorn ancestor must not false-positive.

A wrapper/test-runner with its own `--workers` flag is skipped; the real
server ancestor (further up) is the one that counts.
"""
monkeypatch.setattr(
server,
"_iter_ancestor_cmdlines",
lambda max_depth=8: [
["some-wrapper", "--workers", "9"], # NOT a server → ignored
["uvicorn", "codeframe.ui.server:app", "--workers", "2"], # real one
],
)
assert server._detect_workers_from_process_tree() == 2


def test_process_tree_detection_ignores_lone_non_server_workers_flag(monkeypatch):
monkeypatch.setattr(
server,
"_iter_ancestor_cmdlines",
lambda max_depth=8: [["pytest", "-p", "xdist", "--workers", "4"]],
)
assert server._detect_workers_from_process_tree() is None


@pytest.mark.parametrize(
"argv, is_server",
[
(["uvicorn", "app"], True),
(["/usr/bin/uvicorn", "app"], True),
(["python", "-m", "uvicorn", "app"], True),
(["gunicorn", "-k", "uvicorn.workers.UvicornWorker", "app"], True),
(["pytest", "--workers", "4"], False),
(["some-wrapper", "--workers", "9"], False),
([], False),
],
)
def test_is_asgi_server_cmdline(argv, is_server):
assert server._is_asgi_server_cmdline(argv) is is_server


def test_process_tree_detection_returns_none_when_absent(monkeypatch):
monkeypatch.setattr(
server, "_iter_ancestor_cmdlines", lambda max_depth=8: [["uvicorn", "app"]]
)
assert server._detect_workers_from_process_tree() is None


def test_process_tree_detection_never_raises(monkeypatch):
"""Detection degrades gracefully if the walker blows up."""

def boom(max_depth=8):
raise OSError("no /proc here")

monkeypatch.setattr(server, "_iter_ancestor_cmdlines", boom)
assert server._detect_workers_from_process_tree() is None


# --- _detect_worker_count integrates env + process tree (issue #680) ------


def test_detect_worker_count_uses_process_tree(monkeypatch):
"""Bare `uvicorn --workers N` (no env vars) is detected via the tree."""
monkeypatch.setattr(
server, "_detect_workers_from_process_tree", lambda: 2
)
assert server._detect_worker_count() == 2


def test_detect_worker_count_env_wins_when_larger(monkeypatch):
monkeypatch.setenv("WEB_CONCURRENCY", "8")
monkeypatch.setattr(
server, "_detect_workers_from_process_tree", lambda: 2
)
assert server._detect_worker_count() == 8


def test_detect_worker_count_tree_wins_when_larger(monkeypatch):
monkeypatch.setenv("WEB_CONCURRENCY", "2")
monkeypatch.setattr(
server, "_detect_workers_from_process_tree", lambda: 5
)
assert server._detect_worker_count() == 5


def test_detect_worker_count_falls_back_to_env_on_tree_failure(monkeypatch):
"""If the process-tree scan fails, the env-based result still stands."""
monkeypatch.setenv("UVICORN_WORKERS", "4")

def boom():
raise RuntimeError("unexpected")

monkeypatch.setattr(server, "_detect_workers_from_process_tree", boom)
assert server._detect_worker_count() == 4


def test_detect_worker_count_defaults_to_one_with_no_signals(monkeypatch):
monkeypatch.setattr(
server, "_detect_workers_from_process_tree", lambda: None
)
assert server._detect_worker_count() == 1


# --- _iter_ancestor_cmdlines via a fake /proc (issue #680) ---------------


def test_iter_ancestor_cmdlines_walks_proc(monkeypatch):
"""The walker reads the current process and its ancestors from /proc."""
# Fake a tiny process tree: 100 (self) -> 50 (supervisor) -> 1 (init)
cmdlines = {
100: ["python", "spawn_main"],
50: ["uvicorn", "app", "--workers", "2"],
1: ["/sbin/init"],
}
ppids = {100: 50, 50: 1, 1: 0}

monkeypatch.setattr(server.os, "getpid", lambda: 100)
monkeypatch.setattr(server, "_read_proc_cmdline", lambda pid: cmdlines.get(pid))
monkeypatch.setattr(server, "_read_proc_ppid", lambda pid: ppids.get(pid))

result = server._iter_ancestor_cmdlines(max_depth=8)
assert ["uvicorn", "app", "--workers", "2"] in result
# And the parser finds the count through it
assert server._detect_workers_from_process_tree() == 2


def test_iter_ancestor_cmdlines_returns_empty_on_missing_proc(monkeypatch):
monkeypatch.setattr(server, "_read_proc_cmdline", lambda pid: None)
monkeypatch.setattr(server, "_read_proc_ppid", lambda pid: None)
assert server._iter_ancestor_cmdlines(max_depth=8) == []
Loading