Skip to content

Commit 3b4dfd3

Browse files
fix(queue): preserve embedding message ids across serialization (#1380)
EmbeddingMsg instances are registered with RequestWaitTracker before they are written to the embedding queue. The queue serializes messages via dataclasses.asdict(), but id was only assigned in __init__ and was not declared as a dataclass field, so the payload lost the registered root id. When the worker deserialized the message it generated a fresh id and marked the wrong root as complete, allowing wait=True callers to hang until timeout. Declare id as a dataclass field so queue payloads retain the registered id, and add a regression test covering the serialize/dequeue round trip with RequestWaitTracker. Fixes #1379
1 parent 044e259 commit 3b4dfd3

2 files changed

Lines changed: 31 additions & 1 deletion

File tree

openviking/storage/queuefs/embedding_msg.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
22
# SPDX-License-Identifier: AGPL-3.0
33
import json
4-
from dataclasses import asdict, dataclass
4+
from dataclasses import asdict, dataclass, field
55
from typing import Any, Dict, List, Optional, Union
66
from uuid import uuid4
77

@@ -10,6 +10,7 @@
1010
class EmbeddingMsg:
1111
message: Union[str, List[Dict[str, Any]]]
1212
context_data: Dict[str, Any]
13+
id: str = field(default_factory=lambda: str(uuid4()))
1314
telemetry_id: str = ""
1415
semantic_msg_id: Optional[str] = None
1516

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
2+
# SPDX-License-Identifier: AGPL-3.0
3+
4+
from uuid import uuid4
5+
6+
from openviking.storage.queuefs.embedding_msg import EmbeddingMsg
7+
from openviking.telemetry.request_wait_tracker import RequestWaitTracker
8+
9+
10+
def test_embedding_msg_roundtrip_preserves_id_for_request_wait_tracker():
11+
telemetry_id = f"tm_{uuid4().hex}"
12+
tracker = RequestWaitTracker.get_instance()
13+
tracker.register_request(telemetry_id)
14+
15+
try:
16+
msg = EmbeddingMsg(
17+
"hello",
18+
{"uri": "viking://agent/skills/demo"},
19+
telemetry_id=telemetry_id,
20+
)
21+
tracker.register_embedding_root(telemetry_id, msg.id)
22+
23+
restored = EmbeddingMsg.from_dict(msg.to_dict())
24+
25+
assert restored.id == msg.id
26+
tracker.mark_embedding_done(telemetry_id, restored.id)
27+
assert tracker.is_complete(telemetry_id)
28+
finally:
29+
tracker.cleanup(telemetry_id)

0 commit comments

Comments
 (0)