Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions openviking/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,17 @@ async def session_exists(self, session_id: str) -> bool:
await self._ensure_initialized()
return await self._client.session_exists(session_id)

async def create_session(self, session_id: Optional[str] = None) -> Dict[str, Any]:
async def create_session(
self, session_id: Optional[str] = None, telemetry: TelemetryRequest = False
) -> Dict[str, Any]:
"""Create a new session.

Args:
session_id: Optional session ID. If provided, creates a session with the given ID.
If None, creates a new session with auto-generated ID.
"""
await self._ensure_initialized()
return await self._client.create_session(session_id)
return await self._client.create_session(session_id, telemetry=telemetry)

async def list_sessions(self) -> List[Any]:
"""List all sessions."""
Expand Down Expand Up @@ -173,6 +175,7 @@ async def add_message(
parts: list[dict] | None = None,
created_at: str | None = None,
role_id: str | None = None,
telemetry: TelemetryRequest = False,
) -> Dict[str, Any]:
"""Add a message to a session.

Expand All @@ -194,6 +197,7 @@ async def add_message(
parts=parts,
created_at=created_at,
role_id=role_id,
telemetry=telemetry,
)

async def commit_session(
Expand Down
42 changes: 41 additions & 1 deletion openviking/client/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,13 +399,26 @@ async def unlink(self, from_uri: str, to_uri: str) -> None:

# ============= Sessions =============

async def create_session(self, session_id: Optional[str] = None) -> Dict[str, Any]:
async def create_session(
self, session_id: Optional[str] = None, telemetry: TelemetryRequest = False
) -> Dict[str, Any]:
"""Create a new session.

Args:
session_id: Optional session ID. If provided, creates a session with the given ID.
If None, creates a new session with auto-generated ID.
"""
execution = await run_with_telemetry(
operation="session.create",
telemetry=telemetry,
fn=lambda: self._create_session_impl(session_id),
)
return attach_telemetry_payload(
execution.result,
execution.telemetry,
)

async def _create_session_impl(self, session_id: Optional[str]) -> Dict[str, Any]:
await self._service.initialize_user_directories(self._ctx)
await self._service.initialize_agent_directories(self._ctx)
session = await self._service.sessions.create(self._ctx, session_id)
Expand Down Expand Up @@ -471,6 +484,7 @@ async def add_message(
parts: Optional[List[Dict[str, Any]]] = None,
created_at: Optional[str] = None,
role_id: Optional[str] = None,
telemetry: TelemetryRequest = False,
) -> Dict[str, Any]:
"""Add a message to a session.

Expand All @@ -484,6 +498,32 @@ async def add_message(

If both content and parts are provided, parts takes precedence.
"""
execution = await run_with_telemetry(
operation="session.add_message",
telemetry=telemetry,
fn=lambda: self._add_message_impl(
session_id,
role,
content,
parts,
created_at,
role_id,
),
)
return attach_telemetry_payload(
execution.result,
execution.telemetry,
)

async def _add_message_impl(
self,
session_id: str,
role: str,
content: Optional[str],
parts: Optional[List[Dict[str, Any]]],
created_at: Optional[str],
role_id: Optional[str],
) -> Dict[str, Any]:
from openviking.message.part import Part, TextPart, part_from_dict

session = await self._service.sessions.get(session_id, self._ctx, auto_create=True)
Expand Down
113 changes: 68 additions & 45 deletions openviking/server/routers/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from openviking.server.identity import AuthMode, RequestContext
from openviking.server.models import ErrorInfo, Response
from openviking.server.responses import error_response
from openviking.server.telemetry import run_operation
from openviking.telemetry import TelemetryRequest

router = APIRouter(prefix="/api/v1/sessions", tags=["sessions"])
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -67,6 +69,7 @@ class AddMessageRequest(BaseModel):
content: Optional[str] = None
parts: Optional[List[Dict[str, Any]]] = None
created_at: Optional[str] = None
telemetry: TelemetryRequest = False

@model_validator(mode="after")
def validate_content_or_parts(self) -> "AddMessageRequest":
Expand All @@ -86,6 +89,7 @@ class CreateSessionRequest(BaseModel):
"""Request model for creating a session."""

session_id: Optional[str] = None
telemetry: TelemetryRequest = False


def _to_jsonable(value: Any) -> Any:
Expand All @@ -109,7 +113,7 @@ def _request_auth_mode(request: Request) -> AuthMode:

@router.post("")
async def create_session(
request: Optional[CreateSessionRequest] = None,
request: CreateSessionRequest = Body(default_factory=CreateSessionRequest),
_ctx: RequestContext = Depends(get_request_context),
):
"""Create a new session.
Expand All @@ -118,17 +122,22 @@ async def create_session(
If session_id is None, creates a new session with auto-generated ID.
"""
service = get_service()
await service.initialize_user_directories(_ctx)
await service.initialize_agent_directories(_ctx)
session_id = request.session_id if request else None
session = await service.sessions.create(_ctx, session_id)
return Response(
status="ok",
result={

async def _create() -> dict[str, Any]:
await service.initialize_user_directories(_ctx)
await service.initialize_agent_directories(_ctx)
session = await service.sessions.create(_ctx, request.session_id)
return {
"session_id": session.session_id,
"user": session.user.to_dict(),
},
}

execution = await run_operation(
operation="session.create",
telemetry=request.telemetry,
fn=_create,
)
return Response(status="ok", result=execution.result, telemetry=execution.telemetry)


@router.get("")
Expand Down Expand Up @@ -237,6 +246,7 @@ class CommitRequest(BaseModel):
"(default 10); compact path passes 0 to archive everything."
),
)
telemetry: TelemetryRequest = False


@router.post("/{session_id}/commit")
Expand All @@ -252,10 +262,18 @@ async def commit_session(
polling progress via ``GET /tasks/{task_id}``.
"""
service = get_service()
result = await service.sessions.commit_async(
session_id, _ctx, keep_recent_count=body.keep_recent_count
execution = await run_operation(
operation="session.commit",
telemetry=body.telemetry,
fn=lambda: service.sessions.commit_async(
session_id, _ctx, keep_recent_count=body.keep_recent_count
),
)
return Response(status="ok", result=result).model_dump(exclude_none=True)
return Response(
status="ok",
result=execution.result,
telemetry=execution.telemetry,
).model_dump(exclude_none=True)


@router.post("/{session_id}/extract")
Expand Down Expand Up @@ -291,42 +309,47 @@ async def add_message(
Missing sessions are auto-created on first add.
"""
service = get_service()
session = await service.sessions.get(session_id, _ctx, auto_create=True)
role_id = _ctx.resolve_role_id(request.role, request.role_id)

if request.parts is not None:
# Resolve path variables in URIs within parts
resolved_parts = []
for p in request.parts:
part_copy = dict(p)
# Resolve uri in context parts
if part_copy.get("type") == "context" and "uri" in part_copy:
part_copy["uri"] = resolve_path_variables(part_copy["uri"])
# Resolve tool_uri and skill_uri in tool parts
if part_copy.get("type") == "tool":
if "tool_uri" in part_copy:
part_copy["tool_uri"] = resolve_path_variables(part_copy["tool_uri"])
if "skill_uri" in part_copy:
part_copy["skill_uri"] = resolve_path_variables(part_copy["skill_uri"])
resolved_parts.append(part_copy)
parts = [part_from_dict(p) for p in resolved_parts]
else:
parts = [TextPart(text=request.content or "")]

# created_at 直接传递给 session (ISO string)
session.add_message(
request.role,
parts,
role_id=role_id,
created_at=request.created_at,
)
return Response(
status="ok",
result={

async def _add() -> dict[str, Any]:
session = await service.sessions.get(session_id, _ctx, auto_create=True)
role_id = _ctx.resolve_role_id(request.role, request.role_id)

if request.parts is not None:
# Resolve path variables in URIs within parts
resolved_parts = []
for p in request.parts:
part_copy = dict(p)
# Resolve uri in context parts
if part_copy.get("type") == "context" and "uri" in part_copy:
part_copy["uri"] = resolve_path_variables(part_copy["uri"])
# Resolve tool_uri and skill_uri in tool parts
if part_copy.get("type") == "tool":
if "tool_uri" in part_copy:
part_copy["tool_uri"] = resolve_path_variables(part_copy["tool_uri"])
if "skill_uri" in part_copy:
part_copy["skill_uri"] = resolve_path_variables(part_copy["skill_uri"])
resolved_parts.append(part_copy)
parts = [part_from_dict(p) for p in resolved_parts]
else:
parts = [TextPart(text=request.content or "")]

session.add_message(
request.role,
parts,
role_id=role_id,
created_at=request.created_at,
)
return {
"session_id": session_id,
"message_count": len(session.messages),
},
}

execution = await run_operation(
operation="session.add_message",
telemetry=request.telemetry,
fn=_add,
)
return Response(status="ok", result=execution.result, telemetry=execution.telemetry)


@router.post("/{session_id}/used")
Expand Down
17 changes: 14 additions & 3 deletions openviking/sync_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@ def session_exists(self, session_id: str) -> bool:
"""Check whether a session exists in storage."""
return run_async(self._async_client.session_exists(session_id))

def create_session(self, session_id: Optional[str] = None) -> Dict[str, Any]:
def create_session(
self, session_id: Optional[str] = None, telemetry: TelemetryRequest = False
) -> Dict[str, Any]:
"""Create a new session.

Args:
session_id: Optional session ID. If provided, creates a session with the given ID.
If None, creates a new session with auto-generated ID.
"""
return run_async(self._async_client.create_session(session_id))
return run_async(self._async_client.create_session(session_id, telemetry=telemetry))

def list_sessions(self) -> List[Any]:
"""List all sessions."""
Expand Down Expand Up @@ -78,6 +80,7 @@ def add_message(
parts: list[dict] | None = None,
created_at: str | None = None,
role_id: str | None = None,
telemetry: TelemetryRequest = False,
) -> Dict[str, Any]:
"""Add a message to a session.

Expand All @@ -92,7 +95,15 @@ def add_message(
If both content and parts are provided, parts takes precedence.
"""
return run_async(
self._async_client.add_message(session_id, role, content, parts, created_at, role_id)
self._async_client.add_message(
session_id,
role,
content,
parts,
created_at,
role_id,
telemetry,
)
)

def commit_session(
Expand Down
7 changes: 6 additions & 1 deletion openviking_cli/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,15 @@ async def unlink(self, from_uri: str, to_uri: str) -> None:
# ============= Sessions =============

@abstractmethod
async def create_session(self, session_id: Optional[str] = None) -> Dict[str, Any]:
async def create_session(
self, session_id: Optional[str] = None, telemetry: TelemetryRequest = False
) -> Dict[str, Any]:
"""Create a new session.

Args:
session_id: Optional session ID. If provided, creates a session with the given ID.
If None, creates a new session with auto-generated ID.
telemetry: Whether to attach operation telemetry data to the result.
"""
...

Expand Down Expand Up @@ -275,6 +278,7 @@ async def add_message(
parts: list[dict] | None = None,
created_at: str | None = None,
role_id: str | None = None,
telemetry: TelemetryRequest = False,
) -> Dict[str, Any]:
"""Add a message to a session.

Expand All @@ -285,6 +289,7 @@ async def add_message(
parts: Parts array (full Part support: TextPart, ContextPart, ToolPart)
created_at: Message creation time (ISO format string)
role_id: Optional explicit actor identity. Omit to let the server derive it.
telemetry: Whether to attach operation telemetry data to the result.

If both content and parts are provided, parts takes precedence.
"""
Expand Down
Loading
Loading