Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions robyn/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ def start(self, host: str = "127.0.0.1", port: int = 8080, _check_port: bool = T
open_browser,
client_timeout,
keep_alive_timeout,
self.config.max_requests,
)


Expand Down
19 changes: 19 additions & 0 deletions robyn/argument_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@
import os


def _positive_int(value: str) -> int:
try:
ivalue = int(value)
except ValueError:
raise argparse.ArgumentTypeError(f"invalid int value: '{value}'")
if ivalue <= 0:
raise argparse.ArgumentTypeError(f"--max-requests must be a positive integer, got {ivalue}")
return ivalue


class Config:
def __init__(self) -> None:
parser = argparse.ArgumentParser(description="Robyn, a fast async web framework with a rust runtime.")
Expand Down Expand Up @@ -84,6 +94,14 @@ def __init__(self) -> None:
default=False,
help="Fast mode. It sets the optimal values for processes, workers and log level. However, you can override them.",
)
parser.add_argument(
"--max-requests",
dest="max_requests",
type=_positive_int,
default=None,
required=False,
help="Recycle worker processes after this many requests. Helps contain memory leaks. [Default: None (disabled)]",
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

args, unknown_args = parser.parse_known_args()
self.fast = args.fast
Expand All @@ -99,6 +117,7 @@ def __init__(self) -> None:
self.file_path = None
self.disable_openapi = args.disable_openapi
self.log_level = args.log_level
self.max_requests = args.max_requests

if self.fast:
# doing this here before every other check
Expand Down
69 changes: 60 additions & 9 deletions robyn/processpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ def run_processes(
open_browser: bool,
client_timeout: int = 30,
keep_alive_timeout: int = 20,
max_requests: Optional[int] = None,
) -> List[Process]:
import time

socket = SocketHeld(url, port)

process_pool = init_processpool(
Expand All @@ -48,12 +51,24 @@ def run_processes(
excluded_response_headers_paths,
client_timeout,
keep_alive_timeout,
max_requests,
)

shutting_down = False

def terminating_signal_handler(_sig, _frame):
logger.info("Terminating server!!", bold=True)
nonlocal shutting_down
shutting_down = True
logger.info("Gracefully shutting down server...", bold=True)
for process in process_pool:
process.terminate()
for process in process_pool:
process.kill()
process.join(timeout=30)
if process.is_alive():
logger.warning("Process %s did not shut down in time, forcing kill.", process.pid)
process.kill()
process.join(timeout=5)
sys.exit(0)

signal.signal(signal.SIGINT, terminating_signal_handler)
signal.signal(signal.SIGTERM, terminating_signal_handler)
Expand All @@ -63,8 +78,39 @@ def terminating_signal_handler(_sig, _frame):
webbrowser.open_new_tab(f"http://{url}:{port}/")

logger.info("Press Ctrl + C to stop \n")
for process in process_pool:
process.join()

if max_requests and max_requests > 0 and len(process_pool) > 0:
while not shutting_down:
for i, process in enumerate(process_pool):
if not process.is_alive() and not shutting_down:
process.join()
logger.info("Worker process exited (recycling), spawning replacement.")
copied_socket = socket.try_clone()
new_process = Process(
target=spawn_process,
args=(
directories,
request_headers,
routes,
global_middlewares,
route_middlewares,
web_sockets,
event_handlers,
copied_socket,
workers,
response_headers,
excluded_response_headers_paths,
client_timeout,
keep_alive_timeout,
max_requests,
),
)
new_process.start()
process_pool[i] = new_process
time.sleep(5)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment on lines +82 to +110
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Normalize max_requests before these truthiness branches.

Line 82 and Line 136 use truthiness, but src/server.rs enables recycling for any non-None value. 0 therefore disables supervision here while still enabling immediate shutdown in Rust. Line 136 also forces all Windows runs through the inline path, so there is no supervisor to replace that exit. Both cases can turn recycling into a permanent shutdown.

💡 Minimal guardrail
 def run_processes(
@@
     keep_alive_timeout: int = 20,
     max_requests: Optional[int] = None,
 ) -> List[Process]:
+    if max_requests is not None and max_requests <= 0:
+        max_requests = None
@@
-    if max_requests and max_requests > 0 and len(process_pool) > 0:
+    if max_requests is not None and len(process_pool) > 0:
         while not shutting_down:
             ...
 def init_processpool(
@@
     keep_alive_timeout: int = 20,
     max_requests: Optional[int] = None,
 ) -> List[Process]:
+    if max_requests is not None and max_requests <= 0:
+        max_requests = None
+    if sys.platform.startswith("win32") and max_requests is not None:
+        raise RuntimeError("--max-requests is not supported on Windows yet")
@@
-    if sys.platform.startswith("win32") or (processes == 1 and not max_requests):
+    if sys.platform.startswith("win32") or (processes == 1 and max_requests is None):
         spawn_process(...)

Also applies to: 133-152

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@robyn/processpool.py` around lines 82 - 110, The truthy checks on
max_requests are wrong: normalize by testing for None explicitly so a value of 0
does not disable supervision; replace conditions like "if max_requests and
max_requests > 0 and len(process_pool) > 0:" with "if max_requests is not None
and len(process_pool) > 0:" (and similarly update the other branch around the
Windows/inline path at the block referencing process_pool and spawn_process) so
recycling supervision is enabled for any non-None max_requests value.

⚠️ Potential issue | 🟠 Major

Worker replacement starts late enough to cause outages.

The parent only notices exited workers on the next 5-second sleep boundary at Line 110. In processes=1 mode that is a full outage on every recycle; if several workers hit the limit together, it can briefly drop the whole server. Please wait on process sentinels, or at least poll much more frequently, so replacements start immediately.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@robyn/processpool.py` around lines 82 - 110, The parent loop currently sleeps
for 5 seconds which delays detection of exited workers and causes outages;
change the recycling loop so exited processes are reaped and replaced
immediately by calling process.join() as soon as process.is_alive() is False (or
use process.join(timeout=0) to non-blockingly reap), spawn the replacement
Process (using Process(target=spawn_process, ...), socket.try_clone(), etc.)
right away, and reduce or remove the coarse time.sleep(5) (replace with a short
poll interval like 0.1s if needed) so worker replacement happens immediately
instead of waiting on the 5s boundary.

else:
for process in process_pool:
process.join()

return process_pool

Expand All @@ -84,9 +130,10 @@ def init_processpool(
excluded_response_headers_paths: Optional[List[str]],
client_timeout: int = 30,
keep_alive_timeout: int = 20,
max_requests: Optional[int] = None,
) -> List[Process]:
process_pool: List = []
if sys.platform.startswith("win32") or processes == 1:
if sys.platform.startswith("win32") or (processes == 1 and not max_requests):
spawn_process(
directories,
request_headers,
Expand All @@ -101,11 +148,12 @@ def init_processpool(
excluded_response_headers_paths,
client_timeout,
keep_alive_timeout,
max_requests,
)

return process_pool

for _ in range(processes):
for _ in range(max(1, processes)):
copied_socket = socket.try_clone()
process = Process(
target=spawn_process,
Expand All @@ -123,6 +171,7 @@ def init_processpool(
excluded_response_headers_paths,
client_timeout,
keep_alive_timeout,
max_requests,
),
)
process.start()
Expand Down Expand Up @@ -161,6 +210,7 @@ def spawn_process(
excluded_response_headers_paths: Optional[List[str]],
client_timeout: int = 30,
keep_alive_timeout: int = 20,
max_requests: Optional[int] = None,
):
"""
This function is called by the main process handler to create a server runtime.
Expand All @@ -175,14 +225,13 @@ def spawn_process(
:param socket SocketHeld: This is the main tcp socket, which is being shared across multiple processes.
:param process_name string: This is the name given to the process to identify the process
:param workers int: This is the name given to the process to identify the process
:param max_requests Optional[int]: Recycle this worker after N requests
"""

loop = initialize_event_loop()

server = Server()

# TODO: if we remove the dot access
# the startup time will improve in the server
for directory in directories:
server.add_directory(*directory.as_list())

Expand Down Expand Up @@ -218,8 +267,10 @@ def spawn_process(
)

try:
server.start(socket, workers)
server.start(socket, workers, max_requests)
loop = asyncio.get_event_loop()
loop.run_forever()
Comment thread
coderabbitai[bot] marked this conversation as resolved.
except KeyboardInterrupt:
pass
finally:
loop.close()
6 changes: 5 additions & 1 deletion robyn/robyn.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ from typing import Callable, Optional, Union
def get_version() -> str:
pass

def get_request_count() -> int:
"""Returns the total number of HTTP requests handled by this worker process."""
pass

class SocketHeld:
def __init__(self, url: str, port: int):
pass
Expand Down Expand Up @@ -514,7 +518,7 @@ class Server:
use_channel: bool,
) -> None:
pass
def start(self, socket: SocketHeld, workers: int, client_timeout: int, keep_alive_timeout: int) -> None:
def start(self, socket: SocketHeld, workers: int, max_requests: int | None = None) -> None:
pass

class WebSocketConnector:
Expand Down
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,16 @@ fn get_version() -> String {
env!("CARGO_PKG_VERSION").into()
}

#[pyfunction]
fn get_request_count() -> u64 {
server::get_request_count()
}

#[pymodule]
pub fn robyn(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
// the pymodule class/function to make the rustPyFunctions available
m.add_function(wrap_pyfunction!(get_version, m)?)?;
m.add_function(wrap_pyfunction!(get_request_count, m)?)?;

m.add_class::<Server>()?;
m.add_class::<Headers>()?;
Expand Down
91 changes: 86 additions & 5 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use std::{env, thread};

use actix_files::Files;
use actix_http::KeepAlive;
use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
use actix_web::*;
use futures_util::future::{ok, LocalBoxFuture, Ready};
use std::task::{Context, Poll};

use log::error;
use once_cell::sync::OnceCell;
Expand All @@ -40,6 +43,55 @@ const MAX_PAYLOAD_SIZE: &str = "ROBYN_MAX_PAYLOAD_SIZE";
const DEFAULT_MAX_PAYLOAD_SIZE: usize = 1_000_000; // 1Mb

static STARTED: AtomicBool = AtomicBool::new(false);
static REQUEST_COUNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

pub fn get_request_count() -> u64 {
REQUEST_COUNT.load(Relaxed)
}

struct RequestCounter;

impl<S, B> Transform<S, ServiceRequest> for RequestCounter
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type InitError = ();
type Transform = RequestCounterMiddleware<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;

fn new_transform(&self, service: S) -> Self::Future {
ok(RequestCounterMiddleware { service })
}
}

struct RequestCounterMiddleware<S> {
service: S,
}

impl<S, B> Service<ServiceRequest> for RequestCounterMiddleware<S>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}

fn call(&self, req: ServiceRequest) -> Self::Future {
REQUEST_COUNT.fetch_add(1, Relaxed);
let fut = self.service.call(req);
Box::pin(fut)
}
}

#[derive(Clone)]
struct Directory {
Expand Down Expand Up @@ -81,11 +133,16 @@ impl Server {
}
}

pub fn get_request_count(&self) -> u64 {
get_request_count()
}

pub fn start(
&mut self,
_py: Python,
socket: PyRef<SocketHeld>,
workers: usize,
max_requests: Option<u64>,
) -> PyResult<()> {
pyo3_log::init();

Expand Down Expand Up @@ -144,8 +201,8 @@ impl Server {
const_router.bake_global_headers(&global_response_headers);
}

HttpServer::new(move || {
let mut app = App::new();
let server = HttpServer::new(move || {
let mut app = App::new().wrap(RequestCounter);

let directories = directories.read().unwrap();

Expand Down Expand Up @@ -280,9 +337,33 @@ impl Server {
.client_request_timeout(std::time::Duration::from_secs(0))
.listen(raw_socket.into())
.unwrap()
.run()
.await
.unwrap();
.run();

let server_handle = server.handle();

if let Some(limit) = max_requests {
let handle = server_handle.clone();
actix_web::rt::spawn(async move {
loop {
actix_web::rt::time::sleep(std::time::Duration::from_secs(5)).await;
if REQUEST_COUNT.load(Relaxed) >= limit {
log::info!(
"Max requests ({}) reached, worker shutting down for recycling.",
limit
);
handle.stop(true).await;
break;
}
}
});
Comment on lines +344 to +358
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

The recycle trigger is enforced far too late under load.

Line 348 only wakes every 5 seconds, so a hot worker can serve far more than max_requests before Line 354 starts shutdown. That weakens the leak-containment goal of this feature. You already have the exact threshold crossing at Line 90 via fetch_add; using that one-shot transition would keep the limit meaningful.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server.rs` around lines 344 - 358, The periodic 5s polling allows a hot
worker to far overshoot max_requests; instead, detect the threshold immediately
when incrementing REQUEST_COUNT: in the code path that does
REQUEST_COUNT.fetch_add(1, Relaxed) (the one-shot transition referenced), check
the returned previous value and if previous + 1 >= max_requests (or previous <
max_requests && previous + 1 == max_requests) call
server_handle.clone().stop(true).await (or otherwise schedule an immediate
shutdown) so the worker begins recycling right when the limit is crossed; keep
or remove the existing spawn loop but ensure the atomic fetch_add branch is the
primary immediate trigger.

}
Comment thread
sansyrox marked this conversation as resolved.

server.await.unwrap();

if max_requests.is_some() {
log::info!("Actix server stopped, exiting worker process for recycling.");
std::process::exit(0);
}
Comment on lines +361 to +366
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Show the recycle exit path and the only Python-side shutdown/cleanup locations.
rg -n -C2 'std::process::exit\(0\)|shutdown_handler|run_forever|loop.close' src/server.rs robyn/processpool.py

Repository: sparckles/Robyn

Length of output: 2553


🏁 Script executed:

#!/bin/bash
# Get broader context around the exit call and the event loop handling
sed -n '355,380p' src/server.rs | cat -n

Repository: sparckles/Robyn

Length of output: 1215


🏁 Script executed:

#!/bin/bash
# Also check the full structure of the run method to understand task spawning/awaiting
sed -n '150,175p' src/server.rs | cat -n

Repository: sparckles/Robyn

Length of output: 1298


🏁 Script executed:

#!/bin/bash
# Get full context of the run method structure, including what spawns/awaits the async block
sed -n '140,200p' src/server.rs | cat -n

Repository: sparckles/Robyn

Length of output: 2774


🏁 Script executed:

#!/bin/bash
# Check what happens in the Python processpool.py around the run_forever and finally block
sed -n '265,285p' robyn/processpool.py | cat -n

Repository: sparckles/Robyn

Length of output: 426


Hard exit(0) from the spawned Actix thread terminates the process before Python shutdown handlers and event loop cleanup can execute.

When std::process::exit(0) is called on line 365 within the spawned thread's async block, it kills the entire process immediately. This prevents the Python event loop at line 370 from ever running, which means:

  • The shutdown_handler check at line 372 is never reached
  • The Python event loop cleanup in robyn/processpool.py line 276 (the finally block with loop.close()) is skipped
  • Any registered Events.SHUTDOWN handlers are never invoked

A graceful recycle must signal shutdown to the Python event loop instead of terminating the process directly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server.rs` around lines 361 - 366, The spawned Actix thread must not call
std::process::exit(0) when max_requests.is_some(); instead signal the Python
event loop to perform a graceful shutdown and return from the async block so the
process can continue cleanup. Replace the hard exit in the block that awaits
server (the code referencing server.await and max_requests) with logic that
notifies the Python shutdown mechanism used by your project (e.g., trigger the
existing shutdown_handler or send the same inter-thread signal the Python side
expects) and allow the spawned task to finish normally; ensure the Python-side
shutdown_handler and its event loop cleanup in robyn/processpool.py still run
and that any Events.SHUTDOWN handlers are invoked.

});
});

Expand Down
Loading