Skip to content

Commit bd71e7d

Browse files
author
Simon Morley
committed
fix: switch to std::sync::Mutex for BPF scanner/collector
tokio::sync::Mutex deadlocks when used inside spawn_blocking + Handle::block_on — the async waker can't unpark blocked threads. Since all critical sections are synchronous (std::thread::sleep in SYN sender, raw socket I/O), std::sync::Mutex is correct here. [skip tests]
1 parent 9b28de2 commit bd71e7d

2 files changed

Lines changed: 13 additions & 13 deletions

File tree

src/engine.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::sync::Arc;
99
use std::time::{Duration, Instant};
1010

1111
use chrono::Utc;
12-
use tokio::sync::Mutex;
12+
use std::sync::Mutex;
1313
use uuid::Uuid;
1414

1515
use crate::scanner::collector::DiscoveryCollector;
@@ -286,7 +286,7 @@ impl ScanEngine {
286286
// Catches detachment between engine creation and scan execution —
287287
// the critical gap for long-lived processes (e.g. limpet-timing).
288288
{
289-
let bpf_guard = self.collector.lock().await;
289+
let bpf_guard = self.collector.lock().unwrap();
290290
if let Err(e) = bpf_guard.verify_attached() {
291291
return ScanResult {
292292
request_id: request.request_id,
@@ -314,7 +314,7 @@ impl ScanEngine {
314314

315315
let mut all_probes = Vec::new();
316316
for batch in ports.chunks(batch_size) {
317-
let mut scanner_guard = self.scanner.lock().await;
317+
let mut scanner_guard = self.scanner.lock().unwrap();
318318
scanner_guard.set_profile(scan_profile.clone());
319319
let result = match scanner_guard.send_syn_batch(target_ip, batch) {
320320
Ok(r) => r,
@@ -354,7 +354,7 @@ impl ScanEngine {
354354
let remaining = deadline - now;
355355
tokio::time::sleep(poll_interval.min(remaining)).await;
356356

357-
let bpf_guard = self.collector.lock().await;
357+
let bpf_guard = self.collector.lock().unwrap();
358358
let responded = all_probes
359359
.iter()
360360
.filter(|probe| {
@@ -373,7 +373,7 @@ impl ScanEngine {
373373

374374
// Collect all results in one pass
375375
let collector = DiscoveryCollector::new(timeout);
376-
let bpf_guard = self.collector.lock().await;
376+
let bpf_guard = self.collector.lock().unwrap();
377377
let discovery = collector.collect(&all_probes, &*bpf_guard, target_ip_u32);
378378
drop(bpf_guard);
379379

src/timing/userspace.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
//! No userspace fallback — requires Linux with CAP_BPF + CAP_NET_ADMIN.
66
77
use std::sync::Arc;
8-
use tokio::sync::Mutex;
8+
use std::sync::Mutex;
99

1010
use crate::{TimingRequest, TimingResult};
1111
use std::net::{SocketAddr, ToSocketAddrs};
@@ -47,15 +47,15 @@ async fn poll_bpf_timing_entry(
4747
// Drain AF_XDP ring to prevent overflow and allow XDP to keep redirecting.
4848
// We don't care which frames arrive here — the BPF map is the source of truth.
4949
{
50-
let mut scanner_ref = scanner.lock().await;
50+
let mut scanner_ref = scanner.lock().unwrap();
5151
scanner_ref.poll_rx(0);
5252
}
5353

5454
// Check if this probe's map entry has a response.
5555
// read_timing_v2 returns None when: (a) no entry exists, or (b) only SYN flag set
5656
// (TC recorded the SYN but no response yet). Returns Some once response arrives.
5757
{
58-
let bpf_ref = bpf.lock().await;
58+
let bpf_ref = bpf.lock().unwrap();
5959
if let Some(entry) = bpf_ref.read_timing_v2(dst_ip, dst_port, src_port) {
6060
return Some(entry);
6161
}
@@ -91,7 +91,7 @@ pub async fn collect_timing_samples_raw(
9191
) -> TimingResult {
9292
// Pre-timing BPF health check: verify XDP + TC are still attached.
9393
{
94-
let bpf_ref = bpf.lock().await;
94+
let bpf_ref = bpf.lock().unwrap();
9595
if let Err(e) = bpf_ref.verify_attached() {
9696
return TimingResult::error(
9797
request,
@@ -128,7 +128,7 @@ pub async fn collect_timing_samples_raw(
128128
for _i in 0..sample_count {
129129
// Send a single raw SYN probe via AF_XDP TX
130130
let probe = {
131-
let mut scanner_ref = scanner.lock().await;
131+
let mut scanner_ref = scanner.lock().unwrap();
132132
match scanner_ref.send_single_syn(target_ipv4, dst_port) {
133133
Ok(p) => p,
134134
Err(e) => {
@@ -192,13 +192,13 @@ pub async fn collect_timing_samples_raw(
192192

193193
// Clean up BPF map entry
194194
{
195-
let bpf_ref = bpf.lock().await;
195+
let bpf_ref = bpf.lock().unwrap();
196196
bpf_ref.delete_entry(dst_ip_u32, dst_port, probe.src_port);
197197
}
198198

199199
// Inter-probe jitter delay from stealth profile
200200
let delay_ms = {
201-
let scanner_ref = scanner.lock().await;
201+
let scanner_ref = scanner.lock().unwrap();
202202
scanner_ref.profile().jittered_delay_ms()
203203
};
204204
if delay_ms > 0 {
@@ -215,7 +215,7 @@ pub async fn collect_timing_samples_raw(
215215

216216
let stats = calculate_stats(&samples);
217217
let precision_class = {
218-
let bpf_ref = bpf.lock().await;
218+
let bpf_ref = bpf.lock().unwrap();
219219
let base = bpf_ref.backend().precision_class().to_string();
220220
if skipped_count > 0 {
221221
format!("{base}_degraded")

0 commit comments

Comments
 (0)