Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
.coverage
.agentseek/
.pytest_cache/
.ruff_cache/
.tmp/
.venv/
__pycache__/
Expand All @@ -15,3 +16,5 @@ venv/
ENV/
env.bak/
venv.bak/
graph/
.langgraph_api/
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ dev = [
"httpx>=0.27.0",
"ruff>=0.6.0",
"a2a-sdk>=1.0.3",
"langgraph-cli[inmem]",
]

[build-system]
Expand Down
129 changes: 100 additions & 29 deletions src/agentseek_api/api/assistants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from typing import Any

from sqlalchemy import select

from fastapi import APIRouter, Depends, HTTPException, Response
from fastapi import APIRouter, Depends, HTTPException, Query, Response
from langchain_core.runnables.utils import create_model

from agentseek_api.core.auth_deps import get_current_user
from agentseek_api.core.database import db_manager
Expand All @@ -13,12 +16,13 @@
AssistantVersionInfo,
ErrorDetailResponse,
)
from agentseek_api.services.default_assistants import resolve_assistant_id
from agentseek_api.services.langgraph_service import get_langgraph_service

router = APIRouter(dependencies=[Depends(get_current_user)])
ASSISTANT_SUBGRAPHS_UNSUPPORTED = "Assistant subgraph inspection is not supported"
ASSISTANT_VERSION_PROMOTION_UNSUPPORTED = "Assistant version promotion is not supported"
DELETE_THREADS_UNSUPPORTED = "delete_threads=true is not supported"
SUBGRAPHS_UNSUPPORTED = "The graph does not support subgraphs"


def _detail_response(*, description: str, detail: str) -> dict[str, object]:
Expand Down Expand Up @@ -111,19 +115,21 @@ async def count_assistants(payload: AssistantSearchRequest) -> int:

@router.get("/{assistant_id}", response_model=AssistantRead)
async def get_assistant(assistant_id: str) -> AssistantRead:
resolved_id = resolve_assistant_id(assistant_id)
session_factory = db_manager.get_session_factory()
async with session_factory() as session:
row = await session.scalar(select(Assistant).where(Assistant.assistant_id == assistant_id))
row = await session.scalar(select(Assistant).where(Assistant.assistant_id == resolved_id))
if row is None:
raise HTTPException(status_code=404, detail="Assistant not found")
return _to_read_model(row)


@router.patch("/{assistant_id}", response_model=AssistantRead)
async def patch_assistant(assistant_id: str, payload: AssistantPatch) -> AssistantRead:
resolved_id = resolve_assistant_id(assistant_id)
session_factory = db_manager.get_session_factory()
async with session_factory() as session:
row = await session.scalar(select(Assistant).where(Assistant.assistant_id == assistant_id))
row = await session.scalar(select(Assistant).where(Assistant.assistant_id == resolved_id))
if row is None:
raise HTTPException(status_code=404, detail="Assistant not found")
if payload.graph_id is not None:
Expand Down Expand Up @@ -154,9 +160,10 @@ async def patch_assistant(assistant_id: str, payload: AssistantPatch) -> Assista
async def delete_assistant(assistant_id: str, delete_threads: bool = False) -> Response:
if delete_threads:
raise HTTPException(status_code=400, detail=DELETE_THREADS_UNSUPPORTED)
resolved_id = resolve_assistant_id(assistant_id)
session_factory = db_manager.get_session_factory()
async with session_factory() as session:
row = await session.scalar(select(Assistant).where(Assistant.assistant_id == assistant_id))
row = await session.scalar(select(Assistant).where(Assistant.assistant_id == resolved_id))
if row is None:
raise HTTPException(status_code=404, detail="Assistant not found")
await session.delete(row)
Expand All @@ -165,55 +172,119 @@ async def delete_assistant(assistant_id: str, delete_threads: bool = False) -> R


@router.get("/{assistant_id}/graph")
async def get_assistant_graph(assistant_id: str) -> dict[str, object]:
async def get_assistant_graph(
assistant_id: str,
xray: bool | int | None = Query(
None,
description="Expand subgraph nodes. Pass true or a positive integer depth.",
),
) -> dict[str, object]:
assistant = await get_assistant(assistant_id)
return {
"assistant_id": assistant.assistant_id,
"graph_id": assistant.graph_id,
"registered_graph_ids": get_langgraph_service().registered_graph_ids(),
}
entry = get_langgraph_service().get_entry(assistant.graph_id)
graph = entry.build_graph()
if isinstance(xray, int) and not isinstance(xray, bool) and xray <= 0:
raise HTTPException(status_code=422, detail="Invalid xray value")
xray_value: bool | int = xray if xray is not None else False
try:
drawable_graph = await graph.aget_graph(xray=xray_value)
except NotImplementedError as exc:
raise HTTPException(status_code=422, detail="The graph does not support visualization") from exc
json_graph = drawable_graph.to_json()
for node in json_graph.get("nodes", []):
data = node.get("data") if isinstance(node, dict) else None
if isinstance(data, dict):
data.pop("id", None)
return json_graph


@router.get("/{assistant_id}/schemas")
async def get_assistant_schemas(assistant_id: str) -> dict[str, object]:
assistant = await get_assistant(assistant_id)
entry = get_langgraph_service().get_entry(assistant.graph_id)
graph = entry.build_graph()
return {"graph_id": assistant.graph_id, **_extract_graph_schemas(graph)}


def _safe_schema(getter, *args, **kwargs) -> dict[str, object] | None:
try:
return getter(*args, **kwargs)
except Exception: # noqa: BLE001 - graph helpers raise broad errors
return None


def _state_jsonschema(graph) -> dict[str, object] | None:
channel_list = getattr(graph, "stream_channels_list", None)
channels = getattr(graph, "channels", None)
if not channel_list or channels is None:
return None
fields: dict[str, tuple[object, object]] = {}
for key in channel_list:
channel = channels.get(key) if isinstance(channels, dict) else getattr(channels, key, None)
update_type = getattr(channel, "UpdateType", Any) if channel is not None else Any
fields[key] = (update_type, None)
try:
name = graph.get_name("State") if hasattr(graph, "get_name") else "State"
return create_model(name, **fields).model_json_schema()
except Exception: # noqa: BLE001
return None


def _extract_graph_schemas(graph) -> dict[str, object | None]:
return {
"assistant_id": assistant.assistant_id,
"graph_id": assistant.graph_id,
"name": assistant.name,
"description": assistant.description,
"input_schema": entry.input_schema,
"output_schema": entry.output_schema,
"input_schema": _safe_schema(graph.get_input_jsonschema),
"output_schema": _safe_schema(graph.get_output_jsonschema),
"state_schema": _state_jsonschema(graph),
"config_schema": _safe_schema(graph.get_config_jsonschema) if hasattr(graph, "get_config_jsonschema") else None,
"context_schema": _safe_schema(graph.get_context_jsonschema) if hasattr(graph, "get_context_jsonschema") else None,
}


async def _collect_subgraphs(assistant_id: str, *, namespace: str | None, recurse: bool) -> dict[str, dict[str, object | None]]:
assistant = await get_assistant(assistant_id)
entry = get_langgraph_service().get_entry(assistant.graph_id)
graph = entry.build_graph()
aget_subgraphs = getattr(graph, "aget_subgraphs", None)
if not callable(aget_subgraphs):
raise HTTPException(status_code=422, detail=SUBGRAPHS_UNSUPPORTED)
try:
return {
ns: _extract_graph_schemas(subgraph)
async for ns, subgraph in aget_subgraphs(namespace=namespace, recurse=recurse)
}
except NotImplementedError as exc:
raise HTTPException(status_code=422, detail=SUBGRAPHS_UNSUPPORTED) from exc


@router.get(
"/{assistant_id}/subgraphs",
status_code=501,
response_model=None,
response_model=dict[str, dict[str, object | None]],
responses={
404: _detail_response(description="Assistant not found", detail="Assistant not found"),
501: _detail_response(description="Unsupported helper endpoint", detail=ASSISTANT_SUBGRAPHS_UNSUPPORTED),
422: _detail_response(description="Graph does not support subgraphs", detail=SUBGRAPHS_UNSUPPORTED),
},
)
async def get_assistant_subgraphs(assistant_id: str) -> None:
_ = await get_assistant(assistant_id)
raise HTTPException(status_code=501, detail=ASSISTANT_SUBGRAPHS_UNSUPPORTED)
async def get_assistant_subgraphs(
assistant_id: str,
recurse: bool = Query(False, description="Recursively include nested subgraphs."),
namespace: str | None = Query(None, description="Filter to a specific subgraph namespace."),
) -> dict[str, dict[str, object | None]]:
return await _collect_subgraphs(assistant_id, namespace=namespace, recurse=recurse)


@router.get(
"/{assistant_id}/subgraphs/{namespace}",
status_code=501,
response_model=None,
response_model=dict[str, dict[str, object | None]],
responses={
404: _detail_response(description="Assistant not found", detail="Assistant not found"),
501: _detail_response(description="Unsupported helper endpoint", detail=ASSISTANT_SUBGRAPHS_UNSUPPORTED),
422: _detail_response(description="Graph does not support subgraphs", detail=SUBGRAPHS_UNSUPPORTED),
},
)
async def get_assistant_subgraphs_by_namespace(assistant_id: str, namespace: str) -> None:
_ = (await get_assistant(assistant_id), namespace)
raise HTTPException(status_code=501, detail=ASSISTANT_SUBGRAPHS_UNSUPPORTED)
async def get_assistant_subgraphs_by_namespace(
assistant_id: str,
namespace: str,
recurse: bool = Query(False, description="Recursively include nested subgraphs."),
) -> dict[str, dict[str, object | None]]:
return await _collect_subgraphs(assistant_id, namespace=namespace, recurse=recurse)


@router.post(
Expand Down
15 changes: 9 additions & 6 deletions src/agentseek_api/api/crons.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@
)
from agentseek_api.models.auth import User
from agentseek_api.services import cron_service
from agentseek_api.services.default_assistants import resolve_assistant_id

router = APIRouter(tags=["Crons"])


async def _ensure_assistant_exists(*, assistant_id: str) -> None:
async def _ensure_assistant_exists(*, assistant_id: str) -> str:
resolved_id = resolve_assistant_id(assistant_id)
session_factory = db_manager.get_session_factory()
async with session_factory() as session:
existing = await session.scalar(select(Assistant.assistant_id).where(Assistant.assistant_id == assistant_id))
existing = await session.scalar(select(Assistant.assistant_id).where(Assistant.assistant_id == resolved_id))
if existing is None:
raise HTTPException(status_code=404, detail="Assistant not found")
return resolved_id


async def _create_cron(
Expand All @@ -44,19 +47,19 @@ async def _create_cron(

@router.post("/runs/crons", response_model=CronRead)
async def create_stateless_cron(payload: CronCreate, user: User = Depends(get_current_user)) -> CronRead:
await _ensure_assistant_exists(assistant_id=payload.assistant_id)
return await _create_cron(assistant_id=payload.assistant_id, thread_id=None, payload=payload, user=user)
resolved_id = await _ensure_assistant_exists(assistant_id=payload.assistant_id)
return await _create_cron(assistant_id=resolved_id, thread_id=None, payload=payload, user=user)


@router.post("/threads/{thread_id}/runs/crons", response_model=CronRead)
async def create_thread_cron(thread_id: str, payload: CronCreate, user: User = Depends(get_current_user)) -> CronRead:
await _ensure_assistant_exists(assistant_id=payload.assistant_id)
resolved_id = await _ensure_assistant_exists(assistant_id=payload.assistant_id)
session_factory = db_manager.get_session_factory()
async with session_factory() as session:
thread = await session.scalar(select(Thread).where(Thread.thread_id == thread_id, Thread.user_id == user.identity))
if thread is None:
raise HTTPException(status_code=404, detail="Thread not found")
return await _create_cron(assistant_id=payload.assistant_id, thread_id=thread_id, payload=payload, user=user)
return await _create_cron(assistant_id=resolved_id, thread_id=thread_id, payload=payload, user=user)


@router.post("/runs/crons/search", response_model=CronSearchResponse)
Expand Down
Loading
Loading