Reject requests on stale session or sleeping engine #4496

lvhan028 merged 8 commits into InternLM:main
Conversation
Pull request overview
This PR binds an AsyncEngine “epoch” to each HTTP-bound Session so that /stop_all_session (and related abort flows) can reliably invalidate in-flight work that was bound before the stop/abort, reducing races between request binding and generation.
Changes:
- Stamp `session.epoch` in the OpenAI API server when resolving/binding a session for a request.
- Add `epoch` to `Session` objects (with reset behavior) and improve abort logging to include the epoch.
- In `AsyncEngine.generate()`, drop "stale" sessions when `stop_all_session()` has bumped the engine epoch since the request bound the session; also adjust some metrics accounting and `/sleep` behavior.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| lmdeploy/serve/openai/api_server.py | Binds AsyncEngine.epoch to sessions on request bind; /sleep now stops all sessions before sleeping. |
| lmdeploy/serve/managers/session_manager.py | Adds Session.epoch state, resets it, and logs epoch during abort. |
| lmdeploy/serve/core/async_engine.py | Adds stale-session detection based on epoch, bumps epoch on stop-all, and updates metrics/abort handling in generate(). |
Comments suppressed due to low confidence (2)
lmdeploy/serve/core/async_engine.py:413
- This branch yields `finish_reason='length'` (normal completion due to the token limit) but increments `increase_failed_requests('error')`. That will skew scheduler metrics by counting expected length-limited completions as errors; consider incrementing succeeded requests (or not marking them as failed) in this case.
```python
if gen_config.max_new_tokens == 0:
    logger.info(f'run out of tokens. session={session_id}.')
    metrics_processor.increase_failed_requests('error')
    yield GenOut(response='',
                 history_token_len=session.step,
                 input_token_len=len(input_ids),
                 generate_token_len=0,
                 finish_reason='length',
                 token_ids=[])
```
lmdeploy/serve/core/async_engine.py:462
- This pre-inference abort path yields `finish_reason='abort'` even though `GenOut.finish_reason` is not typed to allow `'abort'`. Align the finish_reason enum/typing across `GenOut` and the response models so metrics/logging and downstream code don't see unexpected values.
```python
if session.epoch is not None and session.epoch != self.epoch:
    logger.warning(f'[generate] session {session_id} got aborted before starting inference, '
                   f'session.epoch={session.epoch}, epoch={self.epoch}')
    metrics_processor.increase_failed_requests('abort')
    yield GenOut(response='',
                 history_token_len=0,
                 input_token_len=len(input_ids),
                 generate_token_len=0,
                 finish_reason='abort',
                 token_ids=[])
```
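The stale-session check above can be modeled in isolation. The following is a minimal sketch of the epoch mechanism, assuming illustrative `Engine`/`Session` stand-ins rather than the actual lmdeploy classes:

```python
class Session:
    def __init__(self):
        self.epoch = None  # stamped when the session is bound to a request


class Engine:
    def __init__(self):
        self.epoch = 0

    def bind(self, session):
        # Stamp the current engine epoch at request-bind time.
        session.epoch = self.epoch

    def stop_all_sessions(self):
        # Bumping the epoch invalidates every session bound before the stop.
        self.epoch += 1

    def is_stale(self, session):
        # A session bound under an older epoch must be rejected, not served.
        return session.epoch is not None and session.epoch != self.epoch


engine = Engine()
s = Session()
engine.bind(s)
fresh = engine.is_stale(s)        # False: bound under the current epoch
engine.stop_all_sessions()
stale = engine.is_stale(s)        # True: bound before the stop
```

This avoids a race window: even if `generate()` starts after `stop_all_session()` returns, the request still carries the old epoch and is dropped.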
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
```python
metrics_processor.increase_total_requests()

if (messages is not None) ^ (input_ids is None):
    raise ValueError('You must specify exactly one of messages or input_ids')
if isinstance(session_id, Session):
```
metrics_processor.increase_total_requests() is now called before the input validation that can raise ValueError (e.g. the messages/input_ids XOR check). If a caller triggers these errors, total requests will be incremented without a corresponding failed-request metric, skewing metrics. Consider moving increase_total_requests() after validation (or ensuring validation errors are counted as failures).
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
lmdeploy/serve/core/async_engine.py:376
`metrics_processor.increase_total_requests()` is executed before argument validation that can raise (the `messages` vs `input_ids` XOR check, invalid `session_id` type). If those errors occur, the request is counted but no failed metric is recorded, and the exception may propagate without a proper response. Move the increment after input validation (or wrap validation in try/except to record `increase_failed_requests('error')`).
```python
metrics_processor.increase_total_requests()
if (messages is not None) ^ (input_ids is None):
    raise ValueError('You must specify exactly one of messages or input_ids')
if isinstance(session_id, Session):
    session = session_id
elif isinstance(session_id, int):
    session = self.session_mgr.get(session_id, step=step)
else:
    raise ValueError(f'Invalid session_id: {session_id}. It should be an instance of Session or an integer.')
```
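One way to keep the counters consistent is to validate first and record a failure when validation raises. A sketch, using a stub in place of the real `metrics_processor` (the stub's attributes are illustrative, not lmdeploy's API):

```python
class _Metrics:
    # Stand-in for metrics_processor, so the ordering can be demonstrated.
    def __init__(self):
        self.total = 0
        self.failed = 0

    def increase_total_requests(self):
        self.total += 1

    def increase_failed_requests(self, reason):
        self.failed += 1


metrics_processor = _Metrics()


def validate_and_count(messages, input_ids):
    try:
        if (messages is not None) ^ (input_ids is None):
            raise ValueError('You must specify exactly one of messages or input_ids')
    except ValueError:
        # Count the rejected request so totals and failures stay in sync.
        metrics_processor.increase_total_requests()
        metrics_processor.increase_failed_requests('error')
        raise
    # Only well-formed requests reach this point.
    metrics_processor.increase_total_requests()
```

With this ordering, every increment of the total counter is matched either by a failure record or by a request that actually proceeds.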
```python
def is_engine_sleeping() -> bool:
    eng = VariableInterface.async_engine
    return eng is not None and eng.is_sleeping

app.add_middleware(EngineSleepingMiddleware, is_sleeping=is_engine_sleeping)

# set the maximum number of concurrent requests
if max_concurrent_requests is not None:
    app.add_middleware(ConcurrencyLimitMiddleware, max_concurrent_requests=max_concurrent_requests)
```
Middleware ordering: EngineSleepingMiddleware is added before ConcurrencyLimitMiddleware, and Starlette middleware stacking makes the last added middleware outermost. That means sleeping inference requests still acquire a concurrency semaphore slot before being rejected with 503, which can unnecessarily block other endpoints (including /wakeup) under load. Consider adding EngineSleepingMiddleware after the concurrency limiter (or implementing the sleep gate inside the concurrency middleware) so rejections happen before acquiring the semaphore.
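The ordering effect can be reproduced without Starlette: each `add_middleware` call wraps the existing stack, so the middleware added last runs outermost. A minimal sketch with plain function wrappers standing in for the two middlewares (names are illustrative):

```python
calls = []


def app(request):
    calls.append('app')
    return 'ok'


def sleep_gate(inner):
    def handler(request):
        calls.append('sleep_gate')   # would return 503 if the engine sleeps
        return inner(request)
    return handler


def concurrency_limit(inner):
    def handler(request):
        calls.append('concurrency_limit')  # would acquire a semaphore slot
        return inner(request)
    return handler


# Mimic Starlette stacking: each add_middleware wraps what was added before.
stack = app
for middleware in (sleep_gate, concurrency_limit):  # add order from the diff
    stack = middleware(stack)

result = stack('request')
```

Because `concurrency_limit` was added last, it executes first, so a request that the sleep gate would reject still passes through (and, in the real server, would consume) the concurrency limiter.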
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (2)
lmdeploy/turbomind/turbomind.py:292
`TurboMind.sleep` was changed to `async def`, but the implementation still performs a blocking wait over a `ThreadPoolExecutor` (and contains no `await`). When awaited from FastAPI (e.g., `POST /sleep`), this will block the event loop until all GPU workers finish sleeping, preventing the server from responding to other endpoints during that time. Consider running the whole blocking section via `asyncio.to_thread(...)` / `loop.run_in_executor(...)`, or keeping `sleep()` synchronous and calling it via `run_in_executor` from higher-level async code.
```python
async def sleep(self, level: int = 1):
    """Sleep the model."""
    with ThreadPoolExecutor(max_workers=self.gpu_count) as e:
        for _ in e.map(self.model_comm.sleep, range(self.gpu_count), [level] * self.gpu_count):
            pass
```
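A sketch of the suggested fix, with a stand-in blocking function in place of `model_comm.sleep` (the real per-rank call and its signature are not reproduced here):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def _blocking_sleep_all(gpu_count, level):
    # Stand-in for TurboMind's blocking ThreadPoolExecutor section.
    def worker(rank):
        time.sleep(0.01)  # placeholder for model_comm.sleep(rank, level)
        return rank

    with ThreadPoolExecutor(max_workers=gpu_count) as e:
        return list(e.map(worker, range(gpu_count)))


async def sleep(level=1, gpu_count=2):
    # Off-load the entire blocking section to a worker thread so the
    # event loop keeps serving other endpoints while GPUs go to sleep.
    return await asyncio.to_thread(_blocking_sleep_all, gpu_count, level)


ranks = asyncio.run(sleep())
```

`asyncio.to_thread` runs the callable in the loop's default executor and awaits its result, which is exactly the "wrap the whole blocking section" approach the comment recommends.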
lmdeploy/serve/core/async_engine.py:421
- In `generate()`, the `max_new_tokens == 0` early-exit yields `finish_reason='length'` but increments failed requests via `increase_failed_requests('error')`. This will inflate `num_errored_reqs` for a non-error condition and makes metrics inconsistent with the returned finish_reason. Consider treating this as a succeeded request (or at least not counting it as an error) to keep metrics aligned with behavior.
```python
if gen_config.max_new_tokens == 0:
    logger.info(f'run out of tokens. session={session_id}.')
    metrics_processor.increase_failed_requests('error')
    yield GenOut(response='',
                 history_token_len=session.step,
                 input_token_len=len(input_ids),
                 generate_token_len=0,
                 finish_reason='length',
                 token_ids=[])
```
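A sketch of the suggested adjustment, using a stub metrics object (the real `metrics_processor` interface may differ): the token-limit early exit is recorded as a normal, length-limited completion rather than an error.

```python
class _Metrics:
    def __init__(self):
        self.succeeded = 0
        self.failed = 0

    def increase_succeeded_requests(self):
        self.succeeded += 1

    def increase_failed_requests(self, reason):
        self.failed += 1


metrics_processor = _Metrics()


def on_max_new_tokens_zero():
    # finish_reason='length' is an expected completion, not an error,
    # so count it as a success to keep metrics aligned with behavior.
    metrics_processor.increase_succeeded_requests()
    return {'response': '', 'generate_token_len': 0, 'finish_reason': 'length'}


out = on_max_new_tokens_zero()
```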
No description provided.