Skip to content

Commit 02effe9

Browse files
authored
Merge pull request #99 from Tuntii/fix-job-queue-hol-blocking-17973034078689993578
⚡ Fix Head-of-Line Blocking in InMemory Job Backend
2 parents dbd1ff5 + 27b1fcb commit 02effe9

2 files changed

Lines changed: 86 additions & 8 deletions

File tree

crates/rustapi-jobs/src/backend/memory.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,26 @@ impl JobBackend for InMemoryBackend {
3434
.lock()
3535
.map_err(|_| JobError::BackendError("Lock poisoned".to_string()))?;
3636

37-
// Simple FIFO for now, ignoring run_at logic complexity for basic in-memory
38-
// In reality we should scan for ready jobs
39-
if let Some(job) = q.front() {
37+
let now = chrono::Utc::now();
38+
let mut index_to_remove = None;
39+
40+
// Scan the queue for the first ready job
41+
for (i, job) in q.iter().enumerate() {
4042
if let Some(run_at) = job.run_at {
41-
if run_at > chrono::Utc::now() {
42-
return Ok(None);
43+
if run_at > now {
44+
continue;
4345
}
4446
}
45-
} else {
46-
return Ok(None);
47+
// Found a ready job (no run_at, or run_at <= now)
48+
index_to_remove = Some(i);
49+
break;
4750
}
4851

49-
Ok(q.pop_front())
52+
if let Some(i) = index_to_remove {
53+
Ok(q.remove(i))
54+
} else {
55+
Ok(None)
56+
}
5057
}
5158

5259
async fn complete(&self, _job_id: &str) -> Result<()> {
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
use async_trait::async_trait;
2+
use rustapi_jobs::{EnqueueOptions, InMemoryBackend, Job, JobContext, JobQueue, Result};
3+
use serde::{Deserialize, Serialize};
4+
use std::sync::{Arc, Mutex};
5+
use std::time::Duration;
6+
7+
#[derive(Debug, Clone, Serialize, Deserialize)]
8+
struct SimpleJobData {
9+
id: i32,
10+
}
11+
12+
#[derive(Clone)]
13+
struct SimpleJob {
14+
processed_ids: Arc<Mutex<Vec<i32>>>,
15+
}
16+
17+
#[async_trait]
18+
impl Job for SimpleJob {
19+
const NAME: &'static str = "simple_job";
20+
type Data = SimpleJobData;
21+
22+
async fn execute(&self, _ctx: JobContext, data: Self::Data) -> Result<()> {
23+
self.processed_ids.lock().unwrap().push(data.id);
24+
Ok(())
25+
}
26+
}
27+
28+
#[tokio::test]
29+
async fn test_head_of_line_blocking() {
30+
let backend = InMemoryBackend::new();
31+
let queue = JobQueue::new(backend);
32+
33+
let processed_ids = Arc::new(Mutex::new(Vec::new()));
34+
let job = SimpleJob {
35+
processed_ids: processed_ids.clone(),
36+
};
37+
38+
queue.register_job(job).await;
39+
40+
// 1. Enqueue a job scheduled far in the future (Job 1)
41+
let opts_future = EnqueueOptions::new().delay(Duration::from_secs(3600));
42+
queue
43+
.enqueue_opts::<SimpleJob>(SimpleJobData { id: 1 }, opts_future)
44+
.await
45+
.unwrap();
46+
47+
// 2. Enqueue a job scheduled now (Job 2)
48+
queue
49+
.enqueue::<SimpleJob>(SimpleJobData { id: 2 })
50+
.await
51+
.unwrap();
52+
53+
// 3. Attempt to process one job.
54+
// Job 2 should be picked up because Job 1 is not ready.
55+
let result = queue.process_one().await.unwrap();
56+
57+
// Verify
58+
if result {
59+
// If it processed something, it MUST be Job 2
60+
let ids = processed_ids.lock().unwrap().clone();
61+
assert_eq!(ids.len(), 1);
62+
assert_eq!(
63+
ids[0], 2,
64+
"Should have processed Job 2, but processed {:?}",
65+
ids
66+
);
67+
} else {
68+
// If it returned false, it means it was blocked by Job 1
69+
panic!("Head-of-line blocking detected! Failed to process Job 2 because Job 1 is blocking the queue.");
70+
}
71+
}

0 commit comments

Comments
 (0)