-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Expand file tree
/
Copy pathoption_h_linear.py
More file actions
63 lines (44 loc) · 3.09 KB
/
option_h_linear.py
File metadata and controls
63 lines (44 loc) · 3.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
"""Option H: continuation-based linear MRTR. ``await ctx.elicit()`` is genuine.
The Option B footgun was: ``await elicit()`` *looks* like a suspension point
but is actually a re-entry point, so everything above it runs twice. This
fixes that by making it a *real* suspension point — the coroutine frame is
held in a ``ContinuationStore`` across MRTR rounds, keyed by
``request_state``.
Handler code stays exactly as it was in the SSE era. Side-effects above
the await fire once because the function never restarts — it resumes.
Trade-off: the server holds the frame in memory between rounds. Client
still sees pure MRTR (no SSE), but the server is stateful *within* a
single tool call. Horizontally-scaled deployments need sticky routing on
the ``request_state`` token. Same operational shape as Option A's SSE
hold, without the long-lived connection.
When to use: migrating existing SSE-era tools to MRTR wire protocol
without rewriting the handler, or when the linear style is genuinely
clearer than guard-first (complex branching, many rounds).
When not to: if you need true statelessness across server instances.
Use E/F/G — they encode everything the server needs in ``request_state``
itself.
"""
from __future__ import annotations
from typing import Any
from pydantic import BaseModel
from mcp.server.experimental.mrtr import ContinuationStore, LinearCtx, linear_mrtr
from ._shared import audit_log, build_server, lookup_weather
class UnitsPref(BaseModel):
units: str
# ───────────────────────────────────────────────────────────────────────────
# This is what the tool author writes. Linear, front-to-back, no re-entry
# contract to reason about. The ``audit_log`` above the await fires
# exactly once — the await is a real suspension point.
# ───────────────────────────────────────────────────────────────────────────
async def weather(ctx: LinearCtx, args: dict[str, Any]) -> str:
location = args["location"]
audit_log(location) # runs once — unlike Option B
prefs = await ctx.elicit("Which units?", UnitsPref)
return lookup_weather(location, prefs.units)
# ───────────────────────────────────────────────────────────────────────────
# Registration. The store must be entered as an async context manager
# around the server's run loop — it owns the task group that keeps the
# suspended coroutines alive.
# ───────────────────────────────────────────────────────────────────────────
store = ContinuationStore()
server = build_server("mrtr-option-h", on_call_tool=linear_mrtr(weather, store=store))