Skip to content

Commit c0f4c06

Browse files
qin-ptrqin-ctx
andauthored
Fix/semantic target sync (volcengine#2207)
* fix(storage): sync semantic target before DAG * Update CONTRIBUTING_CN.md (volcengine#2206) * Update CONTRIBUTING_CN.md --------- Co-authored-by: qin-ctx <qinhaojie.exe@bytedance.com>
1 parent 2f12439 commit c0f4c06

6 files changed

Lines changed: 179 additions & 218 deletions

File tree

openviking/storage/queuefs/semantic_dag.py

Lines changed: 7 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import asyncio
66
from dataclasses import dataclass, field
7-
from typing import Awaitable, Callable, Dict, List, Optional
7+
from typing import Dict, List, Optional
88

99
from openviking.server.identity import RequestContext
1010
from openviking.storage.queuefs.semantic_sidecar import write_semantic_sidecars
@@ -82,7 +82,6 @@ def __init__(
8282
recursive: bool = True,
8383
lock: LockLease = NO_LOCK,
8484
is_code_repo: bool = False,
85-
sync_to_target: bool = False,
8685
changes: Optional[Dict[str, List[str]]] = None,
8786
skip_vectorization: bool = False,
8887
coalesce_key: str = "",
@@ -99,7 +98,6 @@ def __init__(
9998
self._recursive = recursive
10099
self._lock = lock
101100
self._is_code_repo = is_code_repo
102-
self._sync_to_target = sync_to_target
103101
self._changes = changes or {}
104102
self._skip_vectorization = skip_vectorization
105103
self._coalesce_key = coalesce_key
@@ -123,50 +121,6 @@ def __init__(
123121
self._overview_cache: Dict[str, Dict[str, str]] = {}
124122
self._overview_cache_lock = asyncio.Lock()
125123

126-
def _create_on_complete_callback(self) -> Callable[[], Awaitable[None]]:
127-
"""Create on_complete callback for incremental update or full update."""
128-
129-
async def noop_callback() -> None:
130-
return
131-
132-
if not self._target_uri or not self._root_uri:
133-
return noop_callback
134-
135-
if self._target_uri == self._root_uri:
136-
return noop_callback
137-
138-
# Full resource updates may still need a deferred temp -> target sync
139-
# when semantic generation owns the resource lock.
140-
if not self._incremental_update and not self._sync_to_target:
141-
return noop_callback
142-
143-
async def sync_diff_callback() -> None:
144-
try:
145-
diff = await self._processor._sync_topdown_recursive(
146-
self._root_uri,
147-
self._target_uri,
148-
ctx=self._ctx,
149-
file_change_status=self._file_change_status,
150-
lock=self._lock,
151-
)
152-
logger.info(
153-
f"[SyncDiff] Diff computed: "
154-
f"added_files={len(diff.added_files)}, "
155-
f"deleted_files={len(diff.deleted_files)}, "
156-
f"updated_files={len(diff.updated_files)}, "
157-
f"added_dirs={len(diff.added_dirs)}, "
158-
f"deleted_dirs={len(diff.deleted_dirs)}"
159-
)
160-
except Exception as e:
161-
logger.error(
162-
f"[SyncDiff] Error in sync_diff_callback: "
163-
f"root_uri={self._root_uri}, target_uri={self._target_uri} "
164-
f"error={e}",
165-
exc_info=True,
166-
)
167-
168-
return sync_diff_callback
169-
170124
async def run(self, root_uri: str) -> None:
171125
"""Run DAG execution starting from root_uri."""
172126
self._root_uri = root_uri
@@ -179,13 +133,9 @@ async def run(self, root_uri: str) -> None:
179133
await self._lock.close()
180134
raise
181135

182-
original_on_complete = self._create_on_complete_callback()
183-
184136
# Release owned semantic locks after downstream vectorization finishes.
185137
async def wrapped_on_complete() -> None:
186138
try:
187-
if original_on_complete:
188-
await original_on_complete()
189139
if self._telemetry_id and self._semantic_msg_id:
190140
get_request_wait_tracker().mark_semantic_done(
191141
self._telemetry_id, self._semantic_msg_id
@@ -325,13 +275,13 @@ def _get_target_file_path(self, current_uri: str) -> Optional[str]:
325275
f"Invalid target_uri or root_uri for incremental update: target_uri={self._target_uri}, root_uri={self._root_uri}"
326276
)
327277
return None
328-
try:
329-
relative_path = current_uri[len(self._root_uri) :]
330-
if relative_path.startswith("/"):
331-
relative_path = relative_path[1:]
332-
return f"{self._target_uri}/{relative_path}" if relative_path else self._target_uri
333-
except Exception:
278+
if self._target_uri != self._root_uri:
279+
logger.warning(
280+
"Incremental semantic update expects target_uri == root_uri: "
281+
f"target_uri={self._target_uri}, root_uri={self._root_uri}"
282+
)
334283
return None
284+
return current_uri
335285

336286
def _is_direct_incremental_update(self) -> bool:
337287
return (

openviking/storage/queuefs/semantic_processor.py

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,13 @@ class DiffResult:
6262
added_dirs: List[str] = field(default_factory=list)
6363
deleted_dirs: List[str] = field(default_factory=list)
6464

65+
def to_changes(self) -> Dict[str, List[str]]:
66+
return {
67+
"added": self.added_files + self.added_dirs,
68+
"modified": self.updated_files,
69+
"deleted": self.deleted_files + self.deleted_dirs,
70+
}
71+
6572

6673
class RequestQueueStats:
6774
processed: int = 0
@@ -352,24 +359,36 @@ async def on_dequeue(
352359
else:
353360
is_incremental = False
354361
target_uri = msg.target_uri
362+
run_uri = msg.uri
363+
changes = msg.changes
355364
viking_fs = get_viking_fs()
356365
if msg.target_uri:
357366
target_exists = await viking_fs.exists(
358367
msg.target_uri, ctx=self._current_ctx
359368
)
360-
# Check if target URI exists and is not the same as the source URI(避免重复处理)
361369
if msg.uri != msg.target_uri:
362-
if msg.target_preexisting is None:
363-
target_preexisting = target_exists
364-
else:
365-
target_preexisting = bool(msg.target_preexisting)
366-
else:
367-
target_preexisting = target_exists
368-
if target_preexisting and msg.uri != msg.target_uri:
369-
is_incremental = True
370370
logger.info(
371-
f"Target URI exists, using incremental update: {msg.target_uri}"
371+
"Syncing semantic source into target before processing: "
372+
f"{msg.uri} -> {msg.target_uri}"
372373
)
374+
diff = await self._sync_topdown_recursive(
375+
msg.uri,
376+
msg.target_uri,
377+
ctx=self._current_ctx,
378+
lock=semantic_lock.lock,
379+
)
380+
logger.info(
381+
"[SyncDiff] Diff computed: "
382+
f"added_files={len(diff.added_files)}, "
383+
f"deleted_files={len(diff.deleted_files)}, "
384+
f"updated_files={len(diff.updated_files)}, "
385+
f"added_dirs={len(diff.added_dirs)}, "
386+
f"deleted_dirs={len(diff.deleted_dirs)}"
387+
)
388+
changes = diff.to_changes()
389+
is_incremental = True
390+
target_uri = msg.target_uri
391+
run_uri = msg.target_uri
373392
elif target_exists and msg.changes and msg.uri == msg.target_uri:
374393
is_incremental = True
375394
logger.info(
@@ -394,18 +413,17 @@ async def on_dequeue(
394413
recursive=msg.recursive,
395414
lock=semantic_lock.lock,
396415
is_code_repo=msg.is_code_repo,
397-
sync_to_target=bool(target_uri and msg.uri != target_uri),
398-
changes=msg.changes,
416+
changes=changes,
399417
skip_vectorization=msg.skip_vectorization,
400418
coalesce_key=msg.coalesce_key,
401419
coalesce_version=msg.coalesce_version,
402420
)
403421
self._dag_executor = executor
404422
lock_transferred = True
405-
await executor.run(msg.uri)
423+
await executor.run(run_uri)
406424
self._cache_dag_stats(
407425
msg.telemetry_id,
408-
msg.uri,
426+
run_uri,
409427
executor.get_stats(),
410428
)
411429
if not executor.stale:
@@ -723,7 +741,7 @@ async def list_children(dir_uri: str) -> Tuple[Dict[str, str], Dict[str, str]]:
723741
name = entry.get("name", "")
724742
if not name or name in [".", ".."]:
725743
continue
726-
if name.startswith(".") and name not in [".abstract.md", ".overview.md"]:
744+
if name.startswith("."):
727745
continue
728746
item_uri = VikingURI(dir_uri).join(name).uri
729747
if entry.get("isDir", False):
@@ -736,18 +754,6 @@ async def sync_dir(root_dir: str, target_dir: str) -> None:
736754
root_files, root_dirs = await list_children(root_dir)
737755
target_files, target_dirs = await list_children(target_dir)
738756

739-
try:
740-
await viking_fs._mv_vector_store_l0_l1(
741-
root_dir,
742-
target_dir,
743-
ctx=ctx,
744-
lock_handle=lock_handle,
745-
)
746-
except Exception as e:
747-
logger.error(
748-
f"[SyncDiff] Failed to move L0/L1 index: {root_dir} -> {target_dir}, error={e}"
749-
)
750-
751757
file_names = set(root_files.keys()) | set(target_files.keys())
752758
for name in sorted(file_names):
753759
root_file = root_files.get(name)
@@ -804,7 +810,7 @@ async def sync_dir(root_dir: str, target_dir: str) -> None:
804810
)
805811
changed = False
806812
if changed:
807-
diff.updated_files.append(root_file)
813+
diff.updated_files.append(target_file)
808814
try:
809815
await viking_fs.rm(target_file, ctx=ctx, lock_handle=lock_handle)
810816
except Exception as e:
@@ -825,8 +831,8 @@ async def sync_dir(root_dir: str, target_dir: str) -> None:
825831
continue
826832

827833
if root_file and not target_file:
828-
diff.added_files.append(root_file)
829834
target_file_uri = VikingURI(target_dir).join(name).uri
835+
diff.added_files.append(target_file_uri)
830836
try:
831837
await viking_fs.mv(
832838
root_file,
@@ -876,8 +882,8 @@ async def sync_dir(root_dir: str, target_dir: str) -> None:
876882
continue
877883

878884
if root_subdir and not target_subdir:
879-
diff.added_dirs.append(root_subdir)
880885
target_subdir_uri = VikingURI(target_dir).join(name).uri
886+
diff.added_dirs.append(target_subdir_uri)
881887
try:
882888
await viking_fs.mv(
883889
root_subdir,
@@ -899,7 +905,7 @@ async def sync_dir(root_dir: str, target_dir: str) -> None:
899905
parent_uri = VikingURI(target_uri).parent
900906
if parent_uri:
901907
await viking_fs.mkdir(parent_uri.uri, exist_ok=True, ctx=ctx)
902-
diff.added_dirs.append(root_uri)
908+
diff.added_dirs.append(target_uri)
903909
await viking_fs.mv(root_uri, target_uri, ctx=ctx, lock_handle=lock_handle)
904910
return diff
905911

openviking/utils/resource_processor.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ async def process_resource(
255255
candidate_uri = getattr(context_tree, "_candidate_uri", None) if context_tree else None
256256
resource_lock: LockLease = NO_LOCK
257257
target_preexisting = False
258+
source_committed = False
258259

259260
if root_uri and temp_uri:
260261
from openviking.storage.transaction import get_lock_manager
@@ -276,6 +277,11 @@ async def process_resource(
276277
resource_lock = await self._acquire_resource_lock(
277278
lock_manager, dst_path, uri=root_uri
278279
)
280+
if not target_preexisting:
281+
await viking_fs.persist_temp_tree(temp_uri, root_uri, ctx=ctx)
282+
await viking_fs.delete_temp(parse_result.temp_dir_path, ctx=ctx)
283+
temp_uri = root_uri
284+
source_committed = True
279285
except Exception:
280286
stage_status = "error"
281287
raise
@@ -334,7 +340,7 @@ async def process_resource(
334340
pass
335341

336342
if resource_lock.active:
337-
if not should_summarize and temp_uri:
343+
if not should_summarize and temp_uri and not source_committed:
338344
viking_fs = get_viking_fs()
339345
await viking_fs.persist_temp_tree(temp_uri, root_uri, ctx=ctx)
340346
await viking_fs.delete_temp(parse_result.temp_dir_path, ctx=ctx)

tests/misc/test_resource_processor_mv.py

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -90,17 +90,13 @@ def get_handle(self, handle_id):
9090
class _FakeVikingFS:
9191
def __init__(self, *, exists_result=False, existing_uris=None):
9292
self.agfs = SimpleNamespace(
93-
cat=MagicMock(return_value=b"content"),
94-
ls=MagicMock(return_value=[{"name": "content.md", "isDir": False}]),
95-
mkdir=MagicMock(return_value={"status": "ok"}),
96-
mv=MagicMock(return_value={"status": "ok"}),
97-
stat=MagicMock(return_value={"isDir": True}),
9893
write=MagicMock(return_value={"status": "ok"}),
9994
)
100-
self.mv = AsyncMock(return_value={})
10195
self._exists_result = exists_result
10296
self._existing_uris = set(existing_uris or [])
10397
self.exists_calls = []
98+
self.persist_calls = []
99+
self.delete_temp_calls = []
104100

105101
def bind_request_context(self, ctx):
106102
return _CtxMgr()
@@ -115,21 +111,24 @@ async def mkdir(self, uri, exist_ok=False, ctx=None):
115111
return None
116112

117113
async def delete_temp(self, temp_dir_path, ctx=None):
114+
self.delete_temp_calls.append(temp_dir_path)
118115
return None
119116

120117
async def persist_temp_tree(self, temp_uri, target_uri, ctx=None):
118+
self.persist_calls.append((temp_uri, target_uri))
121119
self.agfs.write(self._uri_to_path(target_uri, ctx=ctx), b"content")
122120

123121
def _uri_to_path(self, uri, ctx=None):
124122
return f"/mock/{uri.replace('viking://', '')}"
125123

126124

127125
@pytest.mark.asyncio
128-
async def test_resource_processor_first_add_persist_does_not_await_agfs_mv(monkeypatch):
126+
async def test_resource_processor_first_add_summarizes_from_committed_uri(monkeypatch):
129127
from openviking.utils.resource_processor import ResourceProcessor
130128

131129
fake_fs = _FakeVikingFS()
132130
fake_lock_manager = _FakeLockManager()
131+
summarize_calls = []
133132

134133
monkeypatch.setattr(
135134
"openviking.utils.resource_processor.get_current_telemetry",
@@ -152,22 +151,27 @@ async def test_resource_processor_first_add_persist_does_not_await_agfs_mv(monke
152151
warnings=[],
153152
)
154153
)
155-
156-
context_tree = SimpleNamespace(
157-
root=SimpleNamespace(uri="viking://resources/root", temp_uri="viking://temp/root_tmp")
154+
rp.tree_builder.finalize_from_temp = AsyncMock(
155+
return_value=SimpleNamespace(
156+
root=SimpleNamespace(uri="viking://resources/root", temp_uri="viking://temp/root_tmp")
157+
)
158+
)
159+
rp._summarizer = SimpleNamespace(
160+
summarize=AsyncMock(
161+
side_effect=lambda *args, **kwargs: (
162+
summarize_calls.append(kwargs) or {"status": "success"}
163+
)
164+
)
158165
)
159-
rp.tree_builder.finalize_from_temp = AsyncMock(return_value=context_tree)
160166

161-
ctx = object()
162-
result = await rp.process_resource(path="x", ctx=ctx, build_index=False, summarize=False)
167+
result = await rp.process_resource(path="x", ctx=object(), build_index=True)
163168

164169
assert result["status"] == "success"
165170
assert result["root_uri"] == "viking://resources/root"
166-
fake_fs.agfs.mv.assert_not_called()
167-
fake_fs.agfs.write.assert_called_once()
168-
fake_fs.mv.assert_not_awaited()
169-
assert fake_lock_manager.acquired_exact_paths == []
170-
assert fake_lock_manager.acquired_tree_paths == ["/mock/resources/root"]
171+
assert fake_fs.persist_calls == [("viking://temp/root_tmp", "viking://resources/root")]
172+
assert fake_fs.delete_temp_calls == ["viking://temp/tmpdir"]
173+
assert summarize_calls[0]["temp_uris"] == ["viking://resources/root"]
174+
assert summarize_calls[0]["target_preexisting"] is False
171175

172176

173177
@pytest.mark.asyncio
@@ -218,11 +222,7 @@ async def test_resource_processor_second_add_preserves_temp_uri_for_incremental(
218222
assert result["root_uri"] == "viking://resources/root"
219223
assert summarize_calls[0]["temp_uris"] == ["viking://temp/root_tmp"]
220224
assert summarize_calls[0]["target_preexisting"] is True
221-
fake_fs.agfs.mv.assert_not_called()
222-
fake_fs.agfs.write.assert_not_called()
223-
fake_fs.mv.assert_not_awaited()
224-
assert fake_lock_manager.acquired_exact_paths == []
225-
assert fake_lock_manager.acquired_tree_paths == ["/mock/resources/root"]
225+
assert fake_fs.persist_calls == []
226226

227227

228228
@pytest.mark.asyncio
@@ -283,5 +283,6 @@ async def test_resource_processor_auto_candidate_skips_existing_and_busy(monkeyp
283283
]
284284
assert fake_lock_manager.acquired_exact_paths == []
285285
assert fake_lock_manager.acquired_tree_paths == ["/mock/resources/root_2"]
286-
assert summarize_calls[0]["temp_uris"] == ["viking://temp/root_tmp"]
286+
assert summarize_calls[0]["temp_uris"] == ["viking://resources/root_2"]
287287
assert summarize_calls[0]["target_preexisting"] is False
288+
assert fake_fs.persist_calls == [("viking://temp/root_tmp", "viking://resources/root_2")]

0 commit comments

Comments
 (0)