Skip to content

Commit fc274be

Browse files
Merge branch 'feature/agentserver-durable-tasks' into feature/agentserver-responses-spec016
2 parents 6e2ddf0 + 5fe5b8c commit fc274be

2 files changed

Lines changed: 65 additions & 0 deletions

File tree

sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/streaming/_concrete.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -650,6 +650,24 @@ def _compact_on_disk(self) -> None:
650650
tmp.write(self._serialize_terminal(time.time()))
651651
# Atomic replace (POSIX guarantees atomicity on same fs).
652652
os.replace(tmp_path, self._path)
653+
# ``os.replace`` swapped ``self._path`` to a brand-new inode; our
654+
# ``self._file`` handle still points at the old (now-unlinked)
655+
# inode, so every subsequent ``emit``/``close`` write would land in
656+
# the orphaned file and be lost on the next process lifetime (and
657+
# the single-writer ``flock`` would be held on the dead inode).
658+
# Reopen against the live path and re-acquire the lock. Open + lock
659+
# the new handle BEFORE closing the old one so the single-writer
660+
# guarantee is never released across the swap.
661+
old_file = self._file
662+
new_file = open(self._path, "a+b") # pylint: disable=consider-using-with
663+
if _HAS_FCNTL:
664+
fcntl.flock(new_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
665+
new_file.seek(0, os.SEEK_END)
666+
self._file = new_file
667+
try:
668+
old_file.close()
669+
except Exception: # pylint: disable=broad-except
670+
pass
653671
except Exception: # pylint: disable=broad-except
654672
try:
655673
tmp_path.unlink(missing_ok=True)

sdk/agentserver/azure-ai-agentserver-core/tests/streaming/test_file_backed_replay_event_stream.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,3 +322,50 @@ async def test_file_deleted_when_close_clock_elapses(self, tmp_path: Path) -> No
322322
f"MUST be deleted when the close-clock tombstone fires; "
323323
f"{file_path} still exists."
324324
)
325+
326+
327+
# ----------------------------------------------------------------
328+
# Rule 30 — lazy compaction must NOT lose post-compaction writes
329+
# (regression: stale file descriptor after os.replace)
330+
# ----------------------------------------------------------------
331+
332+
333+
class TestCompactionPreservesPostCompactionWrites:
334+
"""After an on-disk compaction swaps the file via ``os.replace``, the
335+
stream must keep writing to the LIVE file — not the orphaned pre-swap
336+
inode. Regression for the stale-``self._file`` data-loss bug where every
337+
``emit``/``close`` after the first compaction was written to an unlinked
338+
inode and lost on the next process lifetime.
339+
"""
340+
341+
async def test_emit_after_compaction_persists_to_live_file(self, tmp_path: Path) -> None:
342+
p = tmp_path / "fb-compact.jsonl"
343+
s = FileBackedReplayEventStream(path=p, cursor_fn=lambda e: e["n"], ttl_seconds=600)
344+
await s.emit({"n": 1})
345+
await s.emit({"n": 2})
346+
# Force a compaction (in real runs this fires once the eviction
347+
# interval is crossed; calling it directly is the accepted
348+
# intra-process construction-recovery pattern for this suite).
349+
s._compact_on_disk()
350+
await s.emit({"n": 3}) # post-compaction write — must NOT be lost
351+
await s.close() # terminal — must NOT be lost
352+
353+
content = p.read_text()
354+
assert '"n": 3' in content, f"post-compaction emit lost to orphaned inode; file={content!r}"
355+
assert "__terminal__" in content, f"post-compaction terminal lost; file={content!r}"
356+
357+
async def test_rehydrate_after_compaction_sees_post_compaction_event(self, tmp_path: Path) -> None:
358+
p = tmp_path / "fb-compact-rehydrate.jsonl"
359+
s = FileBackedReplayEventStream(path=p, cursor_fn=lambda e: e["n"], ttl_seconds=600)
360+
await s.emit({"n": 1})
361+
s._compact_on_disk()
362+
await s.emit({"n": 2})
363+
s._cleanup_locks() # simulate crash: release lock, keep the file
364+
del s
365+
366+
# A new lifetime rehydrating from the same path MUST see the
367+
# post-compaction event (it was written to the live file).
368+
s2 = FileBackedReplayEventStream(path=p, cursor_fn=lambda e: e["n"], ttl_seconds=600)
369+
cursors = [e.payload["n"] for e in s2._buffer]
370+
assert 2 in cursors, f"post-compaction event missing after rehydrate; buffer={cursors}"
371+
await s2._on_delete()

0 commit comments

Comments
 (0)