Skip to content

v0.15.0

Latest

Choose a tag to compare

@rustyconover rustyconover released this 01 May 01:20
· 11 commits to main since this release

Workers can now learn which transport they are bound to

Workers (RPC service implementations) had no way to know which transport (pipe / http / unix) they were being served over. CallContext.transport_metadata only carried HTTP fields and was ambiguous for non-HTTP transports — an empty dict could mean "pipe" or "HTTP request without those headers." There was no startup hook either.

TransportKind and the on_serve_start hook

A new TransportKind enum (PIPE, HTTP, UNIX) is exposed via three knobs:

  • RpcServer.transport_kind — coarse identifier of the bound transport, populated once serving begins.
  • RpcServer.transport_capabilitiesfrozenset[str] of capability flags, currently {"shm"} when bound to a ShmPipeTransport.
  • CallContext.kind — per-call view of the same TransportKind for methods that already accept ctx.

Implementations may opt into a ServeStartHook lifecycle method:

from vgi_rpc import CallContext, TransportKind


class MyServiceImpl:
    def on_serve_start(self, kind: TransportKind) -> None:
        if kind is TransportKind.HTTP:
            self._cache = build_http_cache()

    def fetch(self, key: str, ctx: CallContext) -> str:
        if ctx.kind is TransportKind.HTTP and self._cache is not None:
            return self._cache.get(key)
        return load_from_disk(key)

The hook is duck-typed (no base class needed); a ServeStartHook Protocol is exported for type-hinting.

Fork-safe HTTP firing

For pipe / unix transports the hook fires inside RpcServer.serve(transport). For HTTP it fires lazily on the first request handled in the current process, via a tiny one-shot Falcon middleware. Pre-fork WSGI servers (gunicorn, uwsgi) therefore run startup work in each child worker, not the master — per-process resources (DB pools, threads, file handles) are no longer fork-unsafely inherited. Subprocess workers report PIPE because they speak Arrow IPC over the parent's stdin/stdout.

Failure semantics

Hook exceptions propagate (and are logged via logging.getLogger("vgi_rpc.rpc").exception first) — a misconfigured worker dies loudly rather than serving in a broken state. Rebinding the same RpcServer to a different transport re-fires the hook with the new kind rather than raising, so test fixtures that exercise multiple transports against one server are supported.

SHM as a capability, not an enum value

Shared-memory availability is exposed via transport_capabilities, not the enum, so coarse transport-kind checks stay simple while workers that need zero-copy paths can still detect SHM:

def on_serve_start(self, kind: TransportKind) -> None:
    if "shm" in self._server.transport_capabilities:
        self._enable_zero_copy()