-
Notifications
You must be signed in to change notification settings - Fork 128
feat(grpc-web): Pyodide/WASM grpc-web transport for the async client #2056
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
g-despot
wants to merge
10
commits into
main
Choose a base branch
from
feat/grpc-web-wasm-transport
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 1 commit
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
8aa867a
feat(grpc-web): add Pyodide/WASM grpc-web transport
g-despot 99f5356
feat(grpc-web): support grpc-web on the REST host:port via a base-pat…
g-despot fe208c0
fix(grpc-web): surface transport/parse failures as AioRpcError, enfor…
g-despot 833bb59
test(grpc-web): skip get_version unit tests when grpcio/protobuf are …
g-despot 0f2f6da
fix(grpc-web): guard grpc-web mode (shim+async required), tighten tim…
g-despot 3669363
feat(grpc-web): route httpx REST calls over fetch under Emscripten
g-despot b8ac294
fix: make the async client safe under WASM/Pyodide
g-despot 0d685a4
fix(grpc-web): harden the fetch transport and error reporting
g-despot c8f321a
chore: gitignore all egg-info dirs
g-despot 4c5c940
Merge branch 'main' into feat/grpc-web-wasm-transport
g-despot File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| # weaviate-python-grpc-web | ||
|
|
||
| A grpc-web / WebAssembly (Pyodide) transport for the | ||
| [Weaviate Python client](https://github.com/weaviate/weaviate-python-client), so the | ||
| client's **async** gRPC data path can run inside a browser (marimo notebooks, Pyodide, | ||
| WASM workers) where there is no socket and no `grpcio` wheel. | ||
|
|
||
| It is built from the same repository as `weaviate-client` and reuses its generated | ||
| protobuf stubs — it does **not** fork code generation. | ||
|
|
||
| ## How it works | ||
|
|
||
| Under Pyodide there is no `grpcio` Emscripten wheel, and `import weaviate` hard-imports | ||
| `grpc` at module load. This package installs a small pure-Python `grpc` shim into | ||
| `sys.modules` **before** `import weaviate`, which: | ||
|
|
||
| - satisfies every import-time `import grpc` / `from grpc(.aio) import ...` in the base | ||
| client and its generated `*_pb2_grpc` stubs; | ||
| - provides `grpc.aio.Channel` as a real base class, so the grpc-web channel | ||
| (`GrpcWebChannel`) subclasses it and the client's `isinstance(..., grpc.aio.Channel)` | ||
| assertions pass; | ||
| - satisfies the generated v6300 stub's version gate | ||
| (`grpc.__version__` / `grpc._utilities.first_version_is_lower`). | ||
|
|
||
| The `GrpcWebChannel` frames unary RPCs as grpc-web (a 5-byte header + protobuf payload) | ||
| and POSTs them via `pyodide.http.pyfetch` to a server fronted by a grpc-web transcoder | ||
| (e.g. Envoy or [connectrpc/vanguard](https://github.com/connectrpc/vanguard-go)). Call | ||
| metadata (API key / OIDC bearer) is folded into `fetch` headers. | ||
|
|
||
| ## Usage | ||
|
|
||
| ```python | ||
| import weaviate_grpc_web # installs the grpc shim under Emscripten (no-op elsewhere) | ||
| import weaviate | ||
|
|
||
| client = weaviate.use_async_with_local(skip_init_checks=True) | ||
| await client.connect() | ||
| collection = client.collections.get("Article") | ||
| await collection.query.near_text("hello", limit=3) | ||
| ``` | ||
|
|
||
| ## Supported / unsupported | ||
|
|
||
| | RPC | Kind | Status | | ||
| |----------------------------------------------------------|-----------------|--------| | ||
| | Search, Aggregate, TenantsGet, BatchObjects, BatchDelete | unary | ✅ works over grpc-web | | ||
| | Health check (`/grpc.health.v1.Health/Check`) | unary | ✅ (recommend `skip_init_checks=True` + REST `/.well-known/ready`) | | ||
| | References (`/batch/references`) | REST | ✅ via httpx-in-Pyodide | | ||
| | `batch.stream()` / `batch.experimental()` (BatchStream) | bidi streaming | ❌ not possible over grpc-web/fetch — use `insert_many()` / `batch.dynamic()` / `fixed_size()` / `rate_limit()` | | ||
| | Synchronous client | — | ❌ async-only under WASM | | ||
|
|
||
| ## Testing on CPython | ||
|
|
||
| `weaviate_grpc_web.install(force=True)` installs the shim on a normal CPython | ||
| interpreter (run it in a fresh process, before importing `weaviate`). Inject a sender | ||
| with `weaviate_grpc_web.set_sender(...)` (e.g. `make_httpx_sender()`) to exercise the | ||
| transport against an Envoy/vanguard transcoder without a browser. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| [build-system] | ||
| requires = ["setuptools>=65", "wheel"] | ||
| build-backend = "setuptools.build_meta" | ||
|
|
||
| [project] | ||
| name = "weaviate-python-grpc-web" | ||
| description = "grpc-web / WASM (Pyodide) transport for the Weaviate Python client" | ||
| readme = "README.md" | ||
| requires-python = ">=3.10" | ||
| license = { text = "BSD-3-Clause" } | ||
| authors = [{ name = "Weaviate", email = "hello@weaviate.io" }] | ||
| keywords = ["weaviate", "grpc-web", "pyodide", "wasm", "emscripten"] | ||
| # Version is kept in lockstep with weaviate-client. TODO(lockstep): derive from the same | ||
| # git tag via setuptools_scm and assert the built versions match in CI before publishing. | ||
| version = "0.0.1.dev0" | ||
| # Deliberately depends on weaviate-client WITHOUT grpcio (grpcio is excluded under | ||
| # Emscripten by the `sys_platform != "emscripten"` marker in the base package's deps). | ||
| dependencies = [ | ||
| "weaviate-client", | ||
| ] | ||
|
|
||
| [project.urls] | ||
| Source = "https://github.com/weaviate/weaviate-python-client" | ||
| Tracker = "https://github.com/weaviate/weaviate-python-client/issues" | ||
|
|
||
| [tool.setuptools.packages.find] | ||
| where = ["src"] | ||
|
|
||
| [tool.setuptools.package-data] | ||
| weaviate_grpc_web = ["py.typed"] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| """grpc-web / WASM transport for the Weaviate Python client. | ||
|
|
||
| Under Pyodide/Emscripten there is no ``grpcio`` wheel. Importing this package installs a | ||
| pure-Python ``grpc`` shim into ``sys.modules`` (and forces the pure-Python protobuf | ||
| runtime) so that the subsequent ``import weaviate`` succeeds and its async gRPC data path | ||
| runs over grpc-web (``fetch``) instead of HTTP/2 sockets. | ||
|
|
||
| Usage under Pyodide:: | ||
|
|
||
| import weaviate_grpc_web # installs the grpc shim (no-op off Emscripten) | ||
| import weaviate | ||
|
|
||
| client = weaviate.use_async_with_local(skip_init_checks=True) | ||
| await client.connect() | ||
|
|
||
| The shim is installed automatically only under Emscripten, so importing this package on a | ||
| normal CPython install never clobbers a real, working ``grpcio``. Async clients only — | ||
| the synchronous client is not supported in the browser. | ||
| """ | ||
|
|
||
| import os | ||
| import sys | ||
|
|
||
| from ._shim import StatusCode, install, is_installed | ||
|
|
||
| __all__ = [ | ||
| "install", | ||
| "is_installed", | ||
| "set_sender", | ||
| "make_httpx_sender", | ||
| "GrpcWebChannel", | ||
| "StatusCode", | ||
| ] | ||
|
|
||
|
|
||
| def _bootstrap() -> None: | ||
| if sys.platform == "emscripten": | ||
| # The pure-Python protobuf runtime always works; the upb C-extension may not be | ||
| # present. Set before ``import weaviate`` (which imports protobuf) so it takes | ||
| # effect. ``setdefault`` lets a user override it explicitly. | ||
| os.environ.setdefault("PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION", "python") | ||
| install() | ||
|
|
||
|
|
||
| _bootstrap() | ||
|
|
||
| # Imported after the bootstrap. These modules pull their grpc base classes directly from | ||
| # ``._shim`` (not via ``sys.modules['grpc']``), so importing them is safe regardless of | ||
| # whether the shim was installed. | ||
| from ._channel import GrpcWebChannel, set_sender # noqa: E402 | ||
| from ._sender import make_httpx_sender # noqa: E402 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,233 @@ | ||
| """The grpc-web channel and multicallables. | ||
|
|
||
| :class:`GrpcWebChannel` implements the small slice of the ``grpc.aio`` channel interface | ||
| that ``weaviate``'s generated stub and ``ConnectionV4`` actually use — ``unary_unary``, | ||
| ``stream_stream`` and ``close`` — by framing requests as grpc-web and POSTing them via a | ||
| pluggable async sender. It subclasses the shim's ``grpc.aio.Channel`` (:class:`AioChannel`) | ||
| so the ``isinstance(..., grpc.aio.Channel)`` assertions in ``connect/v4.py`` hold. | ||
|
|
||
| Only unary RPCs are supported (Search, Aggregate, TenantsGet, BatchObjects, | ||
| BatchReferences, BatchDelete, and the unary health check). ``stream_stream`` (the bidi | ||
| ``BatchStream`` used by opt-in server-side batching) cannot work over grpc-web/fetch and | ||
| raises a clear error. | ||
| """ | ||
|
|
||
| import base64 | ||
| import urllib.parse | ||
| from typing import Any, Callable, Dict, Optional | ||
|
|
||
| from ._framing import encode_message, split_response | ||
| from ._sender import Sender, pyfetch_sender | ||
| from ._shim import AioChannel, AioRpcError, StatusCode, status_from_int | ||
|
|
||
| # Module-level default sender; overridable for tests / non-browser runtimes. | ||
| _default_sender: Sender = pyfetch_sender | ||
|
|
||
|
|
||
| def set_sender(sender: Sender) -> None: | ||
| """Override the default async sender used by new channels (tests/integration).""" | ||
| global _default_sender | ||
| _default_sender = sender | ||
|
|
||
|
|
||
| def get_sender() -> Sender: | ||
| return _default_sender | ||
|
|
||
|
|
||
| def _encode_timeout(seconds: float) -> str: | ||
| """Encode a timeout as a grpc-timeout header value (``<positive int><unit>``).""" | ||
| millis = max(1, int(seconds * 1000)) | ||
| if millis < 100_000_000: | ||
| return f"{millis}m" | ||
| return f"{max(1, int(seconds))}S" | ||
|
|
||
|
|
||
| def _fold_metadata(headers: Dict[str, str], metadata: Any) -> None: | ||
| """Fold gRPC call metadata (``[(key, value), ...]``) into fetch headers. | ||
|
|
||
| Binary ``-bin`` keys are base64-encoded as grpc-web requires. | ||
| """ | ||
| if not metadata: | ||
| return | ||
| for key, value in metadata: | ||
| name = key.lower() | ||
| if name.endswith("-bin"): | ||
| raw = value if isinstance(value, (bytes, bytearray)) else str(value).encode() | ||
| headers[name] = base64.b64encode(raw).decode("ascii") | ||
| else: | ||
| headers[name] = value if isinstance(value, str) else str(value) | ||
|
|
||
|
|
||
| def _header_lookup(headers: Dict[str, str], name: str) -> Optional[str]: | ||
| target = name.lower() | ||
| for key, value in headers.items(): | ||
| if key.lower() == target: | ||
| return value | ||
| return None | ||
|
|
||
|
|
||
| class _UnaryUnaryMultiCallable: | ||
| """Awaitable multicallable bound by ``WeaviateStub.__init__``. | ||
|
|
||
| Called as ``await mc(request, metadata=..., timeout=...)`` (and, for the health | ||
| check, as ``mc(request, timeout=...)`` with no metadata). | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| channel: "GrpcWebChannel", | ||
| path: str, | ||
| request_serializer: Callable[[Any], bytes], | ||
| response_deserializer: Callable[[bytes], Any], | ||
| ) -> None: | ||
| self._channel = channel | ||
| self._path = path | ||
| self._serialize = request_serializer | ||
| self._deserialize = response_deserializer | ||
|
|
||
| async def __call__( | ||
| self, | ||
| request: Any, | ||
| *, | ||
| metadata: Any = None, | ||
| timeout: Optional[float] = None, | ||
| credentials: Any = None, | ||
| wait_for_ready: Any = None, | ||
| compression: Any = None, | ||
| ) -> Any: | ||
| payload = self._serialize(request) | ||
| return await self._channel._unary(self._path, payload, self._deserialize, metadata, timeout) | ||
|
|
||
|
|
||
| class _UnsupportedStreamMultiCallable: | ||
| """Placeholder for ``stream_stream`` (bidirectional streaming). | ||
|
|
||
| Calling it raises immediately, before the ``async for`` in ``connect/v4.py:1243`` | ||
| begins iterating. | ||
| """ | ||
|
|
||
| def __init__(self, path: str) -> None: | ||
| self._path = path | ||
|
|
||
| def __call__(self, *args: Any, **kwargs: Any) -> Any: | ||
| raise RuntimeError( | ||
| f"Bidirectional streaming RPC {self._path!r} (server-side batching / " | ||
| "BatchStream) is not supported over grpc-web/fetch. Use insert_many(), or " | ||
| "batch.dynamic() / fixed_size() / rate_limit(), instead of batch.stream()." | ||
| ) | ||
|
|
||
|
|
||
| class GrpcWebChannel(AioChannel): | ||
| """grpc-web/fetch implementation of the async grpc channel slice the client uses.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| target: Optional[str], | ||
| secure: bool, | ||
| options: Any = None, | ||
| sender: Optional[Sender] = None, | ||
| ) -> None: | ||
| if not target: | ||
| raise ValueError("GrpcWebChannel requires a target (host:port)") | ||
| scheme = "https" if secure else "http" | ||
| self._base_url = f"{scheme}://{target}" | ||
| self._sender: Sender = sender or get_sender() | ||
|
|
||
| def unary_unary( | ||
| self, | ||
| method: str, | ||
| request_serializer: Callable[[Any], bytes], | ||
| response_deserializer: Callable[[bytes], Any], | ||
| _registered_method: bool = False, | ||
| ) -> _UnaryUnaryMultiCallable: | ||
| return _UnaryUnaryMultiCallable(self, method, request_serializer, response_deserializer) | ||
|
|
||
| def stream_stream( | ||
| self, | ||
| method: str, | ||
| request_serializer: Callable[[Any], bytes], | ||
| response_deserializer: Callable[[bytes], Any], | ||
| _registered_method: bool = False, | ||
| ) -> _UnsupportedStreamMultiCallable: | ||
| return _UnsupportedStreamMultiCallable(method) | ||
|
|
||
| async def close(self, grace: Optional[float] = None) -> None: | ||
| # Nothing to tear down: each call is an independent fetch. | ||
| return None | ||
|
|
||
| async def _unary( | ||
| self, | ||
| path: str, | ||
| payload: bytes, | ||
| deserialize: Callable[[bytes], Any], | ||
| metadata: Any, | ||
| timeout: Optional[float], | ||
| ) -> Any: | ||
| headers: Dict[str, str] = { | ||
| "content-type": "application/grpc-web+proto", | ||
| "accept": "application/grpc-web+proto", | ||
| "x-grpc-web": "1", | ||
| "x-user-agent": "weaviate-python-grpc-web", | ||
| } | ||
| _fold_metadata(headers, metadata) | ||
| if timeout is not None: | ||
| headers["grpc-timeout"] = _encode_timeout(timeout) | ||
|
|
||
| url = self._base_url + path | ||
| status, resp_headers, body = await self._sender( | ||
| url, headers, encode_message(payload), timeout | ||
| ) | ||
| return self._handle_response(status, resp_headers, body, deserialize) | ||
|
|
||
| @staticmethod | ||
| def _handle_response( | ||
| http_status: int, | ||
| resp_headers: Dict[str, str], | ||
| body: bytes, | ||
| deserialize: Callable[[bytes], Any], | ||
| ) -> Any: | ||
| messages, trailers = split_response(body) if body else ([], {}) | ||
|
|
||
|
g-despot marked this conversation as resolved.
|
||
| raw_status = trailers.get("grpc-status") | ||
| if raw_status is None: | ||
| raw_status = _header_lookup(resp_headers, "grpc-status") | ||
| raw_message = ( | ||
| trailers.get("grpc-message") or _header_lookup(resp_headers, "grpc-message") or "" | ||
| ) | ||
| message = urllib.parse.unquote(raw_message) | ||
|
|
||
| if raw_status is None: | ||
| if http_status != 200: | ||
| raise AioRpcError( | ||
| code=_status_from_http(http_status), | ||
| details=f"HTTP {http_status} from grpc-web endpoint", | ||
| ) | ||
| code = StatusCode.OK | ||
| else: | ||
| code = status_from_int(int(raw_status)) | ||
|
|
||
|
g-despot marked this conversation as resolved.
|
||
| if code is not StatusCode.OK: | ||
| raise AioRpcError(code=code, details=message) | ||
| if not messages: | ||
| raise AioRpcError( | ||
| code=StatusCode.INTERNAL, | ||
| details="grpc-web response contained no message frame", | ||
| ) | ||
| return deserialize(messages[0]) | ||
|
|
||
|
|
||
| def _status_from_http(http_status: int) -> StatusCode: | ||
| """Map an HTTP status to a gRPC status when no grpc-status is present. | ||
|
|
||
| Mirrors the grpc-web spec's HTTP-to-gRPC code mapping. | ||
| """ | ||
| return { | ||
| 400: StatusCode.INTERNAL, | ||
| 401: StatusCode.UNAUTHENTICATED, | ||
| 403: StatusCode.PERMISSION_DENIED, | ||
| 404: StatusCode.UNIMPLEMENTED, | ||
| 429: StatusCode.UNAVAILABLE, | ||
| 502: StatusCode.UNAVAILABLE, | ||
| 503: StatusCode.UNAVAILABLE, | ||
| 504: StatusCode.UNAVAILABLE, | ||
| }.get(http_status, StatusCode.UNKNOWN) | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.