-
-
Notifications
You must be signed in to change notification settings - Fork 333
feat: add --max-requests for worker process recycling #1354
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
53ddacc
62ce87c
bba766c
e2ddef7
7546da5
cfe0649
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,7 +30,10 @@ def run_processes( | |
| open_browser: bool, | ||
| client_timeout: int = 30, | ||
| keep_alive_timeout: int = 20, | ||
| max_requests: Optional[int] = None, | ||
| ) -> List[Process]: | ||
| import time | ||
|
|
||
| socket = SocketHeld(url, port) | ||
|
|
||
| process_pool = init_processpool( | ||
|
|
@@ -48,12 +51,24 @@ def run_processes( | |
| excluded_response_headers_paths, | ||
| client_timeout, | ||
| keep_alive_timeout, | ||
| max_requests, | ||
| ) | ||
|
|
||
| shutting_down = False | ||
|
|
||
| def terminating_signal_handler(_sig, _frame): | ||
| logger.info("Terminating server!!", bold=True) | ||
| nonlocal shutting_down | ||
| shutting_down = True | ||
| logger.info("Gracefully shutting down server...", bold=True) | ||
| for process in process_pool: | ||
| process.terminate() | ||
| for process in process_pool: | ||
| process.kill() | ||
| process.join(timeout=30) | ||
| if process.is_alive(): | ||
| logger.warning("Process %s did not shut down in time, forcing kill.", process.pid) | ||
| process.kill() | ||
| process.join(timeout=5) | ||
| sys.exit(0) | ||
|
|
||
| signal.signal(signal.SIGINT, terminating_signal_handler) | ||
| signal.signal(signal.SIGTERM, terminating_signal_handler) | ||
|
|
@@ -63,8 +78,39 @@ def terminating_signal_handler(_sig, _frame): | |
| webbrowser.open_new_tab(f"http://{url}:{port}/") | ||
|
|
||
| logger.info("Press Ctrl + C to stop \n") | ||
| for process in process_pool: | ||
| process.join() | ||
|
|
||
| if max_requests and max_requests > 0 and len(process_pool) > 0: | ||
| while not shutting_down: | ||
| for i, process in enumerate(process_pool): | ||
| if not process.is_alive() and not shutting_down: | ||
| process.join() | ||
| logger.info("Worker process exited (recycling), spawning replacement.") | ||
| copied_socket = socket.try_clone() | ||
| new_process = Process( | ||
| target=spawn_process, | ||
| args=( | ||
| directories, | ||
| request_headers, | ||
| routes, | ||
| global_middlewares, | ||
| route_middlewares, | ||
| web_sockets, | ||
| event_handlers, | ||
| copied_socket, | ||
| workers, | ||
| response_headers, | ||
| excluded_response_headers_paths, | ||
| client_timeout, | ||
| keep_alive_timeout, | ||
| max_requests, | ||
| ), | ||
| ) | ||
| new_process.start() | ||
| process_pool[i] = new_process | ||
| time.sleep(5) | ||
|
coderabbitai[bot] marked this conversation as resolved.
Comment on lines
+82
to
+110
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Normalize `max_requests`. Line 82 and Line 136 use truthiness, but […]
💡 Minimal guardrail
def run_processes(
@@
keep_alive_timeout: int = 20,
max_requests: Optional[int] = None,
) -> List[Process]:
+ if max_requests is not None and max_requests <= 0:
+ max_requests = None
@@
- if max_requests and max_requests > 0 and len(process_pool) > 0:
+ if max_requests is not None and len(process_pool) > 0:
while not shutting_down:
...
def init_processpool(
@@
keep_alive_timeout: int = 20,
max_requests: Optional[int] = None,
) -> List[Process]:
+ if max_requests is not None and max_requests <= 0:
+ max_requests = None
+ if sys.platform.startswith("win32") and max_requests is not None:
+ raise RuntimeError("--max-requests is not supported on Windows yet")
@@
- if sys.platform.startswith("win32") or (processes == 1 and not max_requests):
+ if sys.platform.startswith("win32") or (processes == 1 and max_requests is None):
spawn_process(...)
Also applies to: 133-152
🤖 Prompt for AI Agents
Worker replacement starts late enough to cause outages. The parent only notices exited workers on the next 5-second sleep boundary at Line 110. In […]
🤖 Prompt for AI Agents |
||
| else: | ||
| for process in process_pool: | ||
| process.join() | ||
|
|
||
| return process_pool | ||
|
|
||
|
|
@@ -84,9 +130,10 @@ def init_processpool( | |
| excluded_response_headers_paths: Optional[List[str]], | ||
| client_timeout: int = 30, | ||
| keep_alive_timeout: int = 20, | ||
| max_requests: Optional[int] = None, | ||
| ) -> List[Process]: | ||
| process_pool: List = [] | ||
| if sys.platform.startswith("win32") or processes == 1: | ||
| if sys.platform.startswith("win32") or (processes == 1 and not max_requests): | ||
| spawn_process( | ||
| directories, | ||
| request_headers, | ||
|
|
@@ -101,11 +148,12 @@ def init_processpool( | |
| excluded_response_headers_paths, | ||
| client_timeout, | ||
| keep_alive_timeout, | ||
| max_requests, | ||
| ) | ||
|
|
||
| return process_pool | ||
|
|
||
| for _ in range(processes): | ||
| for _ in range(max(1, processes)): | ||
| copied_socket = socket.try_clone() | ||
| process = Process( | ||
| target=spawn_process, | ||
|
|
@@ -123,6 +171,7 @@ def init_processpool( | |
| excluded_response_headers_paths, | ||
| client_timeout, | ||
| keep_alive_timeout, | ||
| max_requests, | ||
| ), | ||
| ) | ||
| process.start() | ||
|
|
@@ -161,6 +210,7 @@ def spawn_process( | |
| excluded_response_headers_paths: Optional[List[str]], | ||
| client_timeout: int = 30, | ||
| keep_alive_timeout: int = 20, | ||
| max_requests: Optional[int] = None, | ||
| ): | ||
| """ | ||
| This function is called by the main process handler to create a server runtime. | ||
|
|
@@ -175,14 +225,13 @@ def spawn_process( | |
| :param socket SocketHeld: This is the main tcp socket, which is being shared across multiple processes. | ||
| :param process_name string: This is the name given to the process to identify the process | ||
| :param workers int: This is the name given to the process to identify the process | ||
| :param max_requests Optional[int]: Recycle this worker after N requests | ||
| """ | ||
|
|
||
| loop = initialize_event_loop() | ||
|
|
||
| server = Server() | ||
|
|
||
| # TODO: if we remove the dot access | ||
| # the startup time will improve in the server | ||
| for directory in directories: | ||
| server.add_directory(*directory.as_list()) | ||
|
|
||
|
|
@@ -218,8 +267,10 @@ def spawn_process( | |
| ) | ||
|
|
||
| try: | ||
| server.start(socket, workers) | ||
| server.start(socket, workers, max_requests) | ||
| loop = asyncio.get_event_loop() | ||
| loop.run_forever() | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| except KeyboardInterrupt: | ||
| pass | ||
| finally: | ||
| loop.close() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,10 @@ use std::{env, thread}; | |
|
|
||
| use actix_files::Files; | ||
| use actix_http::KeepAlive; | ||
| use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform}; | ||
| use actix_web::*; | ||
| use futures_util::future::{ok, LocalBoxFuture, Ready}; | ||
| use std::task::{Context, Poll}; | ||
|
|
||
| use log::error; | ||
| use once_cell::sync::OnceCell; | ||
|
|
@@ -40,6 +43,55 @@ const MAX_PAYLOAD_SIZE: &str = "ROBYN_MAX_PAYLOAD_SIZE"; | |
| const DEFAULT_MAX_PAYLOAD_SIZE: usize = 1_000_000; // 1Mb | ||
|
|
||
| static STARTED: AtomicBool = AtomicBool::new(false); | ||
| static REQUEST_COUNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); | ||
|
|
||
| pub fn get_request_count() -> u64 { | ||
| REQUEST_COUNT.load(Relaxed) | ||
| } | ||
|
|
||
| struct RequestCounter; | ||
|
|
||
| impl<S, B> Transform<S, ServiceRequest> for RequestCounter | ||
| where | ||
| S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>, | ||
| S::Future: 'static, | ||
| B: 'static, | ||
| { | ||
| type Response = ServiceResponse<B>; | ||
| type Error = Error; | ||
| type InitError = (); | ||
| type Transform = RequestCounterMiddleware<S>; | ||
| type Future = Ready<Result<Self::Transform, Self::InitError>>; | ||
|
|
||
| fn new_transform(&self, service: S) -> Self::Future { | ||
| ok(RequestCounterMiddleware { service }) | ||
| } | ||
| } | ||
|
|
||
| struct RequestCounterMiddleware<S> { | ||
| service: S, | ||
| } | ||
|
|
||
| impl<S, B> Service<ServiceRequest> for RequestCounterMiddleware<S> | ||
| where | ||
| S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>, | ||
| S::Future: 'static, | ||
| B: 'static, | ||
| { | ||
| type Response = ServiceResponse<B>; | ||
| type Error = Error; | ||
| type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>; | ||
|
|
||
| fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
| self.service.poll_ready(cx) | ||
| } | ||
|
|
||
| fn call(&self, req: ServiceRequest) -> Self::Future { | ||
| REQUEST_COUNT.fetch_add(1, Relaxed); | ||
| let fut = self.service.call(req); | ||
| Box::pin(fut) | ||
| } | ||
| } | ||
|
|
||
| #[derive(Clone)] | ||
| struct Directory { | ||
|
|
@@ -81,11 +133,16 @@ impl Server { | |
| } | ||
| } | ||
|
|
||
| pub fn get_request_count(&self) -> u64 { | ||
| get_request_count() | ||
| } | ||
|
|
||
| pub fn start( | ||
| &mut self, | ||
| _py: Python, | ||
| socket: PyRef<SocketHeld>, | ||
| workers: usize, | ||
| max_requests: Option<u64>, | ||
| ) -> PyResult<()> { | ||
| pyo3_log::init(); | ||
|
|
||
|
|
@@ -144,8 +201,8 @@ impl Server { | |
| const_router.bake_global_headers(&global_response_headers); | ||
| } | ||
|
|
||
| HttpServer::new(move || { | ||
| let mut app = App::new(); | ||
| let server = HttpServer::new(move || { | ||
| let mut app = App::new().wrap(RequestCounter); | ||
|
|
||
| let directories = directories.read().unwrap(); | ||
|
|
||
|
|
@@ -280,9 +337,33 @@ impl Server { | |
| .client_request_timeout(std::time::Duration::from_secs(0)) | ||
| .listen(raw_socket.into()) | ||
| .unwrap() | ||
| .run() | ||
| .await | ||
| .unwrap(); | ||
| .run(); | ||
|
|
||
| let server_handle = server.handle(); | ||
|
|
||
| if let Some(limit) = max_requests { | ||
| let handle = server_handle.clone(); | ||
| actix_web::rt::spawn(async move { | ||
| loop { | ||
| actix_web::rt::time::sleep(std::time::Duration::from_secs(5)).await; | ||
| if REQUEST_COUNT.load(Relaxed) >= limit { | ||
| log::info!( | ||
| "Max requests ({}) reached, worker shutting down for recycling.", | ||
| limit | ||
| ); | ||
| handle.stop(true).await; | ||
| break; | ||
| } | ||
| } | ||
| }); | ||
|
Comment on lines
+344
to
+358
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
The recycle trigger is enforced far too late under load. Line 348 only wakes every 5 seconds, so a hot worker can serve far more than […]
🤖 Prompt for AI Agents |
||
| } | ||
|
sansyrox marked this conversation as resolved.
|
||
|
|
||
| server.await.unwrap(); | ||
|
|
||
| if max_requests.is_some() { | ||
| log::info!("Actix server stopped, exiting worker process for recycling."); | ||
| std::process::exit(0); | ||
| } | ||
|
Comment on lines
+361
to
+366
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Show the recycle exit path and the only Python-side shutdown/cleanup locations.
rg -n -C2 'std::process::exit\(0\)|shutdown_handler|run_forever|loop.close' src/server.rs robyn/processpool.py
Repository: sparckles/Robyn
Length of output: 2553
🏁 Script executed:
#!/bin/bash
# Get broader context around the exit call and the event loop handling
sed -n '355,380p' src/server.rs | cat -n
Repository: sparckles/Robyn
Length of output: 1215
🏁 Script executed:
#!/bin/bash
# Also check the full structure of the run method to understand task spawning/awaiting
sed -n '150,175p' src/server.rs | cat -n
Repository: sparckles/Robyn
Length of output: 1298
🏁 Script executed:
#!/bin/bash
# Get full context of the run method structure, including what spawns/awaits the async block
sed -n '140,200p' src/server.rs | cat -n
Repository: sparckles/Robyn
Length of output: 2774
🏁 Script executed:
#!/bin/bash
# Check what happens in the Python processpool.py around the run_forever and finally block
sed -n '265,285p' robyn/processpool.py | cat -n
Repository: sparckles/Robyn
Length of output: 426
Hard […]
When […]
A graceful recycle must signal shutdown to the Python event loop instead of terminating the process directly.
🤖 Prompt for AI Agents |
||
| }); | ||
| }); | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.