Skip to content

Commit 88c796a

Browse files
committed
fix(engine): handle semaphore and init failures gracefully in async_utils
Fixes potential panics in async utilities: - ConcurrencyLimiter::execute now returns Result<T> instead of T - AsyncOnce::get_or_init now returns Result<T> with proper error handling - concurrent() function now returns Result<Vec<T>> - ConcurrencyLimiter::acquire in ratelimit.rs now returns Result All semaphore acquire calls now use map_err instead of unwrap() to provide descriptive error messages when semaphores are closed. Fixes #5205, #5201
1 parent 2c40753 commit 88c796a

2 files changed

Lines changed: 48 additions & 20 deletions

File tree

src/cortex-engine/src/async_utils.rs

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,17 @@ impl ConcurrencyLimiter {
147147
}
148148

149149
/// Execute with limit.
150-
pub async fn execute<F, Fut, T>(&self, f: F) -> T
150+
///
151+
/// Returns an error if the semaphore is closed.
152+
pub async fn execute<F, Fut, T>(&self, f: F) -> Result<T>
151153
where
152154
F: FnOnce() -> Fut,
153155
Fut: Future<Output = T>,
154156
{
155-
let _permit = self.semaphore.acquire().await.unwrap();
156-
f().await
157+
let _permit = self.semaphore.acquire().await.map_err(|_| {
158+
CortexError::Internal("concurrency limiter semaphore closed unexpectedly".into())
159+
})?;
160+
Ok(f().await)
157161
}
158162

159163
/// Get available permits.
@@ -178,26 +182,36 @@ impl<T: Clone> AsyncOnce<T> {
178182
}
179183

180184
/// Get or initialize.
181-
pub async fn get_or_init<F, Fut>(&self, init: F) -> T
185+
///
186+
/// Returns an error if the internal state is inconsistent (value missing after init flag set).
187+
pub async fn get_or_init<F, Fut>(&self, init: F) -> Result<T>
182188
where
183189
F: FnOnce() -> Fut,
184190
Fut: Future<Output = T>,
185191
{
186192
// Fast path
187193
if *self.initialized.read().await {
188-
return self.value.read().await.clone().unwrap();
194+
return self.value.read().await.clone().ok_or_else(|| {
195+
CortexError::Internal(
196+
"AsyncOnce: value missing despite initialized flag being set".into(),
197+
)
198+
});
189199
}
190200

191201
// Slow path
192202
let mut initialized = self.initialized.write().await;
193203
if *initialized {
194-
return self.value.read().await.clone().unwrap();
204+
return self.value.read().await.clone().ok_or_else(|| {
205+
CortexError::Internal(
206+
"AsyncOnce: value missing despite initialized flag being set".into(),
207+
)
208+
});
195209
}
196210

197211
let value = init().await;
198212
*self.value.write().await = Some(value.clone());
199213
*initialized = true;
200-
value
214+
Ok(value)
201215
}
202216

203217
/// Check if initialized.
@@ -399,7 +413,9 @@ impl<K: std::hash::Hash + Eq + Clone, V: Clone> AsyncCache<K, V> {
399413
}
400414

401415
/// Run futures concurrently with limit.
402-
pub async fn concurrent<F, Fut, T>(items: impl IntoIterator<Item = F>, limit: usize) -> Vec<T>
416+
///
417+
/// Returns an error if the semaphore is closed unexpectedly.
418+
pub async fn concurrent<F, Fut, T>(items: impl IntoIterator<Item = F>, limit: usize) -> Result<Vec<T>>
403419
where
404420
F: FnOnce() -> Fut,
405421
Fut: Future<Output = T>,
@@ -410,12 +426,17 @@ where
410426
for item in items {
411427
let sem = semaphore.clone();
412428
handles.push(async move {
413-
let _permit = sem.acquire().await.unwrap();
414-
item().await
429+
let _permit = sem.acquire().await.map_err(|_| {
430+
CortexError::Internal("concurrent execution semaphore closed unexpectedly".into())
431+
})?;
432+
Ok(item().await)
415433
});
416434
}
417435

418-
futures::future::join_all(handles).await
436+
futures::future::join_all(handles)
437+
.await
438+
.into_iter()
439+
.collect()
419440
}
420441

421442
/// Select the first future to complete.
@@ -503,16 +524,19 @@ mod tests {
503524
}));
504525
}
505526

506-
futures::future::join_all(handles).await;
527+
let results: Vec<_> = futures::future::join_all(handles).await;
528+
for result in results {
529+
assert!(result.is_ok());
530+
}
507531
assert_eq!(*counter.lock().await, 5);
508532
}
509533

510534
#[tokio::test]
511535
async fn test_async_once() {
512536
let once: AsyncOnce<i32> = AsyncOnce::new();
513537

514-
let v1 = once.get_or_init(|| async { 42 }).await;
515-
let v2 = once.get_or_init(|| async { 100 }).await;
538+
let v1 = once.get_or_init(|| async { 42 }).await.unwrap();
539+
let v2 = once.get_or_init(|| async { 100 }).await.unwrap();
516540

517541
assert_eq!(v1, 42);
518542
assert_eq!(v2, 42);
@@ -560,7 +584,7 @@ mod tests {
560584
Box::new(|| Box::pin(async { 2 })),
561585
Box::new(|| Box::pin(async { 3 })),
562586
];
563-
let results = concurrent(items, 2).await;
587+
let results = concurrent(items, 2).await.unwrap();
564588

565589
assert_eq!(results.len(), 3);
566590
}

src/cortex-engine/src/ratelimit.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -341,9 +341,13 @@ impl ConcurrencyLimiter {
341341
}
342342

343343
/// Acquire a permit.
344-
pub async fn acquire(&self) -> ConcurrencyPermit {
345-
let permit = self.semaphore.clone().acquire_owned().await.unwrap();
346-
ConcurrencyPermit { _permit: permit }
344+
///
345+
/// Returns an error if the semaphore is closed.
346+
pub async fn acquire(&self) -> Result<ConcurrencyPermit> {
347+
let permit = self.semaphore.clone().acquire_owned().await.map_err(|_| {
348+
CortexError::Internal("concurrency limiter semaphore closed unexpectedly".into())
349+
})?;
350+
Ok(ConcurrencyPermit { _permit: permit })
347351
}
348352

349353
/// Try to acquire a permit.
@@ -595,8 +599,8 @@ mod tests {
595599
async fn test_concurrency_limiter() {
596600
let limiter = ConcurrencyLimiter::new(2);
597601

598-
let _p1 = limiter.acquire().await;
599-
let _p2 = limiter.acquire().await;
602+
let _p1 = limiter.acquire().await.unwrap();
603+
let _p2 = limiter.acquire().await.unwrap();
600604

601605
// Third should fail immediately
602606
assert!(limiter.try_acquire().is_none());

0 commit comments

Comments
 (0)