Skip to content

Commit 0158571

Browse files
authored
feat(server): add operation telemetry for session create/add_message/… (#1943)
* feat(server): add operation telemetry for session create/add_message/commit APIs Wrap session.create, session.add_message and session.commit HTTP handlers with run_operation so callers can opt in via TelemetryRequest and receive a telemetry summary in the response. Propagate the telemetry parameter through the async/sync HTTP clients, the local client and the public SDK so all client modes expose a consistent surface. * refactor(client/local): move part imports into _add_message_impl where they are used
1 parent d0ab0d4 commit 0158571

7 files changed

Lines changed: 166 additions & 59 deletions

File tree

openviking/async_client.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,17 @@ async def session_exists(self, session_id: str) -> bool:
128128
await self._ensure_initialized()
129129
return await self._client.session_exists(session_id)
130130

131-
async def create_session(self, session_id: Optional[str] = None) -> Dict[str, Any]:
131+
async def create_session(
132+
self, session_id: Optional[str] = None, telemetry: TelemetryRequest = False
133+
) -> Dict[str, Any]:
132134
"""Create a new session.
133135
134136
Args:
135137
session_id: Optional session ID. If provided, creates a session with the given ID.
136138
If None, creates a new session with auto-generated ID.
137139
"""
138140
await self._ensure_initialized()
139-
return await self._client.create_session(session_id)
141+
return await self._client.create_session(session_id, telemetry=telemetry)
140142

141143
async def list_sessions(self) -> List[Any]:
142144
"""List all sessions."""
@@ -173,6 +175,7 @@ async def add_message(
173175
parts: list[dict] | None = None,
174176
created_at: str | None = None,
175177
role_id: str | None = None,
178+
telemetry: TelemetryRequest = False,
176179
) -> Dict[str, Any]:
177180
"""Add a message to a session.
178181
@@ -194,6 +197,7 @@ async def add_message(
194197
parts=parts,
195198
created_at=created_at,
196199
role_id=role_id,
200+
telemetry=telemetry,
197201
)
198202

199203
async def commit_session(

openviking/client/local.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,13 +399,26 @@ async def unlink(self, from_uri: str, to_uri: str) -> None:
399399

400400
# ============= Sessions =============
401401

402-
async def create_session(self, session_id: Optional[str] = None) -> Dict[str, Any]:
402+
async def create_session(
403+
self, session_id: Optional[str] = None, telemetry: TelemetryRequest = False
404+
) -> Dict[str, Any]:
403405
"""Create a new session.
404406
405407
Args:
406408
session_id: Optional session ID. If provided, creates a session with the given ID.
407409
If None, creates a new session with auto-generated ID.
408410
"""
411+
execution = await run_with_telemetry(
412+
operation="session.create",
413+
telemetry=telemetry,
414+
fn=lambda: self._create_session_impl(session_id),
415+
)
416+
return attach_telemetry_payload(
417+
execution.result,
418+
execution.telemetry,
419+
)
420+
421+
async def _create_session_impl(self, session_id: Optional[str]) -> Dict[str, Any]:
409422
await self._service.initialize_user_directories(self._ctx)
410423
await self._service.initialize_agent_directories(self._ctx)
411424
session = await self._service.sessions.create(self._ctx, session_id)
@@ -471,6 +484,7 @@ async def add_message(
471484
parts: Optional[List[Dict[str, Any]]] = None,
472485
created_at: Optional[str] = None,
473486
role_id: Optional[str] = None,
487+
telemetry: TelemetryRequest = False,
474488
) -> Dict[str, Any]:
475489
"""Add a message to a session.
476490
@@ -484,6 +498,32 @@ async def add_message(
484498
485499
If both content and parts are provided, parts takes precedence.
486500
"""
501+
execution = await run_with_telemetry(
502+
operation="session.add_message",
503+
telemetry=telemetry,
504+
fn=lambda: self._add_message_impl(
505+
session_id,
506+
role,
507+
content,
508+
parts,
509+
created_at,
510+
role_id,
511+
),
512+
)
513+
return attach_telemetry_payload(
514+
execution.result,
515+
execution.telemetry,
516+
)
517+
518+
async def _add_message_impl(
519+
self,
520+
session_id: str,
521+
role: str,
522+
content: Optional[str],
523+
parts: Optional[List[Dict[str, Any]]],
524+
created_at: Optional[str],
525+
role_id: Optional[str],
526+
) -> Dict[str, Any]:
487527
from openviking.message.part import Part, TextPart, part_from_dict
488528

489529
session = await self._service.sessions.get(session_id, self._ctx, auto_create=True)

openviking/server/routers/sessions.py

Lines changed: 68 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
from openviking.server.identity import AuthMode, RequestContext
1616
from openviking.server.models import ErrorInfo, Response
1717
from openviking.server.responses import error_response
18+
from openviking.server.telemetry import run_operation
19+
from openviking.telemetry import TelemetryRequest
1820

1921
router = APIRouter(prefix="/api/v1/sessions", tags=["sessions"])
2022
logger = logging.getLogger(__name__)
@@ -67,6 +69,7 @@ class AddMessageRequest(BaseModel):
6769
content: Optional[str] = None
6870
parts: Optional[List[Dict[str, Any]]] = None
6971
created_at: Optional[str] = None
72+
telemetry: TelemetryRequest = False
7073

7174
@model_validator(mode="after")
7275
def validate_content_or_parts(self) -> "AddMessageRequest":
@@ -86,6 +89,7 @@ class CreateSessionRequest(BaseModel):
8689
"""Request model for creating a session."""
8790

8891
session_id: Optional[str] = None
92+
telemetry: TelemetryRequest = False
8993

9094

9195
def _to_jsonable(value: Any) -> Any:
@@ -109,7 +113,7 @@ def _request_auth_mode(request: Request) -> AuthMode:
109113

110114
@router.post("")
111115
async def create_session(
112-
request: Optional[CreateSessionRequest] = None,
116+
request: CreateSessionRequest = Body(default_factory=CreateSessionRequest),
113117
_ctx: RequestContext = Depends(get_request_context),
114118
):
115119
"""Create a new session.
@@ -118,17 +122,22 @@ async def create_session(
118122
If session_id is None, creates a new session with auto-generated ID.
119123
"""
120124
service = get_service()
121-
await service.initialize_user_directories(_ctx)
122-
await service.initialize_agent_directories(_ctx)
123-
session_id = request.session_id if request else None
124-
session = await service.sessions.create(_ctx, session_id)
125-
return Response(
126-
status="ok",
127-
result={
125+
126+
async def _create() -> dict[str, Any]:
127+
await service.initialize_user_directories(_ctx)
128+
await service.initialize_agent_directories(_ctx)
129+
session = await service.sessions.create(_ctx, request.session_id)
130+
return {
128131
"session_id": session.session_id,
129132
"user": session.user.to_dict(),
130-
},
133+
}
134+
135+
execution = await run_operation(
136+
operation="session.create",
137+
telemetry=request.telemetry,
138+
fn=_create,
131139
)
140+
return Response(status="ok", result=execution.result, telemetry=execution.telemetry)
132141

133142

134143
@router.get("")
@@ -237,6 +246,7 @@ class CommitRequest(BaseModel):
237246
"(default 10); compact path passes 0 to archive everything."
238247
),
239248
)
249+
telemetry: TelemetryRequest = False
240250

241251

242252
@router.post("/{session_id}/commit")
@@ -252,10 +262,18 @@ async def commit_session(
252262
polling progress via ``GET /tasks/{task_id}``.
253263
"""
254264
service = get_service()
255-
result = await service.sessions.commit_async(
256-
session_id, _ctx, keep_recent_count=body.keep_recent_count
265+
execution = await run_operation(
266+
operation="session.commit",
267+
telemetry=body.telemetry,
268+
fn=lambda: service.sessions.commit_async(
269+
session_id, _ctx, keep_recent_count=body.keep_recent_count
270+
),
257271
)
258-
return Response(status="ok", result=result).model_dump(exclude_none=True)
272+
return Response(
273+
status="ok",
274+
result=execution.result,
275+
telemetry=execution.telemetry,
276+
).model_dump(exclude_none=True)
259277

260278

261279
@router.post("/{session_id}/extract")
@@ -291,42 +309,47 @@ async def add_message(
291309
Missing sessions are auto-created on first add.
292310
"""
293311
service = get_service()
294-
session = await service.sessions.get(session_id, _ctx, auto_create=True)
295-
role_id = _ctx.resolve_role_id(request.role, request.role_id)
296-
297-
if request.parts is not None:
298-
# Resolve path variables in URIs within parts
299-
resolved_parts = []
300-
for p in request.parts:
301-
part_copy = dict(p)
302-
# Resolve uri in context parts
303-
if part_copy.get("type") == "context" and "uri" in part_copy:
304-
part_copy["uri"] = resolve_path_variables(part_copy["uri"])
305-
# Resolve tool_uri and skill_uri in tool parts
306-
if part_copy.get("type") == "tool":
307-
if "tool_uri" in part_copy:
308-
part_copy["tool_uri"] = resolve_path_variables(part_copy["tool_uri"])
309-
if "skill_uri" in part_copy:
310-
part_copy["skill_uri"] = resolve_path_variables(part_copy["skill_uri"])
311-
resolved_parts.append(part_copy)
312-
parts = [part_from_dict(p) for p in resolved_parts]
313-
else:
314-
parts = [TextPart(text=request.content or "")]
315-
316-
# created_at 直接传递给 session (ISO string)
317-
session.add_message(
318-
request.role,
319-
parts,
320-
role_id=role_id,
321-
created_at=request.created_at,
322-
)
323-
return Response(
324-
status="ok",
325-
result={
312+
313+
async def _add() -> dict[str, Any]:
314+
session = await service.sessions.get(session_id, _ctx, auto_create=True)
315+
role_id = _ctx.resolve_role_id(request.role, request.role_id)
316+
317+
if request.parts is not None:
318+
# Resolve path variables in URIs within parts
319+
resolved_parts = []
320+
for p in request.parts:
321+
part_copy = dict(p)
322+
# Resolve uri in context parts
323+
if part_copy.get("type") == "context" and "uri" in part_copy:
324+
part_copy["uri"] = resolve_path_variables(part_copy["uri"])
325+
# Resolve tool_uri and skill_uri in tool parts
326+
if part_copy.get("type") == "tool":
327+
if "tool_uri" in part_copy:
328+
part_copy["tool_uri"] = resolve_path_variables(part_copy["tool_uri"])
329+
if "skill_uri" in part_copy:
330+
part_copy["skill_uri"] = resolve_path_variables(part_copy["skill_uri"])
331+
resolved_parts.append(part_copy)
332+
parts = [part_from_dict(p) for p in resolved_parts]
333+
else:
334+
parts = [TextPart(text=request.content or "")]
335+
336+
session.add_message(
337+
request.role,
338+
parts,
339+
role_id=role_id,
340+
created_at=request.created_at,
341+
)
342+
return {
326343
"session_id": session_id,
327344
"message_count": len(session.messages),
328-
},
345+
}
346+
347+
execution = await run_operation(
348+
operation="session.add_message",
349+
telemetry=request.telemetry,
350+
fn=_add,
329351
)
352+
return Response(status="ok", result=execution.result, telemetry=execution.telemetry)
330353

331354

332355
@router.post("/{session_id}/used")

openviking/sync_client.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,16 @@ def session_exists(self, session_id: str) -> bool:
3939
"""Check whether a session exists in storage."""
4040
return run_async(self._async_client.session_exists(session_id))
4141

42-
def create_session(self, session_id: Optional[str] = None) -> Dict[str, Any]:
42+
def create_session(
43+
self, session_id: Optional[str] = None, telemetry: TelemetryRequest = False
44+
) -> Dict[str, Any]:
4345
"""Create a new session.
4446
4547
Args:
4648
session_id: Optional session ID. If provided, creates a session with the given ID.
4749
If None, creates a new session with auto-generated ID.
4850
"""
49-
return run_async(self._async_client.create_session(session_id))
51+
return run_async(self._async_client.create_session(session_id, telemetry=telemetry))
5052

5153
def list_sessions(self) -> List[Any]:
5254
"""List all sessions."""
@@ -78,6 +80,7 @@ def add_message(
7880
parts: list[dict] | None = None,
7981
created_at: str | None = None,
8082
role_id: str | None = None,
83+
telemetry: TelemetryRequest = False,
8184
) -> Dict[str, Any]:
8285
"""Add a message to a session.
8386
@@ -92,7 +95,15 @@ def add_message(
9295
If both content and parts are provided, parts takes precedence.
9396
"""
9497
return run_async(
95-
self._async_client.add_message(session_id, role, content, parts, created_at, role_id)
98+
self._async_client.add_message(
99+
session_id,
100+
role,
101+
content,
102+
parts,
103+
created_at,
104+
role_id,
105+
telemetry,
106+
)
96107
)
97108

98109
def commit_session(

openviking_cli/client/base.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,12 +223,15 @@ async def unlink(self, from_uri: str, to_uri: str) -> None:
223223
# ============= Sessions =============
224224

225225
@abstractmethod
226-
async def create_session(self, session_id: Optional[str] = None) -> Dict[str, Any]:
226+
async def create_session(
227+
self, session_id: Optional[str] = None, telemetry: TelemetryRequest = False
228+
) -> Dict[str, Any]:
227229
"""Create a new session.
228230
229231
Args:
230232
session_id: Optional session ID. If provided, creates a session with the given ID.
231233
If None, creates a new session with auto-generated ID.
234+
telemetry: Whether to attach operation telemetry data to the result.
232235
"""
233236
...
234237

@@ -275,6 +278,7 @@ async def add_message(
275278
parts: list[dict] | None = None,
276279
created_at: str | None = None,
277280
role_id: str | None = None,
281+
telemetry: TelemetryRequest = False,
278282
) -> Dict[str, Any]:
279283
"""Add a message to a session.
280284
@@ -285,6 +289,7 @@ async def add_message(
285289
parts: Parts array (full Part support: TextPart, ContextPart, ToolPart)
286290
created_at: Message creation time (ISO format string)
287291
role_id: Optional explicit actor identity. Omit to let the server derive it.
292+
telemetry: Whether to attach operation telemetry data to the result.
288293
289294
If both content and parts are provided, parts takes precedence.
290295
"""

0 commit comments

Comments
 (0)