Commit 101c646

rustyconover and claude committed
feat(rpc): expose TransportKind and on_serve_start lifecycle hook
Workers (RPC implementations) had no way to learn which transport they were bound to. CallContext.transport_metadata only carried HTTP fields and was ambiguous for non-HTTP transports.

Add a coarse TransportKind enum (PIPE / HTTP / UNIX) plus a duck-typed on_serve_start(self, kind) lifecycle hook fired once per process before the first request. HTTP fires lazily via a Falcon middleware so pre-fork servers (gunicorn, uwsgi) correctly run startup work in each child.

Capabilities (currently {"shm"}) ride alongside the kind so zero-copy paths can be detected without bloating the enum. CallContext gains a kind field for per-call branching.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent c4f698a commit 101c646

13 files changed

Lines changed: 713 additions & 5 deletions

CLAUDE.md

Lines changed: 3 additions & 1 deletion
@@ -107,7 +107,9 @@ For HTTP transport, the wire protocol maps to separate endpoints: `POST /vgi/{me
 
 **Stream state**: Streaming methods return a `Stream[S]` where `S` is a `StreamState` subclass. The state's `process(input, out, ctx)` method is called once per iteration. Producer streams (default `input_schema=_EMPTY_SCHEMA`) ignore the input and call `out.finish()` to end. Exchange streams set `input_schema` to a real schema and process client data.
 
-**CallContext injection**: Server method implementations can accept an optional `ctx: CallContext` parameter. `CallContext` provides `auth` (`AuthContext`), `client_log()` (client-directed logging), `emit_client_log` (raw `ClientLog` callback), `transport_metadata` (e.g. `remote_addr` from HTTP), and a `logger` property returning a `LoggerAdapter` with request context pre-bound. The parameter is injected by the framework — it does **not** appear in the Protocol definition.
+**CallContext injection**: Server method implementations can accept an optional `ctx: CallContext` parameter. `CallContext` provides `auth` (`AuthContext`), `client_log()` (client-directed logging), `emit_client_log` (raw `ClientLog` callback), `transport_metadata` (e.g. `remote_addr` from HTTP), `kind` (the active `TransportKind`), and a `logger` property returning a `LoggerAdapter` with request context pre-bound. The parameter is injected by the framework — it does **not** appear in the Protocol definition.
+
+**Transport awareness**: Workers can read `RpcServer.transport_kind` (a `TransportKind` enum: `PIPE`, `HTTP`, `UNIX`) and `RpcServer.transport_capabilities` (currently `{"shm"}` when bound to a `ShmPipeTransport`). For one-shot startup work an implementation may define an `on_serve_start(self, kind: TransportKind) -> None` method (the `ServeStartHook` Protocol); the framework calls it once per process before the first dispatch. For pipe/unix the hook fires inside `RpcServer.serve(transport)`; for HTTP it fires lazily on the first request handled in the current process so pre-fork servers (gunicorn, uwsgi) correctly run it in each child. Hook exceptions propagate (and are logged via `logging.getLogger("vgi_rpc.rpc").exception` first); rebinding to a different kind re-fires the hook rather than raising. Per-call code can also branch on `ctx.kind`.
 
 **Authentication**: `AuthContext` (frozen dataclass) carries `domain`, `authenticated`, `principal`, and `claims`. For HTTP transport, `make_wsgi_app(authenticate=...)` installs `_AuthMiddleware` that calls the callback on each request and populates `CallContext.auth`. Pipe transport gets anonymous auth by default. Methods can call `ctx.auth.require_authenticated()` to gate access.
 
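The hook contract in the Transport awareness paragraph (fire once per process, re-fire when rebinding to a different kind, log and then propagate exceptions) could be sketched roughly as follows. This is an illustration under assumptions, not the vgi_rpc implementation: `TransportKind` is redeclared locally as a stand-in with assumed member values, and `HookRunner`, `fire_if_needed`, and `Recorder` are hypothetical names. Whether a failed hook counts as "fired" is not stated in the commit; this sketch assumes it does not.

```python
import enum
import logging
from typing import List, Optional


class TransportKind(enum.Enum):
    """Local stand-in for the enum named in the diff; member values are assumed."""

    PIPE = "pipe"
    HTTP = "http"
    UNIX = "unix"


class HookRunner:
    """Hypothetical helper that fires on_serve_start once per bound kind."""

    def __init__(self, impl: object) -> None:
        self._impl = impl
        self._fired_for: Optional[TransportKind] = None

    def fire_if_needed(self, kind: TransportKind) -> bool:
        """Run the startup step unless it already ran for this kind."""
        if self._fired_for is kind:
            return False
        # Duck-typed lookup: no base class is required on the implementation.
        hook = getattr(self._impl, "on_serve_start", None)
        if callable(hook):
            try:
                hook(kind)
            except Exception:
                # Log first, then let the error propagate to the caller.
                logging.getLogger("vgi_rpc.rpc").exception("on_serve_start failed")
                raise
        # Only reached on success (assumption: a failed hook is not recorded).
        self._fired_for = kind
        return True


class Recorder:
    """Hypothetical implementation that records every hook invocation."""

    def __init__(self) -> None:
        self.calls: List[TransportKind] = []

    def on_serve_start(self, kind: TransportKind) -> None:
        self.calls.append(kind)
```

Calling `fire_if_needed` twice with the same kind invokes the hook once; calling it again with a different kind invokes it a second time, which mirrors the "re-fires rather than raising" wording.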

docs/api/auth.md

Lines changed: 14 additions & 0 deletions
@@ -65,6 +65,20 @@ Over pipe/subprocess transport, `ctx.auth` is always `AuthContext.anonymous()`.
 
 `ctx.transport_metadata` provides transport-level information like `remote_addr` and `user_agent` (HTTP only). It's a read-only mapping populated by the transport layer.
 
+`ctx.kind` exposes the active `TransportKind` (`PIPE`, `HTTP`, or `UNIX`) so methods can branch per-call:
+
+```python
+from vgi_rpc import CallContext, TransportKind
+
+
+def fetch(self, key: str, ctx: CallContext) -> str:
+    if ctx.kind is TransportKind.HTTP:
+        return self._cached_lookup(key)
+    return self._direct_lookup(key)
+```
+
+For one-shot startup work driven by transport kind, see [`on_serve_start`](transports.md#transport-awareness) on the transports page.
+
 ## API Reference
 
 ### AuthContext

docs/api/transports.md

Lines changed: 48 additions & 0 deletions
@@ -88,6 +88,46 @@ server_transport = ShmPipeTransport(server_pipe, shm)
 
 Falls back to normal pipe IPC for batches that exceed the segment size.
 
+## Transport awareness
+
+Workers can ask which transport they are bound to — useful for tailoring startup work, enabling transport-specific metrics, or branching per-call behaviour. Three things are exposed:
+
+- `RpcServer.transport_kind`: a `TransportKind` enum (`PIPE`, `HTTP`, `UNIX`) or `None` before serving begins.
+- `RpcServer.transport_capabilities`: a `frozenset[str]` of capability flags. Currently `{"shm"}` when bound to a `ShmPipeTransport`; empty otherwise.
+- `CallContext.kind`: per-call view of the same `TransportKind`, so methods that already accept `ctx` can branch without reaching for the server.
+
+For one-shot startup work, an implementation may define an `on_serve_start(self, kind)` method. The framework calls it once per process before the first request is dispatched:
+
+```python
+from vgi_rpc import CallContext, TransportKind
+
+
+class MyServiceImpl:
+    def on_serve_start(self, kind: TransportKind) -> None:
+        """Called once per process before the first request."""
+        if kind is TransportKind.HTTP:
+            self._cache = build_http_cache()
+        else:
+            self._cache = None
+
+    def fetch(self, key: str, ctx: CallContext) -> str:
+        if ctx.kind is TransportKind.HTTP and self._cache is not None:
+            return self._cache.get(key)
+        return load_from_disk(key)
+```
+
+The hook is duck-typed (no base class needed); a `ServeStartHook` Protocol is exported for users who want to type-hint their implementation. Hook exceptions propagate (and are logged via `logging.getLogger("vgi_rpc.rpc").exception` first), so a misconfigured worker dies loudly rather than serving in a broken state.
+
+For pipe / unix transports the hook fires inside `RpcServer.serve(transport)`. For HTTP it fires lazily on the first request handled in the current process — this is fork-safe under pre-fork WSGI servers (gunicorn, uwsgi), so each child worker runs its own startup logic. Subprocess workers report `PIPE` because they speak Arrow IPC over the parent's stdin/stdout.
+
+`SHM` availability is exposed via `transport_capabilities`, not the enum, so coarse transport-kind checks stay simple while workers that need zero-copy paths can still detect shared memory:
+
+```python
+def on_serve_start(self, kind: TransportKind) -> None:
+    if "shm" in self.server.transport_capabilities:
+        self._enable_zero_copy()
+```
+
 ## API Reference
 
 ### PipeTransport
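The lazy HTTP path described in the hunk above can be pictured as a small middleware that runs the hook on the first request each process handles. The sketch below is an illustration under assumptions, not the library's middleware: `LazyStartMiddleware` is a hypothetical name, the hook receives a plain string rather than a `TransportKind`, and the only framework detail relied on is that Falcon middleware components expose a `process_request(req, resp)` method. Keying the guard on `os.getpid()` is one possible way to stay correct even if the app object is built before the server forks.

```python
import os
import threading
from typing import Callable, Optional


class LazyStartMiddleware:
    """Hypothetical Falcon-style component that fires a startup hook lazily."""

    def __init__(self, hook: Callable[[str], None], kind: str = "http") -> None:
        self._hook = hook
        self._kind = kind
        # PID of the process in which the hook last completed, if any.
        self._fired_pid: Optional[int] = None
        self._lock = threading.Lock()

    def process_request(self, req: object, resp: object) -> None:
        """Run the hook once per process, before the request is dispatched."""
        pid = os.getpid()
        if self._fired_pid == pid:
            return  # fast path once startup has run in this process
        with self._lock:
            # Re-check under the lock so concurrent first requests fire once.
            if self._fired_pid != pid:
                # If the hook raises, the PID is not recorded (an assumption
                # of this sketch), so the error propagates and a later
                # request retries the startup step.
                self._hook(self._kind)
                self._fired_pid = pid
```

A forked child inherits `_fired_pid` from its parent, but the value no longer matches the child's own PID, so the child runs the hook again on its first request.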
@@ -106,6 +146,14 @@ Falls back to normal pipe IPC for batches that exceed the segment size.
 
 ::: vgi_rpc.rpc.StderrMode
 
+### TransportKind
+
+::: vgi_rpc.rpc.TransportKind
+
+### ServeStartHook
+
+::: vgi_rpc.rpc.ServeStartHook
+
 ### Utility Functions
 
 ::: vgi_rpc.rpc.make_pipe_pair
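Because the hook is duck-typed, a structural `Protocol` is enough to describe it. The sketch below redeclares local stand-ins for `TransportKind` and `ServeStartHook`; the real definitions live in `vgi_rpc.rpc` and may differ, for example in whether the Protocol is runtime-checkable. `WithHook`, `WithoutHook`, and `maybe_start` are hypothetical names used only to show that an implementation can satisfy the protocol without inheriting from anything.

```python
import enum
from typing import Optional, Protocol, runtime_checkable


class TransportKind(enum.Enum):
    """Local stand-in; member values are assumed."""

    PIPE = "pipe"
    HTTP = "http"
    UNIX = "unix"


@runtime_checkable
class ServeStartHook(Protocol):
    """Stand-in for the exported Protocol (runtime-checkable by assumption)."""

    def on_serve_start(self, kind: TransportKind) -> None: ...


class WithHook:
    """Satisfies the protocol structurally, with no base class."""

    def __init__(self) -> None:
        self.started_with: Optional[TransportKind] = None

    def on_serve_start(self, kind: TransportKind) -> None:
        self.started_with = kind


class WithoutHook:
    """Defines no hook, so startup is simply skipped for it."""


def maybe_start(impl: object, kind: TransportKind) -> bool:
    """Invoke the hook when the object provides one; report whether it ran."""
    if isinstance(impl, ServeStartHook):
        impl.on_serve_start(kind)
        return True
    return False
```

The `isinstance` check against a runtime-checkable Protocol only verifies that the method exists, not its signature, so it is equivalent in spirit to a `getattr` lookup.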

tests/serve_fixture_kind_pipe.py

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+# © Copyright 2025-2026, Query.Farm LLC - https://query.farm
+# SPDX-License-Identifier: Apache-2.0
+
+"""Subprocess worker for ``test_transport_kind`` end-to-end tests.
+
+Serves a tiny ``_KindService`` over stdin/stdout pipes via ``run_server``
+so the parent test can confirm the worker's ``on_serve_start`` sees
+``TransportKind.PIPE`` even when launched as a child process.
+"""
+
+from __future__ import annotations
+
+from tests.test_transport_kind import _KindService, _RecordingImpl
+from vgi_rpc.rpc import run_server
+
+
+def main() -> None:
+    """Serve the kind-reporting fixture over stdin/stdout."""
+    run_server(_KindService, _RecordingImpl())
+
+
+if __name__ == "__main__":
+    main()
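A parent-side check for a fixture like this might spawn the worker with piped stdin/stdout and read back what it reports. The sketch below uses only the standard library and an inline stand-in child script. It does not speak Arrow IPC or call `run_server`, the child's reply is hard-coded, and `CHILD_SOURCE` and `ask_child` are hypothetical names.

```python
import subprocess
import sys

# Stand-in worker: it is bound to stdin/stdout, so it reports the pipe kind.
CHILD_SOURCE = """
import sys
for line in sys.stdin:
    if line.strip() == "kind":
        print("PIPE", flush=True)
"""


def ask_child(question: str) -> str:
    """Launch the stand-in worker, send one line, and return its reply."""
    proc = subprocess.run(
        [sys.executable, "-c", CHILD_SOURCE],
        input=question + "\n",  # stdin is closed after this, ending the loop
        capture_output=True,
        text=True,
        timeout=30,
        check=True,
    )
    return proc.stdout.strip()
```

A real test would replace the inline script with the fixture module and decode the reply through the RPC client rather than reading raw text.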
