Skip to content

Commit 834224c

Browse files
committed
refactor(cli,compiler): close litellm async clients inside the compile loop
#91 closed litellm's cached async clients from the CLI layer in a separate asyncio.run(), i.e. after the loop that created them had already been torn down, with any error silently swallowed by an `except Exception: pass`. Close them in the same loop that created them instead, by moving the cleanup into the compile coroutines.

- add compiler._close_async_llm_clients() (best-effort, logs at debug)
- call it from the compile_short_doc / compile_long_doc finally blocks
- drop cli._close_litellm_async_clients() and the three try/finally wraps
- drop the no-op cleanup around index_long_document (the indexer uses no litellm)
- revert the test_add_command assertion to assert_called_once()
1 parent 933fd12 commit 834224c

3 files changed

Lines changed: 44 additions & 45 deletions

File tree

openkb/agent/compiler.py

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,23 @@ async def _llm_call_async(model: str, messages: list[dict], step_name: str, **kw
359359
return content.strip()
360360

361361

362+
async def _close_async_llm_clients() -> None:
363+
"""Close LiteLLM's cached async (aiohttp) clients for the current loop.
364+
365+
LiteLLM caches its async clients per event loop. ``add_single_file`` runs
366+
each doc in its own ``asyncio.run`` loop, so without this the clients are
367+
orphaned when the loop is torn down and their connections pile up in
368+
CLOSE-WAIT, leaking sockets/FDs across a long ingest. Call this from a
369+
``finally`` inside the compile coroutines so the clients are closed in the
370+
same loop that created them. Best-effort: never raises, so cleanup can't
371+
mask a real compilation error or break ingest.
372+
"""
373+
try:
374+
await litellm.close_litellm_async_clients()
375+
except Exception:
376+
logger.debug("litellm async client cleanup failed", exc_info=True)
377+
378+
362379
def _warn_if_truncated(response, step_name: str, max_tokens: int | None) -> None:
363380
"""Emit a warning when the LLM hit the max_tokens cap.
364381
@@ -1981,11 +1998,16 @@ async def compile_short_doc(
19811998
summary = summary_raw
19821999

19832000
# --- Steps 2-4: Concept plan → generate/update → summary rewrite → index ---
1984-
await _compile_concepts(
1985-
wiki_dir, kb_dir, model, system_msg, doc_msg,
1986-
summary, doc_name, max_concurrency, doc_brief=doc_brief,
1987-
doc_type="short", rewrite_summary=True, entity_types=entity_types,
1988-
)
2001+
try:
2002+
await _compile_concepts(
2003+
wiki_dir, kb_dir, model, system_msg, doc_msg,
2004+
summary, doc_name, max_concurrency, doc_brief=doc_brief,
2005+
doc_type="short", rewrite_summary=True, entity_types=entity_types,
2006+
)
2007+
finally:
2008+
# Close per-loop litellm async clients before asyncio.run tears this
2009+
# loop down, to avoid the CLOSE-WAIT/FD leak across a long ingest.
2010+
await _close_async_llm_clients()
19892011

19902012

19912013
async def compile_long_doc(
@@ -2026,8 +2048,13 @@ async def compile_long_doc(
20262048
overview = _llm_call(model, [system_msg, doc_msg], "overview")
20272049

20282050
# --- Steps 2-4: Concept plan → generate/update → index ---
2029-
await _compile_concepts(
2030-
wiki_dir, kb_dir, model, system_msg, doc_msg,
2031-
overview, doc_name, max_concurrency, doc_brief=doc_description,
2032-
doc_type="pageindex", entity_types=entity_types,
2033-
)
2051+
try:
2052+
await _compile_concepts(
2053+
wiki_dir, kb_dir, model, system_msg, doc_msg,
2054+
overview, doc_name, max_concurrency, doc_brief=doc_description,
2055+
doc_type="pageindex", entity_types=entity_types,
2056+
)
2057+
finally:
2058+
# Close per-loop litellm async clients before asyncio.run tears this
2059+
# loop down, to avoid the CLOSE-WAIT/FD leak across a long ingest.
2060+
await _close_async_llm_clients()

openkb/cli.py

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -259,20 +259,6 @@ def _clear_existing_skill_dir(kb_dir: Path, name: str) -> None:
259259
shutil.rmtree(target)
260260

261261

262-
def _close_litellm_async_clients() -> None:
263-
"""Best-effort cleanup of cached LiteLLM async clients.
264-
265-
LiteLLM caches aiohttp clients per event loop; with ``asyncio.run`` creating
266-
a fresh loop per doc, the old clients' connections linger in CLOSE-WAIT and
267-
accumulate sockets/FDs over a long ingest. Closing them after each
268-
compile/index frees those connections. Cleanup must never break ingest.
269-
"""
270-
try:
271-
asyncio.run(litellm.close_litellm_async_clients())
272-
except Exception:
273-
pass
274-
275-
276262
def add_single_file(file_path: Path, kb_dir: Path) -> Literal["added", "skipped", "failed"]:
277263
"""Convert, index, and compile a single document into the knowledge base.
278264
@@ -321,10 +307,7 @@ def add_single_file(file_path: Path, kb_dir: Path) -> Literal["added", "skipped"
321307
click.echo(f" Long document detected — indexing with PageIndex...")
322308
try:
323309
from openkb.indexer import index_long_document
324-
try:
325-
index_result = index_long_document(result.raw_path, kb_dir)
326-
finally:
327-
_close_litellm_async_clients()
310+
index_result = index_long_document(result.raw_path, kb_dir)
328311
except Exception as exc:
329312
click.echo(f" [ERROR] Indexing failed: {exc}")
330313
logger.debug("Indexing traceback:", exc_info=True)
@@ -334,13 +317,10 @@ def add_single_file(file_path: Path, kb_dir: Path) -> Literal["added", "skipped"
334317
click.echo(f" Compiling long doc (doc_id={index_result.doc_id})...")
335318
for attempt in range(2):
336319
try:
337-
try:
338-
asyncio.run(
339-
compile_long_doc(doc_name, summary_path, index_result.doc_id, kb_dir, model,
340-
doc_description=index_result.description)
341-
)
342-
finally:
343-
_close_litellm_async_clients()
320+
asyncio.run(
321+
compile_long_doc(doc_name, summary_path, index_result.doc_id, kb_dir, model,
322+
doc_description=index_result.description)
323+
)
344324
break
345325
except Exception as exc:
346326
if attempt == 0:
@@ -354,10 +334,7 @@ def add_single_file(file_path: Path, kb_dir: Path) -> Literal["added", "skipped"
354334
click.echo(f" Compiling short doc...")
355335
for attempt in range(2):
356336
try:
357-
try:
358-
asyncio.run(compile_short_doc(doc_name, result.source_path, kb_dir, model))
359-
finally:
360-
_close_litellm_async_clients()
337+
asyncio.run(compile_short_doc(doc_name, result.source_path, kb_dir, model))
361338
break
362339
except Exception as exc:
363340
if attempt == 0:

tests/test_add_command.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,5 @@ def test_add_short_doc_runs_compiler(self, tmp_path):
145145
patch("openkb.cli.convert_document", return_value=mock_result), \
146146
patch("openkb.cli.asyncio.run") as mock_arun:
147147
result = runner.invoke(cli, ["add", str(doc)])
148-
# asyncio.run drives both the compile and the post-compile
149-
# litellm async-client cleanup, so it is called more than once;
150-
# assert the compiler coroutine itself was run.
151-
assert mock_arun.called
152-
ran = [c.args[0] for c in mock_arun.call_args_list if c.args]
153-
assert any(getattr(co, "__name__", "") == "compile_short_doc" for co in ran)
148+
mock_arun.assert_called_once()
154149
assert "OK" in result.output

0 commit comments

Comments
 (0)