Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions openviking/storage/content_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,16 @@ async def _write_direct_with_refresh(
) -> Dict[str, Any]:
lock_manager = get_lock_manager()
handle = lock_manager.create_handle()
lock_path = self._viking_fs._uri_to_path(root_uri, ctx=ctx)
acquired = await lock_manager.acquire_tree(handle, lock_path)
lock_path = self._viking_fs._uri_to_path(uri, ctx=ctx)
acquired = await lock_manager.acquire_exact_path(handle, lock_path)
if not acquired:
await lock_manager.release(handle)
raise InvalidArgumentError(f"resource is busy and cannot be written now: {uri}")

previous_content: Optional[str] = None
content_written = False
lock_transferred = False
semantic_enqueued = False
lock_released = False
try:
if mode != "create":
previous_content = await self._viking_fs.read_file(uri, ctx=ctx)
Expand All @@ -200,10 +201,12 @@ async def _write_direct_with_refresh(
changed_uri=uri,
context_type=context_type,
ctx=ctx,
lifecycle_lock_handle_id=handle.id,
lifecycle_lock_handle_id="",
change_type="added" if mode == "create" else "modified",
)
lock_transferred = True
semantic_enqueued = True
await lock_manager.release(handle)
lock_released = True
queue_status = (
await self._wait_for_request(telemetry_id=telemetry_id, timeout=timeout)
if wait
Expand All @@ -219,15 +222,15 @@ async def _write_direct_with_refresh(
queue_status=queue_status,
)
except Exception:
if not lock_transferred and content_written:
if not semantic_enqueued and content_written:
await self._rollback_direct_write(
uri=uri,
previous_content=previous_content,
mode=mode,
ctx=ctx,
lock_handle=handle,
)
if not lock_transferred:
if not lock_released:
await lock_manager.release(handle)
raise
finally:
Expand Down
6 changes: 3 additions & 3 deletions tests/server/test_content_write_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ async def test_resource_write_semantic_refresh_uses_coalesce_key(monkeypatch):


@pytest.mark.asyncio
async def test_write_timeout_after_enqueue_does_not_release_resource_lock(monkeypatch):
async def test_write_timeout_after_enqueue_releases_resource_lock(monkeypatch):
file_uri = "viking://resources/demo/doc.md"
root_uri = "viking://resources/demo"
ctx = RequestContext(user=UserIdentifier.the_default_user(), role=Role.USER)
Expand Down Expand Up @@ -317,7 +317,7 @@ async def _fake_wait_for_request(*, telemetry_id, timeout):
wait=True,
)

assert lock_manager.release_calls == []
assert lock_manager.release_calls == ["lock-1"]
assert viking_fs.delete_temp_calls == []
assert viking_fs.content[file_uri] == "updated"

Expand Down Expand Up @@ -358,7 +358,7 @@ async def _fake_enqueue_semantic_refresh(**kwargs):
assert captured_enqueue["changed_uri"] == file_uri
assert captured_enqueue["change_type"] == "modified"
assert viking_fs.delete_temp_calls == []
assert lock_manager.release_calls == []
assert lock_manager.release_calls == ["lock-1"]


@pytest.mark.asyncio
Expand Down
Loading