57 changes: 57 additions & 0 deletions packages/grpc-web/README.md
@@ -0,0 +1,57 @@
# weaviate-python-grpc-web

A grpc-web / WebAssembly (Pyodide) transport for the
[Weaviate Python client](https://github.com/weaviate/weaviate-python-client), so the
client's **async** gRPC data path can run inside a browser (marimo notebooks, Pyodide,
WASM workers) where there is no socket and no `grpcio` wheel.

It is built from the same repository as `weaviate-client` and reuses its generated
protobuf stubs — it does **not** fork code generation.

## How it works

No `grpcio` wheel exists for Emscripten, so it cannot be installed under Pyodide, yet
`import weaviate` imports `grpc` unconditionally at module load. This package installs a
small pure-Python `grpc` shim into `sys.modules` **before** `import weaviate`. The shim:

- satisfies every import-time `import grpc` / `from grpc(.aio) import ...` in the base
client and its generated `*_pb2_grpc` stubs;
- provides `grpc.aio.Channel` as a real base class, so the grpc-web channel
(`GrpcWebChannel`) subclasses it and the client's `isinstance(..., grpc.aio.Channel)`
assertions pass;
- satisfies the generated v6300 stub's version gate
(`grpc.__version__` / `grpc._utilities.first_version_is_lower`).
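
The shim mechanism described in the list above can be sketched in a few lines. This is an
illustration only, not the package's `_shim` module: `install_stub`, `demo_grpc_stub` and
`WebChannel` are made-up names, and the version string is arbitrary. It relies only on the
fact that `import` consults `sys.modules` before searching the filesystem.

```python
import sys
import types


def install_stub(name: str, version: str) -> types.ModuleType:
    """Register a minimal stand-in module under ``name`` in ``sys.modules``."""
    stub = types.ModuleType(name)
    stub.__version__ = version  # lets a version gate read a plausible value

    class Channel:
        """Real base class, so subclasses pass ``isinstance`` checks."""

    stub.Channel = Channel
    sys.modules[name] = stub
    return stub


# A made-up module name, so this sketch never replaces a real ``grpc`` install.
install_stub("demo_grpc_stub", "1.70.0")

import demo_grpc_stub  # noqa: E402  (resolved from sys.modules, no file on disk)


class WebChannel(demo_grpc_stub.Channel):
    """Hypothetical channel built on the stub's base class."""


print(isinstance(WebChannel(), demo_grpc_stub.Channel))
```

The order matters: the stub must be registered before the first `import` of that name,
which is why the shim has to be installed before `import weaviate`.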

The `GrpcWebChannel` frames unary RPCs as grpc-web (a 5-byte header + protobuf payload)
and POSTs them via `pyodide.http.pyfetch` to a server fronted by a grpc-web transcoder
(e.g. Envoy or [connectrpc/vanguard](https://github.com/connectrpc/vanguard-go)). Call
metadata (API key / OIDC bearer) is folded into `fetch` headers.
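
As a rough sketch of the framing mentioned above: each grpc-web frame is one flag byte,
a 4-byte big-endian length, then the payload, and a frame with the high flag bit set
carries trailers as `key: value` lines. The helper names `encode_frame` and `split_frames`
are hypothetical and are not the package's `_framing` functions, which are not shown in
this change.

```python
import struct
from typing import Dict, List, Tuple

TRAILER_FLAG = 0x80  # high bit of the flag byte marks a trailers frame


def encode_frame(payload: bytes, flags: int = 0x00) -> bytes:
    """Prefix ``payload`` with the 5-byte header: flags + big-endian length."""
    return struct.pack(">BI", flags, len(payload)) + payload


def split_frames(body: bytes) -> Tuple[List[bytes], Dict[str, str]]:
    """Split a response body into message payloads and a trailers dict."""
    messages: List[bytes] = []
    trailers: Dict[str, str] = {}
    offset = 0
    while offset + 5 <= len(body):
        flags, length = struct.unpack_from(">BI", body, offset)
        offset += 5
        chunk = body[offset : offset + length]
        offset += length
        if flags & TRAILER_FLAG:
            # Trailers are encoded like an HTTP/1 header block.
            for line in chunk.decode("ascii", errors="replace").split("\r\n"):
                key, sep, value = line.partition(":")
                if sep:
                    trailers[key.strip().lower()] = value.strip()
        else:
            messages.append(chunk)
    return messages, trailers


body = encode_frame(b"abc") + encode_frame(b"grpc-status: 0\r\n", TRAILER_FLAG)
messages, trailers = split_frames(body)
print(messages, trailers)
```

The gRPC status normally arrives in the trailers frame rather than in the HTTP status
line, which is why a response has to be split before it can be interpreted.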

## Usage

```python
import weaviate_grpc_web # installs the grpc shim under Emscripten (no-op elsewhere)
import weaviate

client = weaviate.use_async_with_local(skip_init_checks=True)
await client.connect()
collection = client.collections.get("Article")
await collection.query.near_text("hello", limit=3)
```

## Supported / unsupported

| RPC | Kind | Status |
|----------------------------------------------------------|-----------------|--------|
| Search, Aggregate, TenantsGet, BatchObjects, BatchDelete | unary | ✅ works over grpc-web |
| Health check (`/grpc.health.v1.Health/Check`) | unary | ✅ works, but `skip_init_checks=True` plus the REST `/.well-known/ready` probe is recommended |
| References (`/batch/references`) | REST | ✅ via httpx-in-Pyodide |
| `batch.stream()` / `batch.experimental()` (BatchStream) | bidi streaming | ❌ not possible over grpc-web/fetch — use `insert_many()` / `batch.dynamic()` / `fixed_size()` / `rate_limit()` |
| Synchronous client | — | ❌ async-only under WASM |

## Testing on CPython

`weaviate_grpc_web.install(force=True)` installs the shim on a normal CPython
interpreter (run it in a fresh process, before importing `weaviate`). Inject a sender
with `weaviate_grpc_web.set_sender(...)` (e.g. `make_httpx_sender()`) to exercise the
transport against an Envoy/vanguard transcoder without a browser.
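
For tests that should not need a transcoder at all, a canned sender may be enough. The
sketch below assumes the sender call shape that `_channel.py` in this change uses, namely
`await sender(url, headers, body, timeout)` returning `(status, headers, body)`. The
function names, URL and echo behaviour are invented for illustration.

```python
import asyncio
import struct
from typing import Dict, Optional, Tuple


def frame(payload: bytes, flags: int = 0x00) -> bytes:
    """Build one grpc-web frame: flag byte, big-endian length, payload."""
    return struct.pack(">BI", flags, len(payload)) + payload


async def canned_sender(
    url: str, headers: Dict[str, str], body: bytes, timeout: Optional[float]
) -> Tuple[int, Dict[str, str], bytes]:
    """Hypothetical in-memory sender: echo the request payload with an OK status."""
    request_payload = body[5:]  # strip the 5-byte grpc-web header
    response = frame(request_payload) + frame(b"grpc-status: 0\r\n", 0x80)
    return 200, {"content-type": "application/grpc-web+proto"}, response


# Placeholder URL; no network traffic happens in this sketch.
status, resp_headers, resp_body = asyncio.run(
    canned_sender("http://localhost:8080/example.Service/Method", {}, frame(b"ping"), 5.0)
)
print(status, resp_body)
```

A function of this shape could presumably be passed to `weaviate_grpc_web.set_sender(...)`
in place of `make_httpx_sender()`, though that pairing is an assumption rather than
something the README states.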
30 changes: 30 additions & 0 deletions packages/grpc-web/pyproject.toml
@@ -0,0 +1,30 @@
[build-system]
requires = ["setuptools>=65", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "weaviate-python-grpc-web"
description = "grpc-web / WASM (Pyodide) transport for the Weaviate Python client"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "BSD-3-Clause" }
authors = [{ name = "Weaviate", email = "hello@weaviate.io" }]
keywords = ["weaviate", "grpc-web", "pyodide", "wasm", "emscripten"]
# Version is kept in lockstep with weaviate-client. TODO(lockstep): derive from the same
# git tag via setuptools_scm and assert the built versions match in CI before publishing.
version = "0.0.1.dev0"
# Deliberately depends on weaviate-client WITHOUT grpcio (grpcio is excluded under
# Emscripten by the `sys_platform != "emscripten"` marker in the base package's deps).
dependencies = [
"weaviate-client",
]

[project.urls]
Source = "https://github.com/weaviate/weaviate-python-client"
Tracker = "https://github.com/weaviate/weaviate-python-client/issues"

[tool.setuptools.packages.find]
where = ["src"]

[tool.setuptools.package-data]
weaviate_grpc_web = ["py.typed"]
51 changes: 51 additions & 0 deletions packages/grpc-web/src/weaviate_grpc_web/__init__.py
@@ -0,0 +1,51 @@
"""grpc-web / WASM transport for the Weaviate Python client.

Under Pyodide/Emscripten there is no ``grpcio`` wheel. Importing this package installs a
pure-Python ``grpc`` shim into ``sys.modules`` (and forces the pure-Python protobuf
runtime) so that the subsequent ``import weaviate`` succeeds and its async gRPC data path
runs over grpc-web (``fetch``) instead of HTTP/2 sockets.

Usage under Pyodide::

    import weaviate_grpc_web  # installs the grpc shim (no-op off Emscripten)
    import weaviate

    client = weaviate.use_async_with_local(skip_init_checks=True)
    await client.connect()

The shim is installed automatically only under Emscripten, so importing this package on a
normal CPython install never clobbers a real, working ``grpcio``. Async clients only —
the synchronous client is not supported in the browser.
"""

import os
import sys

from ._shim import StatusCode, install, is_installed

__all__ = [
    "install",
    "is_installed",
    "set_sender",
    "make_httpx_sender",
    "GrpcWebChannel",
    "StatusCode",
]


def _bootstrap() -> None:
    if sys.platform == "emscripten":
        # The pure-Python protobuf runtime always works; the upb C-extension may not be
        # present. Set before ``import weaviate`` (which imports protobuf) so it takes
        # effect. ``setdefault`` lets a user override it explicitly.
        os.environ.setdefault("PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION", "python")
        install()


_bootstrap()

# Imported after the bootstrap. These modules pull their grpc base classes directly from
# ``._shim`` (not via ``sys.modules['grpc']``), so importing them is safe regardless of
# whether the shim was installed.
from ._channel import GrpcWebChannel, set_sender # noqa: E402
from ._sender import make_httpx_sender # noqa: E402
233 changes: 233 additions & 0 deletions packages/grpc-web/src/weaviate_grpc_web/_channel.py
@@ -0,0 +1,233 @@
"""The grpc-web channel and multicallables.

:class:`GrpcWebChannel` implements the small slice of the ``grpc.aio`` channel interface
that ``weaviate``'s generated stub and ``ConnectionV4`` actually use — ``unary_unary``,
``stream_stream`` and ``close`` — by framing requests as grpc-web and POSTing them via a
pluggable async sender. It subclasses the shim's ``grpc.aio.Channel`` (:class:`AioChannel`)
so the ``isinstance(..., grpc.aio.Channel)`` assertions in ``connect/v4.py`` hold.

Only unary RPCs are supported (Search, Aggregate, TenantsGet, BatchObjects,
BatchReferences, BatchDelete, and the unary health check). ``stream_stream`` (the bidi
``BatchStream`` used by opt-in server-side batching) cannot work over grpc-web/fetch and
raises a clear error.
"""

import base64
import urllib.parse
from typing import Any, Callable, Dict, Optional

from ._framing import encode_message, split_response
from ._sender import Sender, pyfetch_sender
from ._shim import AioChannel, AioRpcError, StatusCode, status_from_int

# Module-level default sender; overridable for tests / non-browser runtimes.
_default_sender: Sender = pyfetch_sender


def set_sender(sender: Sender) -> None:
    """Override the default async sender used by new channels (tests/integration)."""
    global _default_sender
    _default_sender = sender


def get_sender() -> Sender:
    return _default_sender


def _encode_timeout(seconds: float) -> str:
    """Encode a timeout as a grpc-timeout header value (``<positive int><unit>``)."""
    millis = max(1, int(seconds * 1000))
    if millis < 100_000_000:
        return f"{millis}m"
    return f"{max(1, int(seconds))}S"


def _fold_metadata(headers: Dict[str, str], metadata: Any) -> None:
    """Fold gRPC call metadata (``[(key, value), ...]``) into fetch headers.

    Binary ``-bin`` keys are base64-encoded as grpc-web requires.
    """
    if not metadata:
        return
    for key, value in metadata:
        name = key.lower()
        if name.endswith("-bin"):
            raw = value if isinstance(value, (bytes, bytearray)) else str(value).encode()
            headers[name] = base64.b64encode(raw).decode("ascii")
        else:
            headers[name] = value if isinstance(value, str) else str(value)


def _header_lookup(headers: Dict[str, str], name: str) -> Optional[str]:
    target = name.lower()
    for key, value in headers.items():
        if key.lower() == target:
            return value
    return None


class _UnaryUnaryMultiCallable:
    """Awaitable multicallable bound by ``WeaviateStub.__init__``.

    Called as ``await mc(request, metadata=..., timeout=...)`` (and, for the health
    check, as ``mc(request, timeout=...)`` with no metadata).
    """

    def __init__(
        self,
        channel: "GrpcWebChannel",
        path: str,
        request_serializer: Callable[[Any], bytes],
        response_deserializer: Callable[[bytes], Any],
    ) -> None:
        self._channel = channel
        self._path = path
        self._serialize = request_serializer
        self._deserialize = response_deserializer

    async def __call__(
        self,
        request: Any,
        *,
        metadata: Any = None,
        timeout: Optional[float] = None,
        credentials: Any = None,
        wait_for_ready: Any = None,
        compression: Any = None,
    ) -> Any:
        payload = self._serialize(request)
        return await self._channel._unary(self._path, payload, self._deserialize, metadata, timeout)


class _UnsupportedStreamMultiCallable:
    """Placeholder for ``stream_stream`` (bidirectional streaming).

    Calling it raises immediately, before the ``async for`` in ``connect/v4.py:1243``
    begins iterating.
    """

    def __init__(self, path: str) -> None:
        self._path = path

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        raise RuntimeError(
            f"Bidirectional streaming RPC {self._path!r} (server-side batching / "
            "BatchStream) is not supported over grpc-web/fetch. Use insert_many(), or "
            "batch.dynamic() / fixed_size() / rate_limit(), instead of batch.stream()."
        )


class GrpcWebChannel(AioChannel):
    """grpc-web/fetch implementation of the async grpc channel slice the client uses."""

    def __init__(
        self,
        target: Optional[str],
        secure: bool,
        options: Any = None,
        sender: Optional[Sender] = None,
    ) -> None:
        if not target:
            raise ValueError("GrpcWebChannel requires a target (host:port)")
        scheme = "https" if secure else "http"
        self._base_url = f"{scheme}://{target}"
        self._sender: Sender = sender or get_sender()

    def unary_unary(
        self,
        method: str,
        request_serializer: Callable[[Any], bytes],
        response_deserializer: Callable[[bytes], Any],
        _registered_method: bool = False,
    ) -> _UnaryUnaryMultiCallable:
        return _UnaryUnaryMultiCallable(self, method, request_serializer, response_deserializer)

    def stream_stream(
        self,
        method: str,
        request_serializer: Callable[[Any], bytes],
        response_deserializer: Callable[[bytes], Any],
        _registered_method: bool = False,
    ) -> _UnsupportedStreamMultiCallable:
        return _UnsupportedStreamMultiCallable(method)

    async def close(self, grace: Optional[float] = None) -> None:
        # Nothing to tear down: each call is an independent fetch.
        return None

    async def _unary(
        self,
        path: str,
        payload: bytes,
        deserialize: Callable[[bytes], Any],
        metadata: Any,
        timeout: Optional[float],
    ) -> Any:
        headers: Dict[str, str] = {
            "content-type": "application/grpc-web+proto",
            "accept": "application/grpc-web+proto",
            "x-grpc-web": "1",
            "x-user-agent": "weaviate-python-grpc-web",
        }
        _fold_metadata(headers, metadata)
        if timeout is not None:
            headers["grpc-timeout"] = _encode_timeout(timeout)

        url = self._base_url + path
        status, resp_headers, body = await self._sender(
            url, headers, encode_message(payload), timeout
        )
        return self._handle_response(status, resp_headers, body, deserialize)

    @staticmethod
    def _handle_response(
        http_status: int,
        resp_headers: Dict[str, str],
        body: bytes,
        deserialize: Callable[[bytes], Any],
    ) -> Any:
        messages, trailers = split_response(body) if body else ([], {})

        raw_status = trailers.get("grpc-status")
        if raw_status is None:
            raw_status = _header_lookup(resp_headers, "grpc-status")
        raw_message = (
            trailers.get("grpc-message") or _header_lookup(resp_headers, "grpc-message") or ""
        )
        message = urllib.parse.unquote(raw_message)

        if raw_status is None:
            if http_status != 200:
                raise AioRpcError(
                    code=_status_from_http(http_status),
                    details=f"HTTP {http_status} from grpc-web endpoint",
                )
            code = StatusCode.OK
        else:
            code = status_from_int(int(raw_status))

        if code is not StatusCode.OK:
            raise AioRpcError(code=code, details=message)
        if not messages:
            raise AioRpcError(
                code=StatusCode.INTERNAL,
                details="grpc-web response contained no message frame",
            )
        return deserialize(messages[0])


def _status_from_http(http_status: int) -> StatusCode:
    """Map an HTTP status to a gRPC status when no grpc-status is present.

    Mirrors the gRPC project's HTTP-to-gRPC status code mapping; any HTTP status
    not listed here maps to ``UNKNOWN``.
    """
    return {
        400: StatusCode.INTERNAL,
        401: StatusCode.UNAUTHENTICATED,
        403: StatusCode.PERMISSION_DENIED,
        404: StatusCode.UNIMPLEMENTED,
        429: StatusCode.UNAVAILABLE,
        502: StatusCode.UNAVAILABLE,
        503: StatusCode.UNAVAILABLE,
        504: StatusCode.UNAVAILABLE,
    }.get(http_status, StatusCode.UNKNOWN)