Skip to content

Commit 9b43734

Browse files
sponge225qin-ctx
andauthored
feat(task): add async task tracking for add-resource and add-skill operations (#1763)
* feat: add async task tracking for add-resource/add-skill/write operations - Return task_id when add-resource/add-skill/write called without --wait - Add 'ov task status <task_id>' and 'ov task list' CLI commands - Bridge RequestWaitTracker and TaskTracker via background monitor coroutines - Format TaskRecord timestamps as ISO 8601 in to_dict() - Always generate telemetry_id (remove 'not wait' condition) - Extract _create_write_task helper to eliminate code duplication - Add unregister_wait_telemetry in _monitor_write_queue for consistency - Update CLI async prompt from 'ov wait' to 'ov task status <task_id>' - Add 7 new tests covering async task tracking * refactor: remove task_id from write operations, keep only add-resource/add-skill Write operations (write/create_file/write_memory) are primarily called by internal flows like session commit, which have their own task tracking. Only add-resource and add-skill are user-facing CLI commands that need task_id for async progress tracking. * fix: address PR review feedback 1. Fix async failure path leaking request-scoped tracker/telemetry state - Add monitor_started flag to ensure cleanup when monitor coroutine hasn't been launched yet - Finally block now cleans up if wait or not telemetry_id or not monitor_started 2. Fix TaskRecord timestamp API compatibility break - Keep original created_at/updated_at as float (backward compatible) - Add new created_at_iso/updated_at_iso fields with ISO 8601 strings 3. Fix ruff format lint failure on resource_service.py 4. Add regression test for async failure cleanup - test_add_resource_async_failure_cleans_up_tracker verifies no RequestWaitTracker or telemetry registry state leaks when processor raises before task/monitor creation * fix: add missing unregister_wait_telemetry in add_skill finally block * fix: remove unused imports in test_add_resource_async_failure_cleans_up_tracker * fix: queue failure shows as completed and business error creates unreachable task - _monitor_queue_processing: check error_count from build_queue_status, mark task as failed when queue processing has errors - add_resource: skip task creation when process_resource returns status=error, preventing unreachable ghost tasks - Improve test_add_resource_async_failure_cleans_up_tracker: patch internal processor instead of add_resource itself to cover finally cleanup logic - Add test_add_skill_async_returns_task_id for add_skill coverage - Add test_add_resource_business_error_no_task regression test - Add test_monitor_marks_failed_on_queue_error regression test * test: move add_skill tests to test_session_task_tracking.py Move add_skill task tracking tests from test_api_resources.py to test_session_task_tracking.py where other task tracking tests live, and add sync no-task-id coverage. * test: update async task queryable assertion to include failed status Queue errors now correctly mark task as failed (Bug 1 fix), so the test assertion must accept 'failed' as a valid terminal status. * refactor: use explicit return result for business error path * fix: remove duplicate asyncio imports in test_api_resources.py * fix: avoid task creation on watch conflict --------- Co-authored-by: qin-ctx <qinhaojie.exe@bytedance.com>
1 parent b9a8c4e commit 9b43734

12 files changed

Lines changed: 584 additions & 10 deletions

File tree

crates/ov_cli/src/client.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,28 @@ impl HttpClient {
576576
}
577577
}
578578

579+
// ============ Task Methods ============
580+
581+
pub async fn get_task(&self, task_id: &str) -> Result<serde_json::Value> {
582+
let path = format!("/api/v1/tasks/{}", task_id);
583+
self.get(&path, &[]).await
584+
}
585+
586+
pub async fn list_tasks(
587+
&self,
588+
task_type: Option<&str>,
589+
status: Option<&str>,
590+
) -> Result<serde_json::Value> {
591+
let mut params: Vec<(String, String)> = Vec::new();
592+
if let Some(t) = task_type {
593+
params.push(("task_type".to_string(), t.to_string()));
594+
}
595+
if let Some(s) = status {
596+
params.push(("status".to_string(), s.to_string()));
597+
}
598+
self.get("/api/v1/tasks", &params).await
599+
}
600+
579601
// ============ Relation Methods ============
580602

581603
pub async fn relations(&self, uri: &str) -> Result<serde_json::Value> {

crates/ov_cli/src/commands/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ pub mod resources;
1111
pub mod search;
1212
pub mod session;
1313
pub mod system;
14+
pub mod task;

crates/ov_cli/src/commands/resources.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,9 @@ pub async fn add_resource(
4444
)
4545
.await?;
4646

47-
// Print helpful message for async processing
4847
if !wait && matches!(format, OutputFormat::Table) {
4948
eprintln!("Note: Resource is being processed in the background.");
50-
eprintln!("Use 'ov wait' to wait for completion, or 'ov observer queue' to check status.");
49+
eprintln!("Use 'ov task status <task_id>' to check progress, or 'ov task list' to see all tasks.");
5150
}
5251

5352
output_success(&result, format, compact);
@@ -64,10 +63,9 @@ pub async fn add_skill(
6463
) -> Result<()> {
6564
let result = client.add_skill(data, wait, timeout).await?;
6665

67-
// Print helpful message for async processing
6866
if !wait && matches!(format, OutputFormat::Table) {
6967
eprintln!("Note: Skill is being processed in the background.");
70-
eprintln!("Use 'ov wait' to wait for completion, or 'ov observer queue' to check status.");
68+
eprintln!("Use 'ov task status <task_id>' to check progress, or 'ov task list' to see all tasks.");
7169
}
7270

7371
output_success(&result, format, compact);

crates/ov_cli/src/commands/task.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use crate::client::HttpClient;
2+
use crate::error::Result;
3+
use crate::output::{OutputFormat, output_success};
4+
5+
pub async fn status(
6+
client: &HttpClient,
7+
task_id: &str,
8+
output_format: OutputFormat,
9+
compact: bool,
10+
) -> Result<()> {
11+
let result = client.get_task(task_id).await?;
12+
output_success(&result, output_format, compact);
13+
Ok(())
14+
}
15+
16+
pub async fn list(
17+
client: &HttpClient,
18+
task_type: Option<&str>,
19+
status: Option<&str>,
20+
output_format: OutputFormat,
21+
compact: bool,
22+
) -> Result<()> {
23+
let result = client.list_tasks(task_type, status).await?;
24+
output_success(&result, output_format, compact);
25+
Ok(())
26+
}

crates/ov_cli/src/main.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,11 @@ enum Commands {
543543
#[arg(long)]
544544
timeout: Option<f64>,
545545
},
546+
/// [Status] Track async resource processing tasks
547+
Task {
548+
#[command(subcommand)]
549+
action: TaskCommands,
550+
},
546551
/// [Status] All OpenViking Server components status
547552
Status,
548553
/// [Status] Observe OpenViking Server components status
@@ -596,6 +601,24 @@ impl Commands {
596601
}
597602
}
598603

604+
#[derive(Subcommand)]
605+
enum TaskCommands {
606+
/// Show status of a specific task
607+
Status {
608+
/// Task ID returned by add-resource/add-skill
609+
task_id: String,
610+
},
611+
/// List all tracked tasks
612+
List {
613+
/// Filter by task type (e.g. add_resource, add_skill, session_commit, reindex)
614+
#[arg(long)]
615+
task_type: Option<String>,
616+
/// Filter by status (pending, running, completed, failed)
617+
#[arg(long)]
618+
status: Option<String>,
619+
},
620+
}
621+
599622
#[derive(Subcommand)]
600623
enum SystemCommands {
601624
/// Wait for queued async processing to complete
@@ -1041,6 +1064,23 @@ async fn main() {
10411064
let client = ctx.get_client();
10421065
commands::system::wait(&client, timeout, ctx.output_format, ctx.compact).await
10431066
}
1067+
Commands::Task { action } => match action {
1068+
TaskCommands::Status { task_id } => {
1069+
let client = ctx.get_client();
1070+
commands::task::status(&client, &task_id, ctx.output_format, ctx.compact).await
1071+
}
1072+
TaskCommands::List { task_type, status } => {
1073+
let client = ctx.get_client();
1074+
commands::task::list(
1075+
&client,
1076+
task_type.as_deref(),
1077+
status.as_deref(),
1078+
ctx.output_format,
1079+
ctx.compact,
1080+
)
1081+
.await
1082+
}
1083+
},
10441084
Commands::Status => {
10451085
let client = ctx.get_client();
10461086
commands::observer::system(&client, ctx.output_format, ctx.compact).await

openviking/service/resource_service.py

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
Provides resource management operations: add_resource, add_skill, wait_processed.
77
"""
88

9+
import asyncio
910
import json
1011
import time
1112
from typing import TYPE_CHECKING, Any, Dict, List, Optional
@@ -161,7 +162,8 @@ async def add_resource(
161162
telemetry = get_current_telemetry()
162163
telemetry_id = register_wait_telemetry(wait)
163164
request_wait_tracker = get_request_wait_tracker()
164-
if wait and telemetry_id:
165+
monitor_started = False
166+
if telemetry_id:
165167
request_wait_tracker.register_request(telemetry_id)
166168
watch_manager = self._get_watch_manager()
167169
watch_enabled = bool(
@@ -212,7 +214,9 @@ async def add_resource(
212214
**kwargs,
213215
)
214216

215-
if wait:
217+
if result.get("status") == "error":
218+
return result
219+
elif wait:
216220
wait_start = time.perf_counter()
217221
try:
218222
with telemetry.measure("resource.wait"):
@@ -283,6 +287,24 @@ async def add_resource(
283287
logger.warning(
284288
f"[ResourceService] Failed to cancel watch task for {to}: {e}"
285289
)
290+
if not wait:
291+
from openviking.service.task_tracker import get_task_tracker
292+
293+
task_tracker = get_task_tracker()
294+
root_uri = result.get("root_uri", "")
295+
task = task_tracker.create(
296+
"add_resource",
297+
resource_id=root_uri,
298+
owner_account_id=ctx.account_id,
299+
owner_user_id=ctx.user.user_id,
300+
)
301+
result["task_id"] = task.task_id
302+
if telemetry_id:
303+
monitor_started = True
304+
asyncio.create_task(self._monitor_queue_processing(task.task_id, telemetry_id))
305+
else:
306+
task_tracker.start(task.task_id)
307+
task_tracker.complete(task.task_id, {"root_uri": root_uri})
286308
return result
287309
except Exception as exc:
288310
telemetry.set_error(
@@ -296,7 +318,28 @@ async def add_resource(
296318
"resource.request.duration_ms",
297319
round((time.perf_counter() - request_start) * 1000, 3),
298320
)
299-
get_request_wait_tracker().cleanup(telemetry_id)
321+
if wait or not telemetry_id or not monitor_started:
322+
get_request_wait_tracker().cleanup(telemetry_id)
323+
unregister_wait_telemetry(telemetry_id)
324+
325+
async def _monitor_queue_processing(self, task_id: str, telemetry_id: str) -> None:
326+
from openviking.service.task_tracker import get_task_tracker
327+
328+
task_tracker = get_task_tracker()
329+
request_wait_tracker = get_request_wait_tracker()
330+
task_tracker.start(task_id)
331+
try:
332+
await request_wait_tracker.wait_for_request(telemetry_id)
333+
status = request_wait_tracker.build_queue_status(telemetry_id)
334+
errors = sum(int(group.get("error_count", 0) or 0) for group in status.values())
335+
if errors:
336+
task_tracker.fail(task_id, f"queue processing failed: {status}")
337+
else:
338+
task_tracker.complete(task_id, {"queue_status": status})
339+
except Exception as exc:
340+
task_tracker.fail(task_id, str(exc))
341+
finally:
342+
request_wait_tracker.cleanup(telemetry_id)
300343
unregister_wait_telemetry(telemetry_id)
301344

302345
async def _handle_watch_task_creation(
@@ -434,7 +477,8 @@ async def add_skill(
434477
self._ensure_initialized()
435478
telemetry_id = get_current_telemetry().telemetry_id
436479
request_wait_tracker = get_request_wait_tracker()
437-
if wait and telemetry_id:
480+
monitor_started = False
481+
if telemetry_id:
438482
request_wait_tracker.register_request(telemetry_id)
439483

440484
try:
@@ -468,10 +512,28 @@ async def add_skill(
468512
round((time.perf_counter() - wait_start) * 1000, 3),
469513
)
470514
result["queue_status"] = status
515+
else:
516+
from openviking.service.task_tracker import get_task_tracker
517+
518+
task_tracker = get_task_tracker()
519+
task = task_tracker.create(
520+
"add_skill",
521+
owner_account_id=ctx.account_id,
522+
owner_user_id=ctx.user.user_id,
523+
)
524+
result["task_id"] = task.task_id
525+
if telemetry_id:
526+
monitor_started = True
527+
asyncio.create_task(self._monitor_queue_processing(task.task_id, telemetry_id))
528+
else:
529+
task_tracker.start(task.task_id)
530+
task_tracker.complete(task.task_id, {})
471531

472532
return result
473533
finally:
474-
request_wait_tracker.cleanup(telemetry_id)
534+
if wait or not telemetry_id or not monitor_started:
535+
request_wait_tracker.cleanup(telemetry_id)
536+
unregister_wait_telemetry(telemetry_id)
475537

476538
async def build_index(
477539
self, resource_uris: List[str], ctx: RequestContext, **kwargs

openviking/service/task_tracker.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import time
2121
from copy import deepcopy
2222
from dataclasses import asdict, dataclass, field
23+
from datetime import datetime, timezone
2324
from enum import Enum
2425
from typing import Any, Dict, List, Optional
2526
from uuid import uuid4
@@ -57,6 +58,8 @@ def to_dict(self) -> Dict[str, Any]:
5758
"""Serialize for JSON response."""
5859
d = asdict(self)
5960
d["status"] = self.status.value
61+
d["created_at_iso"] = datetime.fromtimestamp(self.created_at, tz=timezone.utc).isoformat()
62+
d["updated_at_iso"] = datetime.fromtimestamp(self.updated_at, tz=timezone.utc).isoformat()
6063
d.pop("owner_account_id", None)
6164
d.pop("owner_user_id", None)
6265
return d

openviking/telemetry/resource_summary.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def _consume_semantic_dag_stats(telemetry_id: str, root_uri: str | None):
4141
def register_wait_telemetry(wait: bool) -> str:
4242
"""Register current telemetry collector for async queue consumers when needed."""
4343
handle = get_current_telemetry()
44-
if not wait or not handle.telemetry_id:
44+
if not handle.telemetry_id:
4545
return ""
4646
if handle.enabled:
4747
register_telemetry(handle)

0 commit comments

Comments
 (0)