Skip to content

Commit cb3b86c

Browse files
Improve channel capacity handling.
1 parent a771bd8 commit cb3b86c

File tree

1 file changed

+83
-10
lines changed

1 file changed

+83
-10
lines changed

ext/hyper_ruby/src/lib.rs

Lines changed: 83 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ struct ServerConfig {
7878
tokio_threads: Option<usize>,
7979
debug: bool,
8080
recv_timeout: u64,
81+
channel_capacity: usize,
82+
send_timeout: u64,
8183
}
8284

8385
impl ServerConfig {
@@ -87,6 +89,8 @@ impl ServerConfig {
8789
tokio_threads: None,
8890
debug: false,
8991
recv_timeout: 30000, // Default 30 second timeout
92+
channel_capacity: 5000, // Default capacity for worker channel
93+
send_timeout: 1000, // Default 1 second timeout for send backpressure
9094
}
9195
}
9296
}
@@ -109,10 +113,11 @@ struct Server {
109113

110114
impl Server {
111115
pub fn new() -> Self {
112-
let (work_tx, work_rx) = crossbeam_channel::bounded(1000);
116+
let config = ServerConfig::new();
117+
let (work_tx, work_rx) = crossbeam_channel::bounded(config.channel_capacity);
113118
Self {
114119
server_handle: Arc::new(Mutex::new(None)),
115-
config: RefCell::new(ServerConfig::new()),
120+
config: RefCell::new(config),
116121
work_rx: RefCell::new(Some(work_rx)),
117122
work_tx: RefCell::new(Some(Arc::new(work_tx))),
118123
runtime: RefCell::new(None),
@@ -138,6 +143,14 @@ impl Server {
138143
server_config.recv_timeout = u64::try_convert(recv_timeout)?;
139144
}
140145

146+
if let Some(channel_capacity) = config.get(magnus::Symbol::new("channel_capacity")) {
147+
server_config.channel_capacity = usize::try_convert(channel_capacity)?;
148+
}
149+
150+
if let Some(send_timeout) = config.get(magnus::Symbol::new("send_timeout")) {
151+
server_config.send_timeout = u64::try_convert(send_timeout)?;
152+
}
153+
141154
// Initialize logging if not already initialized
142155
LOGGER_INIT.call_once(|| {
143156
let mut builder = env_logger::Builder::from_env(env_logger::Env::default());
@@ -342,7 +355,7 @@ impl Server {
342355
let work_tx = work_tx.clone();
343356
let conn = builder.serve_connection(io, service_fn(move |req: HyperRequest<Incoming>| {
344357
debug!("Service handling request");
345-
handle_request(req, work_tx.clone(), config.recv_timeout)
358+
handle_request(req, work_tx.clone(), config.recv_timeout, config.send_timeout)
346359
}));
347360
let fut = graceful_shutdown.watch(conn.into_owned());
348361
tokio::task::spawn(async move {
@@ -412,6 +425,7 @@ async fn handle_request(
412425
req: HyperRequest<Incoming>,
413426
work_tx: Arc<crossbeam_channel::Sender<RequestWithCompletion>>,
414427
recv_timeout: u64,
428+
send_timeout: u64,
415429
) -> Result<HyperResponse<BodyWithTrailers>, Error> {
416430
debug!("Received request: {:?}", req);
417431
debug!("HTTP version: {:?}", req.version());
@@ -448,13 +462,62 @@ async fn handle_request(
448462
response_tx,
449463
};
450464

451-
if work_tx.send(with_completion).is_err() {
452-
warn!("Failed to send request to worker");
453-
return Ok(if is_grpc {
454-
grpc::create_grpc_error_response(500, 13, "Failed to process request")
455-
} else {
456-
create_error_response("Failed to process request")
457-
});
465+
// First try non-blocking send
466+
match work_tx.try_send(with_completion) {
467+
Ok(()) => {
468+
debug!("Successfully queued request (fast path)");
469+
},
470+
Err(crossbeam_channel::TrySendError::Full(completion)) => {
471+
// Channel is full, fall back to blocking send with timeout
472+
debug!("Channel full, attempting send with timeout");
473+
474+
// Use send_timeout to implement backpressure
475+
let send_result = tokio::task::spawn_blocking(move || {
476+
work_tx.send_timeout(
477+
completion,
478+
std::time::Duration::from_millis(send_timeout),
479+
)
480+
}).await;
481+
482+
match send_result {
483+
Ok(Ok(())) => {
484+
debug!("Successfully queued request after timeout wait");
485+
},
486+
Ok(Err(crossbeam_channel::SendTimeoutError::Timeout(_))) => {
487+
// Timeout waiting for channel space
488+
warn!("Channel full after timeout - returning 429 Too Many Requests");
489+
return Ok(if is_grpc {
490+
grpc::create_grpc_error_response(429, 8, "Server too busy, try again later") // RESOURCE_EXHAUSTED = 8
491+
} else {
492+
create_too_many_requests_response("Server too busy, try again later")
493+
});
494+
},
495+
Ok(Err(crossbeam_channel::SendTimeoutError::Disconnected(_))) => {
496+
warn!("Worker channel disconnected");
497+
return Ok(if is_grpc {
498+
grpc::create_grpc_error_response(500, 13, "Server shutting down")
499+
} else {
500+
create_error_response("Server shutting down")
501+
});
502+
},
503+
Err(_) => {
504+
warn!("Task to send request failed");
505+
return Ok(if is_grpc {
506+
grpc::create_grpc_error_response(500, 13, "Internal server error")
507+
} else {
508+
create_error_response("Internal server error")
509+
});
510+
}
511+
}
512+
},
513+
Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
514+
warn!("Worker channel disconnected");
515+
return Ok(if is_grpc {
516+
grpc::create_grpc_error_response(500, 13, "Server shutting down")
517+
} else {
518+
create_error_response("Server shutting down")
519+
});
520+
}
458521
}
459522

460523
match response_rx.await {
@@ -493,6 +556,16 @@ fn create_error_response(error_message: &str) -> HyperResponse<BodyWithTrailers>
493556
.unwrap()
494557
}
495558

559+
// Helper function to create too many requests responses
560+
fn create_too_many_requests_response(error_message: &str) -> HyperResponse<BodyWithTrailers> {
561+
let builder = HyperResponse::builder()
562+
.status(StatusCode::TOO_MANY_REQUESTS)
563+
.header("content-type", "text/plain");
564+
565+
builder.body(BodyWithTrailers::new(Bytes::from(error_message.to_string()), None))
566+
.unwrap()
567+
}
568+
496569
#[magnus::init]
497570
fn init(ruby: &Ruby) -> Result<(), MagnusError> {
498571
let module = ruby.define_module("HyperRuby")?;

0 commit comments

Comments
 (0)