Skip to content

Commit 41c6ec2

Browse files
Fix store port conflict handling (issue #221) (#227) (#235)
(cherry picked from commit 01955ae) Co-authored-by: ddsfda99 <168000702+ddsfda99@users.noreply.github.com>
1 parent 268bd77 commit 41c6ec2

3 files changed

Lines changed: 118 additions & 7 deletions

File tree

agentlightning/cli/store.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66

77
import argparse
88
import asyncio
9+
import logging
910
from typing import Iterable
1011

1112
from agentlightning.logging import configure_logger
1213
from agentlightning.store.client_server import LightningStoreServer
1314
from agentlightning.store.memory import InMemoryLightningStore
1415

16+
logger = logging.getLogger(__name__)
17+
1518

1619
def main(argv: Iterable[str] | None = None) -> int:
1720
parser = argparse.ArgumentParser(description="Run a LightningStore server")
@@ -22,7 +25,11 @@ def main(argv: Iterable[str] | None = None) -> int:
2225

2326
store = InMemoryLightningStore()
2427
server = LightningStoreServer(store, host="0.0.0.0", port=args.port)
25-
asyncio.run(server.run_forever())
28+
try:
29+
asyncio.run(server.run_forever())
30+
except RuntimeError as exc:
31+
logger.error("LightningStore server failed to start: %s", exc, exc_info=True)
32+
return 1
2633
return 0
2734

2835

agentlightning/store/client_server.py

Lines changed: 76 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ def __init__(self, store: LightningStore, host: str, port: int):
109109
self._uvicorn_server: uvicorn.Server | None = uvicorn.Server(self._uvicorn_config)
110110

111111
self._serving_thread: Optional[threading.Thread] = None
112+
self._server_start_exception: Optional[BaseException] = None
112113

113114
# Process-awareness:
114115
# LightningStoreServer holds a plain Python object (self.store) in one process
@@ -167,17 +168,45 @@ async def start(self):
167168
logger.info(f"Starting server at {self.endpoint}")
168169

169170
uvicorn_server = self._uvicorn_server
171+
self._server_start_exception = None
170172

171173
def run_server_forever():
172-
asyncio.run(uvicorn_server.serve())
173-
174-
self._serving_thread = threading.Thread(target=run_server_forever, daemon=True)
175-
self._serving_thread.start()
174+
try:
175+
asyncio.run(uvicorn_server.serve())
176+
except (SystemExit, Exception) as exc:
177+
logger.debug("LightningStore server thread exiting due to %s", exc, exc_info=exc)
178+
self._server_start_exception = exc
179+
180+
serving_thread = threading.Thread(target=run_server_forever, daemon=True)
181+
self._serving_thread = serving_thread
182+
serving_thread.start()
183+
184+
# Wait for uvicorn to report that it has started before pinging /health.
185+
start_deadline = time.time() + 10
186+
while time.time() < start_deadline:
187+
if uvicorn_server.started:
188+
break
189+
if self._server_start_exception is not None or not serving_thread.is_alive():
190+
self._handle_failed_start()
191+
raise RuntimeError(self._format_start_failure_reason())
192+
await asyncio.sleep(0.05)
193+
else:
194+
self._handle_failed_start()
195+
raise RuntimeError("Server failed to start within the 10 seconds.")
176196

177-
# Wait for /health to be available
197+
# Wait for /health to be available once uvicorn reports started.
178198
if not await self._server_health_check():
199+
self._handle_failed_start()
179200
raise RuntimeError("Server failed to start within the 10 seconds.")
180201

202+
# If startup failed (e.g. port already in use), uvicorn never flips `started`
203+
# and the worker thread stops immediately. Guard against latching on to a
204+
# different process that happened to satisfy the health check.
205+
if not uvicorn_server.started or not serving_thread.is_alive() or self._server_start_exception is not None:
206+
self._handle_failed_start()
207+
failure_reason = self._format_start_failure_reason()
208+
raise RuntimeError(failure_reason)
209+
181210
async def _server_health_check(self) -> bool:
182211
"""Checks if the server is healthy."""
183212
current_time = time.time()
@@ -190,22 +219,63 @@ async def _server_health_check(self) -> bool:
190219
await asyncio.sleep(0.1)
191220
return False
192221

222+
def _handle_failed_start(self) -> None:
    """Release server and thread state after an unsuccessful startup.

    Asks the uvicorn server (when one is set) to exit, briefly joins the
    serving thread (when one is set), and then clears the thread reference.
    """
    server = self._uvicorn_server
    if server is not None:
        server.should_exit = True

    worker = self._serving_thread
    if worker is None:
        return

    # In most failure scenarios the thread has already exited, so a short
    # defensive join suffices; the reference is dropped either way.
    worker.join(timeout=0.1)
    self._serving_thread = None
230+
231+
def _format_start_failure_reason(self) -> str:
    """Build a human-readable explanation for a failed server startup.

    The message always begins with the endpoint the server was asked to use
    and is followed by a detail derived from ``self._server_start_exception``.

    Returns:
        The failure message, suitable for raising or logging.
    """
    base_message = f"LightningStore server failed to start on {self.endpoint}."
    port_hint = f"{base_message} Another process may already be using this port."
    exc = self._server_start_exception

    # With no recorded exception, or with a SystemExit, the best available
    # guess is a port conflict.
    if exc is None or isinstance(exc, SystemExit):
        return port_hint

    if isinstance(exc, OSError):
        # ``strerror`` is None when the OSError was created without an
        # errno/strerror pair; fall back so the message never reads "None.".
        detail = exc.strerror or str(exc) or type(exc).__name__
        return f"{base_message} {detail}."

    return f"{base_message} Reason: {exc}."
240+
193241
async def run_forever(self):
    """Runs the FastAPI server indefinitely.

    You need to call this method in the same process as the server was created in.

    Raises:
        RuntimeError: If serving fails before uvicorn reports ``started`` or
            fails with ``SystemExit``/``OSError`` (the message comes from
            ``_format_start_failure_reason``), if ``serve()`` raises any other
            exception, or if the health check does not succeed.
    """
    assert self._uvicorn_server is not None
    uvicorn_server = self._uvicorn_server
    # Forget any failure recorded by an earlier attempt so it cannot be
    # mistaken for the outcome of this run.
    self._server_start_exception = None

    async def _wait_till_healthy():
        health = await self._server_health_check()
        if not health:
            raise RuntimeError("Server did not become healthy within the 10 seconds.")
        logger.info("Store server is online at %s", self.endpoint)

    async def _serve_capture():
        # Record whatever serve() raised so the failure can be classified
        # below. KeyboardInterrupt and CancelledError are not matched by this
        # clause and propagate untouched.
        try:
            await uvicorn_server.serve()
        except (SystemExit, Exception) as exc:
            logger.debug("LightningStore server serve() raised %s", exc, exc_info=exc)
            self._server_start_exception = exc
            raise RuntimeError("LightningStore server failed to serve") from exc

    # Run the health probe and the (wrapped) uvicorn serve() call in parallel
    # until one of them raises an exception.
    try:
        await asyncio.gather(_wait_till_healthy(), _serve_capture())
    except (KeyboardInterrupt, asyncio.CancelledError, GeneratorExit):
        # Interrupts and cancellation are control flow, not startup failures;
        # they must never be rewritten into a RuntimeError.
        raise
    except BaseException as exc:
        startup_failed = not uvicorn_server.started or isinstance(
            self._server_start_exception, (SystemExit, OSError)
        )
        if startup_failed:
            self._handle_failed_start()
            raise RuntimeError(self._format_start_failure_reason()) from exc
        raise
209279

210280
async def stop(self):
211281
"""Gracefully stops the running FastAPI server.

tests/store/test_client_server.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,40 @@ async def server_client() -> AsyncGenerator[Tuple[LightningStoreServer, Lightnin
7676
await server.stop()
7777

7878

79+
@pytest.mark.asyncio
async def test_server_start_rejects_port_conflict() -> None:
    """Ensure startup fails loudly when the port is already owned by another store."""
    port = _get_free_port()
    store_a = InMemoryLightningStore()
    server_a = LightningStoreServer(store_a, "127.0.0.1", port)
    await server_a.start()

    try:
        store_b = InMemoryLightningStore()
        server_b = LightningStoreServer(store_b, "127.0.0.1", port)

        with pytest.raises(RuntimeError, match="Another process may already be using this port"):
            await server_b.start()
    finally:
        # Stop the first server even when the assertion fails, so the port and
        # its serving thread are not leaked into later tests.
        await server_a.stop()
94+
95+
96+
@pytest.mark.asyncio
async def test_run_forever_rejects_port_conflict() -> None:
    """Ensure run_forever also reports port conflicts with the friendly message."""
    port = _get_free_port()
    store_a = InMemoryLightningStore()
    server_a = LightningStoreServer(store_a, "127.0.0.1", port)
    await server_a.start()

    try:
        store_b = InMemoryLightningStore()
        server_b = LightningStoreServer(store_b, "127.0.0.1", port)

        with pytest.raises(RuntimeError, match="Another process may already be using this port"):
            await server_b.run_forever()
    finally:
        # Stop the first server even when the assertion fails, so the port and
        # its serving thread are not leaked into later tests.
        await server_a.stop()
111+
112+
79113
@pytest.mark.asyncio
80114
async def test_add_resources_via_server(server_client: Tuple[LightningStoreServer, LightningStoreClient]) -> None:
81115
"""Test that add_resources works correctly via server."""

0 commit comments

Comments
 (0)