Skip to content

Commit 874641c

Browse files
authored
Merge pull request #108 from kernel/raf/telemetry-default-routing
feat: route browser telemetry directly to the VM by default
2 parents 4c5dde5 + a0ee2b2 commit 874641c

5 files changed

Lines changed: 88 additions & 3 deletions

File tree

examples/browser_telemetry.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
"""Example: stream live browser telemetry events from a session."""
2+
3+
from kernel import Kernel
4+
5+
6+
def main() -> None:
7+
client = Kernel()
8+
9+
# Enable telemetry capture when creating the browser.
10+
browser = client.browsers.create(telemetry={"enabled": True})
11+
12+
try:
13+
# Telemetry is a default direct-to-VM routing subresource, so the stream
14+
# connects straight to the browser VM automatically.
15+
stream = client.browsers.telemetry.stream(browser.session_id)
16+
17+
# Make a few browser activity calls to generate events. The "api" telemetry
18+
# category emits an event per VM API call, so events arrive within ~1s.
19+
for _ in range(3):
20+
client.browsers.curl(browser.session_id, url="https://example.com", method="GET")
21+
22+
# Print a few events, then stop so we don't wait on the 15s keepalive.
23+
for count, message in enumerate(stream, start=1):
24+
print(message.seq, message.event.type)
25+
if count >= 3:
26+
break
27+
finally:
28+
client.browsers.delete_by_id(browser.session_id)
29+
30+
31+
if __name__ == "__main__":
32+
main()

src/kernel/_streaming.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,12 @@ def decode(self, line: str) -> ServerSentEvent | None:
251251
# See: https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation # noqa: E501
252252

253253
if not line:
254-
if not self._event and not self._data and not self._last_event_id and self._retry is None:
254+
# Whether to dispatch depends only on what was set in the *current* block. last_event_id
255+
# is sticky across events (per the SSE spec, it is intentionally not reset below), so it
256+
# must not be part of this check -- otherwise, once any event carries an id, every
257+
# subsequent comment-only block (e.g. a ``:\n\n`` keepalive) would dispatch an empty
258+
# event, which then fails to JSON-decode in the typed Stream wrapper.
259+
if not self._event and not self._data and self._retry is None:
255260
return None
256261

257262
sse = ServerSentEvent(

src/kernel/lib/browser_routing/routing.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class BrowserRoutingConfig:
4141
def browser_routing_config_from_env() -> BrowserRoutingConfig:
4242
raw = os.environ.get("KERNEL_BROWSER_ROUTING_SUBRESOURCES")
4343
if raw is None:
44-
return BrowserRoutingConfig(subresources=("curl",))
44+
return BrowserRoutingConfig(subresources=("curl", "telemetry"))
4545
if raw.strip() == "":
4646
return BrowserRoutingConfig()
4747

tests/test_browser_routing.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,28 @@ def test_browser_request_uses_curl_raw() -> None:
9797
assert request.url.params.get("jwt") == "token-abc"
9898

9999

100+
@respx.mock
101+
def test_telemetry_stream_routes_directly_to_vm(monkeypatch: pytest.MonkeyPatch) -> None:
102+
monkeypatch.setenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", "telemetry")
103+
route = respx.get("http://browser-session.test/browser/kernel/telemetry/stream").mock(
104+
return_value=httpx.Response(
105+
200,
106+
headers={"content-type": "text/event-stream"},
107+
content=b'id: 1\ndata: {"category":"api"}\n\n',
108+
)
109+
)
110+
with Kernel(base_url=base_url, api_key=api_key, _strict_response_validation=True) as client:
111+
_cache_browser(client)
112+
stream = client.browsers.telemetry.stream("sess-1")
113+
stream.close()
114+
115+
assert route.called
116+
request = cast(httpx.Request, cast(Any, route.calls[0]).request)
117+
assert request.url.path == "/browser/kernel/telemetry/stream"
118+
assert request.url.params.get("jwt") == "token-abc"
119+
assert request.headers.get("Authorization") is None
120+
121+
100122
@respx.mock
101123
def test_browser_request_params_cannot_override_target_url_or_jwt() -> None:
102124
route = respx.get("http://browser-session.test/browser/kernel/curl/raw").mock(
@@ -315,7 +337,7 @@ def test_browser_route_from_browser_requires_base_url_and_jwt() -> None:
315337

316338
def test_browser_routing_config_from_env_defaults_to_curl(monkeypatch: pytest.MonkeyPatch) -> None:
317339
monkeypatch.delenv("KERNEL_BROWSER_ROUTING_SUBRESOURCES", raising=False)
318-
assert browser_routing_config_from_env().subresources == ("curl",)
340+
assert browser_routing_config_from_env().subresources == ("curl", "telemetry")
319341

320342

321343
def test_browser_routing_config_from_env_empty_string_disables_routing(monkeypatch: pytest.MonkeyPatch) -> None:

tests/test_streaming.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,32 @@ def body() -> Iterator[bytes]:
2626
await assert_empty_iter(iterator)
2727

2828

29+
@pytest.mark.asyncio
30+
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
31+
async def test_keepalive_comment_after_event_with_id(sync: bool, client: Kernel, async_client: AsyncKernel) -> None:
32+
# A ``:`` comment frame (the server's SSE keepalive) that arrives after an event which set an
33+
# id must be ignored, not dispatched as an empty event. last_event_id is sticky, so this is a
34+
# regression guard against it leaking an undecodable empty frame into the typed stream.
35+
def body() -> Iterator[bytes]:
36+
yield b"id: 1\n"
37+
yield b'data: {"foo":true}\n'
38+
yield b"\n"
39+
yield b":\n"
40+
yield b"\n"
41+
yield b'data: {"bar":false}\n'
42+
yield b"\n"
43+
44+
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
45+
46+
sse = await iter_next(iterator)
47+
assert sse.json() == {"foo": True}
48+
49+
sse = await iter_next(iterator)
50+
assert sse.json() == {"bar": False}
51+
52+
await assert_empty_iter(iterator)
53+
54+
2955
@pytest.mark.asyncio
3056
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
3157
async def test_data_missing_event(sync: bool, client: Kernel, async_client: AsyncKernel) -> None:

0 commit comments

Comments
 (0)