This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
vgi-rpc is a transport-agnostic RPC framework built on Apache Arrow IPC serialization. RPC interfaces are defined as Python Protocol classes; the framework derives Arrow schemas from type annotations and provides typed client proxies with automatic serialization/deserialization.
# Run all tests (includes mypy type checking and ruff linting via pytest plugins)
pytest
# Run a single test
pytest tests/test_rpc.py::test_name
# Lint and format
ruff check vgi_rpc/ tests/
ruff format vgi_rpc/ tests/
# Type checking
mypy vgi_rpc/
# Coverage (80% minimum, branch coverage enabled)
pytest --cov=vgi_rpcUses uv as the package manager. Install dev dependencies with uv sync --all-extras.
Tests should complete in 50 seconds or less ALWAYS!
Discourage the use of Any types, check mypy strict type coverage and always try to improve it.
Before pushing changes make sure, mypy, ruff and tests pass.
Pay attention to mypy strict type checking make sure strict typing is preserved.
Verify "ty" type checking too.
The full process before committing code is
- Run
uv run ruff format .on all files - Run
uv run ruff check .and resolve all errors - Run
uv run mypy vgi_rpc/and resolve all errors - Run
uv run ty check vgi_rpc/and resolve all errors - Run
uv run pytestfor all tests
Always reformat before pushing. CI runs lint before tests — unformatted code fails the pipeline immediately and wastes time.
-
rpc/— The RPC framework package. Defines the wire protocol, method types (UNARY, STREAM), and the core classes:RpcServer,RpcConnection,RpcTransport,PipeTransport,ShmPipeTransport,SubprocessTransport,StreamSession,AnnotatedBatch,OutputCollector. Also definesAuthContext(frozen dataclass for authentication state),CallContext(request-scoped context injected into methods viactxparameter),_TransportContext(contextvar bridge for HTTP auth),RpcError, andVersionError. Introspects Protocol classes viarpc_methods()to extractRpcMethodInfo(schemas, method type). Client gets a typed proxy fromRpcConnection; server dispatches viaRpcServer.serve(). The convenience functionrun_server()parsessys.argvfor--http,--host, and--portflags — without--httpit serves over stdin/stdout pipes (forSubprocessTransport); with--httpit lazily importsvgi_rpc.http.serve_httpand serves over HTTP. The_debug.pysubmodule provides wire protocol debug logging infrastructure: 6 logger instances under thevgi_rpc.wire.*hierarchy (request,response,batch,stream,transport,http) and 4 formatting helpers (fmt_schema,fmt_metadata,fmt_batch,fmt_kwargs). All log points useisEnabledForguards for zero overhead when disabled. -
utils.py— Arrow serialization layer.ArrowSerializableDataclassmixin auto-generatesARROW_SCHEMAfrom dataclass field annotations and providesserialize()/deserialize_from_batch(). Handles type inference from Python types to Arrow types (including generics, Enum, Optional, nested dataclasses). Also provides low-level IPC stream read/write helpers,IpcValidationenum, andValidatedReader. -
log.py— Structured log messages (MessagewithLevelenum). Messages are serialized out-of-band as zero-row batches with metadata keysvgi_rpc.log_level,vgi_rpc.log_message,vgi_rpc.log_extra. Server methods access logging via theCallContext(see below). -
logging_utils.py—VgiJsonFormatter, alogging.Formattersubclass that serializes log records as single-line JSON. Not auto-imported; must be imported explicitly fromvgi_rpc.logging_utils. -
metadata.py— Shared helpers forpa.KeyValueMetadata. Centralises well-known metadata key constants (vgi_rpc.method,vgi_rpc.stream_state#b64,vgi_rpc.log_level,vgi_rpc.log_message,vgi_rpc.log_extra,vgi_rpc.server_id,vgi_rpc.request_version,vgi_rpc.location,vgi_rpc.shm_offset, etc.) and provides encoding, merging, and key-stripping utilities used byrpc/,http/,log.py,external.py,shm.py, andintrospect.py. -
introspect.py— Introspection support. Provides the built-in__describe__RPC method,MethodDescription,ServiceDescription,build_describe_batch,parse_describe_batch, andintrospect(). Enabled onRpcServerviaenable_describe=True. -
shm.py— Shared memory transport support. ProvidesShmAllocator,ShmSegment, and pointer batch helpers for zero-copy Arrow IPC batch transfer between co-located processes. Used byShmPipeTransport. -
pool.py— Subprocess process pool with shared memory support.WorkerPoolkeeps idle worker subprocesses alive betweenconnect()calls, avoiding repeated spawn/teardown overhead. Workers are keyed by command tuple, cached up tomax_idleglobally (LIFO for cache warmth), and evicted by a daemon reaper thread afteridle_timeout. Whenshm_sizeis set on the pool, eachconnect()borrow gets its own isolatedShmSegmentthat is automatically created and destroyed per borrow. Tracks_stream_opened/_last_stream_sessionto detect abandoned streams and discard workers with stale transport state. ProvidesPoolMetricsfor observability. Logger:vgi_rpc.pool. -
external.py— ExternalLocation batch support for large data. When batches exceed a configurable size threshold, they are uploaded to pluggableExternalStorage(e.g. S3) and replaced with zero-row pointer batches containing avgi_rpc.locationURL metadata key. Readers resolve pointers transparently viaexternal_fetch.fetch_url()(aiohttp-based parallel fetching); writers externalize batches above the threshold. ProvidesExternalLocationConfig,ExternalStorageprotocol,UploadUrl,UploadUrlProvider,Compressionenum,https_only_validator, and production/resolution functions. Supports optional zstd compression. -
external_fetch.py— Parallel range-request URL fetching. Issues a HEAD probe to learnContent-LengthandAccept-Ranges, then either fetches in parallel chunks with speculative hedging for stragglers, or falls back to a single GET. Maintains a persistentaiohttp.ClientSessionperFetchConfigon a daemon thread. Handles zstd decompression and stale-connection recovery. ProvidesFetchConfigandfetch_url(). -
cli.py(optional —pip install vgi-rpc[cli]) — Command-line interface. Atyper-based CLI registered as thevgi-rpcentry point. Providesdescribe,call, andloggerscommands for introspecting and invoking methods on any vgi-rpc service. Supports output formatsauto,json,table, andarrow(raw Arrow IPC via--format arrow), with--output/-ofor file output. Stream headers are surfaced in all formats. -
s3.py(optional —pip install vgi-rpc[s3]) — S3 storage backend implementingExternalStorage. Uses boto3 to upload IPC data and generate pre-signed URLs. Supports custom endpoints for MinIO/LocalStack. -
gcs.py(optional —pip install vgi-rpc[gcs]) — Google Cloud Storage backend implementingExternalStorage. Uses google-cloud-storage to upload IPC data and generate V4 signed URLs. Relies on Application Default Credentials. -
http/(optional —pip install vgi-rpc[http]) — HTTP transport package using Falcon (server) and httpx (client). Exposesmake_wsgi_app()to serve anRpcServeras a Falcon WSGI app,serve_http()as a convenience wrapper that combinesmake_wsgi_app+ automatic free-port selection +waitress.serve(printsPORT:<port>to stdout for machine-readable discovery), andhttp_connect()for the client side. Streaming is stateless: each exchange carries serializedStreamStatein a signed token in Arrow custom metadata. Supports pluggable authentication via anauthenticatecallback and_AuthMiddleware. Includes_testing.pywithmake_sync_client()for in-process testing without a real HTTP server.
Multiple IPC streams are written sequentially on the same pipe. Every request batch carries vgi_rpc.request_version in custom metadata; the server validates this before dispatch and rejects mismatches with VersionError. Each method call writes one request stream and reads one response stream:
- Unary: Client sends params batch → Server replies with log batches + result/error batch
- Stream: Initial params exchange, then lockstep: client sends input batch (tick for producer, real data for exchange), server replies with log batches + output batch, repeating until EOS
For HTTP transport, the wire protocol maps to separate endpoints: POST /vgi/{method} (unary), POST /vgi/{method}/init (stream init), POST /vgi/{method}/exchange (stream exchange).
Defining an RPC service: Write a Protocol class where return types determine method type — plain types for unary, Stream[S] for streaming (both producer and exchange patterns).
Stream state: Streaming methods return a Stream[S] where S is a StreamState subclass. The state's process(input, out, ctx) method is called once per iteration. Producer streams (default input_schema=_EMPTY_SCHEMA) ignore the input and call out.finish() to end. Exchange streams set input_schema to a real schema and process client data.
CallContext injection: Server method implementations can accept an optional ctx: CallContext parameter. CallContext provides auth (AuthContext), client_log() (client-directed logging), emit_client_log (raw ClientLog callback), transport_metadata (e.g. remote_addr from HTTP), and a logger property returning a LoggerAdapter with request context pre-bound. The parameter is injected by the framework — it does not appear in the Protocol definition.
Authentication: AuthContext (frozen dataclass) carries domain, authenticated, principal, and claims. For HTTP transport, make_wsgi_app(authenticate=...) installs _AuthMiddleware that calls the callback on each request and populates CallContext.auth. Pipe transport gets anonymous auth by default. Methods can call ctx.auth.require_authenticated() to gate access.
Server identity: Each RpcServer gets a server_id (auto-generated 12-char hex or caller-supplied). This ID is attached to all log and error batches as vgi_rpc.server_id metadata for distributed tracing. RpcServer also accepts enable_describe=True to register the synthetic __describe__ introspection method.
Error propagation: Server exceptions become zero-row batches with error metadata; clients receive RpcError with error_type, error_message, and remote_traceback. The transport stays clean for subsequent requests.
- Line length 120, double quotes, target Python 3.12+
- Strict mypy (
python_version = "3.12",strict = true) - Ruff rules: E, F, I, UP, B, SIM, D, RUF, PERF (includes docstring enforcement)
- Google-style docstrings with Args/Returns/Raises sections