Skip to content

Commit 34beb21

Browse files
echobtfactorydroid
andauthored
fix(cortex-collab): track pending reservations in Guards spawn slot limit (#477)
The test_guards_reservation test was failing because reserve_spawn_slot() only checked threads_set.len() for the current count, but that set only contains committed threads. This allowed multiple pending reservations to bypass the limit. Added pending_count atomic counter to track reservations that haven't been committed yet. The check now includes both active threads and pending reservations: current_count + pending >= max_threads. - Added pending_count field to Guards struct - Updated reserve_spawn_slot to check pending count when enforcing limit - Updated commit() to decrement pending_count when reservation becomes active - Updated cancel() and Drop to decrement pending_count when releasing Co-authored-by: Droid Agent <droid@factory.ai>
1 parent e9ea251 commit 34beb21

1 file changed

Lines changed: 14 additions & 3 deletions

File tree

cortex-collab/src/guards.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ pub struct Guards {
2020
/// Total count of spawned threads (includes completed).
2121
total_count: AtomicUsize,
2222

23+
/// Count of pending reservations (not yet committed).
24+
pending_count: AtomicUsize,
25+
2326
/// Maximum allowed concurrent threads.
2427
max_threads: usize,
2528
}
@@ -30,6 +33,7 @@ impl Guards {
3033
Self {
3134
threads_set: Mutex::new(HashSet::new()),
3235
total_count: AtomicUsize::new(0),
36+
pending_count: AtomicUsize::new(0),
3337
max_threads: DEFAULT_MAX_THREADS,
3438
}
3539
}
@@ -39,6 +43,7 @@ impl Guards {
3943
Self {
4044
threads_set: Mutex::new(HashSet::new()),
4145
total_count: AtomicUsize::new(0),
46+
pending_count: AtomicUsize::new(0),
4247
max_threads,
4348
}
4449
}
@@ -48,12 +53,14 @@ impl Guards {
4853
pub async fn reserve_spawn_slot(self: &Arc<Self>) -> Option<SpawnReservation> {
4954
let threads = self.threads_set.lock().await;
5055
let current_count = threads.len();
56+
let pending = self.pending_count.load(Ordering::Acquire);
5157

52-
if current_count >= self.max_threads {
58+
if current_count + pending >= self.max_threads {
5359
return None;
5460
}
5561

56-
// Increment total count
62+
// Increment pending and total count
63+
self.pending_count.fetch_add(1, Ordering::AcqRel);
5764
self.total_count.fetch_add(1, Ordering::AcqRel);
5865

5966
Some(SpawnReservation {
@@ -109,12 +116,15 @@ impl SpawnReservation {
109116
/// This transfers ownership of the slot to the spawned thread.
110117
pub async fn commit(mut self, thread_id: ThreadId) {
111118
self.state.register_spawned_thread(thread_id).await;
119+
// Decrement pending count since it's now committed
120+
self.state.pending_count.fetch_sub(1, Ordering::AcqRel);
112121
self.active = false;
113122
}
114123

115124
/// Cancel the reservation without spawning.
116125
pub fn cancel(mut self) {
117126
if self.active {
127+
self.state.pending_count.fetch_sub(1, Ordering::AcqRel);
118128
self.state.total_count.fetch_sub(1, Ordering::AcqRel);
119129
self.active = false;
120130
}
@@ -124,7 +134,8 @@ impl SpawnReservation {
124134
impl Drop for SpawnReservation {
125135
fn drop(&mut self) {
126136
if self.active {
127-
// If not committed, decrement the count
137+
// If not committed, decrement the counts
138+
self.state.pending_count.fetch_sub(1, Ordering::AcqRel);
128139
self.state.total_count.fetch_sub(1, Ordering::AcqRel);
129140
}
130141
}

0 commit comments

Comments
 (0)