Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions openviking/server/routers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ async def get_task(
tracker = get_task_tracker()
task = tracker.get(
task_id,
owner_account_id=_ctx.account_id,
owner_user_id=_ctx.user.user_id,
account_id=_ctx.account_id,
user_id=_ctx.user.user_id,
)
if not task:
raise OpenVikingError(
Expand Down Expand Up @@ -58,7 +58,7 @@ async def list_tasks(
status=status,
resource_id=resource_id,
limit=limit,
owner_account_id=_ctx.account_id,
owner_user_id=_ctx.user.user_id,
account_id=_ctx.account_id,
user_id=_ctx.user.user_id,
)
return Response(status="ok", result=[t.to_dict() for t in tasks])
2 changes: 2 additions & 0 deletions openviking/service/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from openviking.service.resource_service import ResourceService
from openviking.service.search_service import SearchService
from openviking.service.session_service import SessionService
from openviking.service.task_tracker import set_task_tracker
from openviking.session import SessionCompressor, create_session_compressor
from openviking.storage import VikingDBManager
from openviking.storage.collection_schemas import init_context_collection
Expand Down Expand Up @@ -149,6 +150,7 @@ def _init_storage(
lock_expire=tx_cfg.lock_expire,
redo_recovery_enabled=tx_cfg.redo_recovery_enabled,
)
set_task_tracker(config.build_task_tracker(self._agfs_client))

@property
def _agfs(self) -> Any:
Expand Down
14 changes: 7 additions & 7 deletions openviking/service/reindex_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ async def execute(
if tracker.has_running(
REINDEX_TASK_TYPE,
uri,
owner_account_id=ctx.account_id,
owner_user_id=ctx.user.user_id,
account_id=ctx.account_id,
user_id=ctx.user.user_id,
):
raise OpenVikingError(
f"URI {uri} already has a reindex in progress",
Expand All @@ -107,8 +107,8 @@ async def execute(
task = tracker.create_if_no_running(
REINDEX_TASK_TYPE,
uri,
owner_account_id=ctx.account_id,
owner_user_id=ctx.user.user_id,
account_id=ctx.account_id,
user_id=ctx.user.user_id,
)
if task is None:
raise OpenVikingError(
Expand Down Expand Up @@ -376,17 +376,17 @@ async def _run_tracked(
ctx: RequestContext,
) -> None:
tracker = get_task_tracker()
tracker.start(task_id)
tracker.start(task_id, account_id=ctx.account_id, user_id=ctx.user.user_id)
try:
result = await self._run(
uri=uri,
object_type=object_type,
mode=mode,
ctx=ctx,
)
tracker.complete(task_id, result)
tracker.complete(task_id, result, account_id=ctx.account_id, user_id=ctx.user.user_id)
except Exception as exc:
tracker.fail(task_id, str(exc))
tracker.fail(task_id, str(exc), account_id=ctx.account_id, user_id=ctx.user.user_id)

async def _reindex_resource(
self,
Expand Down
73 changes: 58 additions & 15 deletions openviking/service/resource_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,16 +295,29 @@ async def add_resource(
task = task_tracker.create(
"add_resource",
resource_id=root_uri,
owner_account_id=ctx.account_id,
owner_user_id=ctx.user.user_id,
account_id=ctx.account_id,
user_id=ctx.user.user_id,
)
result["task_id"] = task.task_id
if telemetry_id:
monitor_started = True
asyncio.create_task(self._monitor_queue_processing(task.task_id, telemetry_id))
asyncio.create_task(
self._monitor_queue_processing(
task.task_id,
telemetry_id,
ctx.account_id,
ctx.user.user_id,
)
)
else:
task_tracker.start(task.task_id)
task_tracker.complete(task.task_id, {"root_uri": root_uri})
task_tracker.start(
task.task_id, account_id=ctx.account_id, user_id=ctx.user.user_id
)
task_tracker.complete(
task.task_id,
{"root_uri": root_uri},
account_id=ctx.account_id,
user_id=ctx.user.user_id,
)
return result
except Exception as exc:
telemetry.set_error(
Expand All @@ -322,22 +335,38 @@ async def add_resource(
get_request_wait_tracker().cleanup(telemetry_id)
unregister_wait_telemetry(telemetry_id)

async def _monitor_queue_processing(self, task_id: str, telemetry_id: str) -> None:
async def _monitor_queue_processing(
self,
task_id: str,
telemetry_id: str,
account_id: str,
user_id: str,
) -> None:
from openviking.service.task_tracker import get_task_tracker

task_tracker = get_task_tracker()
request_wait_tracker = get_request_wait_tracker()
task_tracker.start(task_id)
task_tracker.start(task_id, account_id=account_id, user_id=user_id)
try:
await request_wait_tracker.wait_for_request(telemetry_id)
status = request_wait_tracker.build_queue_status(telemetry_id)
errors = sum(int(group.get("error_count", 0) or 0) for group in status.values())
if errors:
task_tracker.fail(task_id, f"queue processing failed: {status}")
task_tracker.fail(
task_id,
f"queue processing failed: {status}",
account_id=account_id,
user_id=user_id,
)
else:
task_tracker.complete(task_id, {"queue_status": status})
task_tracker.complete(
task_id,
{"queue_status": status},
account_id=account_id,
user_id=user_id,
)
except Exception as exc:
task_tracker.fail(task_id, str(exc))
task_tracker.fail(task_id, str(exc), account_id=account_id, user_id=user_id)
finally:
request_wait_tracker.cleanup(telemetry_id)
unregister_wait_telemetry(telemetry_id)
Expand Down Expand Up @@ -518,16 +547,30 @@ async def add_skill(
task_tracker = get_task_tracker()
task = task_tracker.create(
"add_skill",
owner_account_id=ctx.account_id,
owner_user_id=ctx.user.user_id,
account_id=ctx.account_id,
user_id=ctx.user.user_id,
)
result["task_id"] = task.task_id
if telemetry_id:
monitor_started = True
asyncio.create_task(self._monitor_queue_processing(task.task_id, telemetry_id))
asyncio.create_task(
self._monitor_queue_processing(
task.task_id,
telemetry_id,
ctx.account_id,
ctx.user.user_id,
)
)
else:
task_tracker.start(task.task_id)
task_tracker.complete(task.task_id, {})
task_tracker.start(
task.task_id, account_id=ctx.account_id, user_id=ctx.user.user_id
)
task_tracker.complete(
task.task_id,
{},
account_id=ctx.account_id,
user_id=ctx.user.user_id,
)

return result
finally:
Expand Down
3 changes: 2 additions & 1 deletion openviking/service/session_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ async def get_commit_task(self, task_id: str, ctx: RequestContext) -> Optional[D
"""Query background commit task status by task_id for the calling owner."""
task = get_task_tracker().get(
task_id,
owner_account_id=ctx.account_id,
account_id=ctx.account_id,
user_id=ctx.user.user_id,
)
return task.to_dict() if task else None

Expand Down
192 changes: 192 additions & 0 deletions openviking/service/task_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: AGPL-3.0
"""Internal storage backends for TaskTracker."""

from __future__ import annotations

import json
from copy import deepcopy
from typing import Any, Dict, List, Optional, Protocol

from openviking.pyagfs.exceptions import AGFSAlreadyExistsError


class TaskStore(Protocol):
    """Structural interface for TaskTracker storage backends.

    Implementations persist serialized task payloads (see
    ``_task_to_payload``) keyed by ``task_id`` and scoped to an owning
    ``account_id``/``user_id`` pair.  Read and delete operations accept
    optional owner filters; when a filter is supplied, records owned by a
    different account/user are treated as absent.
    """

    # Persist a newly created task record.
    def create(self, task: Any) -> None: ...

    # Overwrite the stored record with the task's current state.
    def update(self, task: Any) -> None: ...

    # Return the task payload, or None when the task is missing or does not
    # match the provided owner filters.
    def get(
        self,
        task_id: str,
        *,
        account_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> Optional[Dict[str, Any]]: ...

    # List every task payload owned by the account, optionally narrowed to a
    # single user.
    def list(self, account_id: str, *, user_id: Optional[str] = None) -> List[Dict[str, Any]]: ...

    # Remove the task when it exists and matches the owner filters.
    def delete(self, task_id: str, *, account_id: str, user_id: Optional[str] = None) -> None: ...


class InMemoryTaskStore:
    """Volatile task store backed by a process-local dict.

    Records disappear on restart.  Reads hand back deep copies so callers
    can mutate results without corrupting stored payloads.
    """

    def __init__(self) -> None:
        # task_id -> serialized task payload (shape of _task_to_payload)
        self._tasks: Dict[str, Dict[str, Any]] = {}

    def create(self, task: Any) -> None:
        """Record a newly created task."""
        self._tasks[task.task_id] = _task_to_payload(task)

    def update(self, task: Any) -> None:
        """Replace the stored payload with the task's current state."""
        self._tasks[task.task_id] = _task_to_payload(task)

    def get(
        self,
        task_id: str,
        *,
        account_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> Optional[Dict[str, Any]]:
        """Return a copy of the payload, or None when missing or when an
        owner filter does not match."""
        payload = self._tasks.get(task_id)
        if payload is None:
            return None
        # Owner filters are opt-in: a None filter matches every record.
        account_ok = account_id is None or payload.get("account_id") == account_id
        user_ok = user_id is None or payload.get("user_id") == user_id
        return deepcopy(payload) if account_ok and user_ok else None

    def list(self, account_id: str, *, user_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return copies of every payload owned by *account_id*, optionally
        narrowed to a single user."""
        matches: List[Dict[str, Any]] = []
        for payload in self._tasks.values():
            if payload.get("account_id") != account_id:
                continue
            if user_id is not None and payload.get("user_id") != user_id:
                continue
            matches.append(deepcopy(payload))
        return matches

    def delete(self, task_id: str, *, account_id: str, user_id: Optional[str] = None) -> None:
        """Drop the task when it exists and matches the owner filters."""
        payload = self._tasks.get(task_id)
        if not payload:
            return
        if payload.get("account_id") != account_id:
            return
        if user_id is not None and payload.get("user_id") != user_id:
            return
        del self._tasks[task_id]


class PersistentTaskStore:
    """Persist task records into AGFS under account-scoped task directories.

    Layout: ``/local/<account_id>/tasks/<user_id>/<task_id>.json``.
    Reads are best-effort: any AGFS failure is treated as "not found".
    """

    ROOT_PREFIX = "/local"
    RESERVED_DIRNAME = "tasks"

    def __init__(self, agfs: Any) -> None:
        self._agfs = agfs

    def create(self, task: Any) -> None:
        """Write the task record, creating parent directories as needed."""
        self._write_task(task)

    def update(self, task: Any) -> None:
        """Overwrite the task record with the task's current state."""
        self._write_task(task)

    def get(
        self,
        task_id: str,
        *,
        account_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> Optional[Dict[str, Any]]:
        """Load one task payload; both owner ids are required to locate it."""
        if not account_id or not user_id:
            return None
        path = self._task_path(account_id, user_id, task_id)
        try:
            raw = self._agfs.read(path)
        except Exception:
            # Missing file or transport error -> behave as "no such task".
            return None
        return json.loads(_decode_bytes(raw))

    def list(self, account_id: str, *, user_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return every parseable ``*.json`` task record for the owner."""
        if not user_id:
            return []
        directory = self._task_dir(account_id, user_id)
        try:
            entries = self._agfs.ls(directory)
        except Exception:
            return []
        records: List[Dict[str, Any]] = []
        for entry in entries:
            path = entry.get("path") or f"{directory}/{entry.get('name', '')}"
            if not path.endswith(".json"):
                continue
            try:
                records.append(json.loads(_decode_bytes(self._agfs.read(path))))
            except Exception:
                # Skip unreadable or corrupt entries instead of failing the list.
                continue
        return records

    def delete(self, task_id: str, *, account_id: str, user_id: Optional[str] = None) -> None:
        """Force-remove the task file; no-op when ``user_id`` is missing."""
        if not user_id:
            return
        self._agfs.rm(self._task_path(account_id, user_id, task_id), force=True)

    def _write_task(self, task: Any) -> None:
        # Ownership ids are mandatory: they form the storage path.
        account_id = getattr(task, "account_id", None)
        user_id = getattr(task, "user_id", None)
        if not account_id or not user_id:
            raise ValueError("PersistentTaskStore requires account_id and user_id")
        self._ensure_task_dir(account_id, user_id)
        serialized = json.dumps(_task_to_payload(task), ensure_ascii=False)
        self._agfs.write(
            self._task_path(account_id, user_id, task.task_id),
            serialized.encode("utf-8"),
        )

    def _ensure_task_dir(self, account_id: str, user_id: str) -> None:
        # Create each path level individually; no recursive mkdir is used here.
        for directory in (
            self._account_dir(account_id),
            self._task_root_dir(account_id),
            self._task_dir(account_id, user_id),
        ):
            self._mkdir_if_missing(directory)

    def _mkdir_if_missing(self, path: str) -> None:
        try:
            self._agfs.mkdir(path)
        except AGFSAlreadyExistsError:
            return
        except Exception as exc:
            # Some backends report "already exists" only via a generic error.
            if "already exists" in str(exc).lower():
                return
            raise

    def _account_dir(self, account_id: str) -> str:
        return f"{self.ROOT_PREFIX}/{account_id}"

    def _task_root_dir(self, account_id: str) -> str:
        return f"{self._account_dir(account_id)}/{self.RESERVED_DIRNAME}"

    def _task_dir(self, account_id: str, user_id: str) -> str:
        return f"{self._task_root_dir(account_id)}/{user_id}"

    def _task_path(self, account_id: str, user_id: str, task_id: str) -> str:
        return f"{self._task_dir(account_id, user_id)}/{task_id}.json"


def _task_to_payload(task: Any) -> Dict[str, Any]:
    """Flatten a task object into a JSON-serializable dict.

    ``status`` may be an enum-like object (serialized via its ``.value``)
    or already a plain value.  ``result`` is deep-copied so later mutations
    of the task do not leak into stored payloads.
    """
    status = getattr(task, "status", None)
    serialized_status = getattr(status, "value", status)
    return {
        "task_id": task.task_id,
        "task_type": task.task_type,
        "status": serialized_status,
        "created_at": task.created_at,
        "updated_at": task.updated_at,
        "resource_id": task.resource_id,
        "account_id": task.account_id,
        "user_id": task.user_id,
        "result": deepcopy(task.result),
        "error": task.error,
    }


def _decode_bytes(raw: Any) -> str:
    """Best-effort conversion of an AGFS read result to text.

    AGFS clients may return raw buffers or already-decoded ``str``.  All
    buffer types are decoded as UTF-8 (``str(bytearray(...))`` would yield
    the repr, which breaks the ``json.loads`` calls in this module);
    anything else is stringified as a last resort.
    """
    if isinstance(raw, (bytes, bytearray, memoryview)):
        return bytes(raw).decode("utf-8")
    return str(raw)
Loading
Loading