Skip to content

Commit 76fa8e4

Browse files
author
ShellMonster
committed
fix: 修复QPS调度器死锁问题,优化请求获取流程
问题:当就绪队列达到max_prefetch上限时,所有工作线程都卡在 submit()的背压循环中,无法调用get_ready_request()消费队列, 导致死锁。 解决方案: 1. 调整_get_request流程:先尝试从就绪队列获取,再提交新请求 2. submit使用非阻塞模式(block=False),避免背压死锁 3. 新增调度器状态日志,便于监控和调试
1 parent 0970570 commit 76fa8e4

2 files changed

Lines changed: 81 additions & 15 deletions

File tree

feapder/core/parser_control.py

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,28 +72,41 @@ def _get_request(self):
7272
"""
7373
@summary: 获取请求
7474
如果启用了QPS限流,流程为:
75-
1. 从Collector批量取出请求
76-
2. 提交到QPSScheduler进行限流调度
77-
3. 从QPSScheduler的就绪队列获取已获得令牌的请求
75+
1. 优先从就绪队列获取请求(避免死锁)
76+
2. 就绪队列为空时,从Collector取请求提交给调度器
77+
3. 再次尝试从就绪队列获取请求
7878
如果未启用QPS限流,直接从Collector获取
7979
---------
8080
@result: 请求字典,包含request_obj和request_redis,队列为空返回None
8181
"""
8282
if self._qps_scheduler:
8383
# QPS限流模式
84-
# 步骤1:从Collector获取请求并提交给调度器
84+
# 步骤1:优先从就绪队列获取请求(非阻塞,避免死锁)
85+
request = self._qps_scheduler.get_ready_request_nowait()
86+
if request:
87+
return request
88+
89+
# 步骤2:就绪队列为空,从Collector获取请求并提交给调度器
8590
# Collector.get_request() 返回的是 dict: {"request_obj": Request, "request_redis": str}
8691
batch_size = max(1, self._qps_scheduler.max_prefetch // 10)
8792
for _ in range(batch_size):
8893
request_dict = self._collector.get_request()
8994
if request_dict:
90-
# 将请求字典包装后提交给调度器
91-
self._qps_scheduler.submit(request_dict)
95+
# 非阻塞提交,避免背压导致死锁
96+
self._qps_scheduler.submit(request_dict, block=False)
9297
else:
9398
break
9499

95-
# 步骤2:从调度器获取已就绪的请求(与原始模式timeout保持一致)
96-
return self._qps_scheduler.get_ready_request(timeout=1.0)
100+
# 步骤3:从调度器获取已就绪的请求(带超时)
101+
request = self._qps_scheduler.get_ready_request(timeout=1.0)
102+
103+
# 超时返回None时输出调试日志
104+
if request is None and not self._qps_scheduler.is_empty():
105+
log.debug(
106+
f"QPS: 等待就绪请求超时,调度器中仍有 {self._qps_scheduler.pending_count()} 个请求在等待令牌"
107+
)
108+
109+
return request
97110
else:
98111
# 原始模式:直接从Collector获取
99112
return self._collector.get_request()
@@ -526,27 +539,41 @@ def _get_request(self):
526539
"""
527540
@summary: 获取请求
528541
如果启用了QPS限流,流程为:
529-
1. 从MemoryDB批量取出请求
530-
2. 提交到QPSScheduler进行限流调度
531-
3. 从QPSScheduler的就绪队列获取已获得令牌的请求
542+
1. 优先从就绪队列获取请求(避免死锁)
543+
2. 就绪队列为空时,从MemoryDB取请求提交给调度器
544+
3. 再次尝试从就绪队列获取请求
532545
如果未启用QPS限流,直接从MemoryDB获取
533546
---------
534547
@result: 请求对象Request,队列为空返回None
535548
"""
536549
if self._qps_scheduler:
537550
# QPS限流模式
538-
# 步骤1:从MemoryDB批量获取请求并提交给调度器
551+
# 步骤1:优先从就绪队列获取请求(非阻塞,避免死锁)
552+
request = self._qps_scheduler.get_ready_request_nowait()
553+
if request:
554+
return request
555+
556+
# 步骤2:就绪队列为空,从MemoryDB获取请求并提交给调度器
539557
# 批量大小与max_prefetch相关,避免过多请求堆积在调度器中
540558
batch_size = max(1, self._qps_scheduler.max_prefetch // 10)
541559
for _ in range(batch_size):
542560
pending_request = self._memory_db.get_nowait()
543561
if pending_request:
544-
self._qps_scheduler.submit(pending_request)
562+
# 非阻塞提交,避免背压导致死锁
563+
self._qps_scheduler.submit(pending_request, block=False)
545564
else:
546565
break
547566

548-
# 步骤2:从调度器获取已就绪的请求(与原始模式timeout保持一致)
549-
return self._qps_scheduler.get_ready_request(timeout=1.0)
567+
# 步骤3:从调度器获取已就绪的请求(带超时)
568+
request = self._qps_scheduler.get_ready_request(timeout=1.0)
569+
570+
# 超时返回None时输出调试日志
571+
if request is None and not self._qps_scheduler.is_empty():
572+
log.debug(
573+
f"QPS: 等待就绪请求超时,调度器中仍有 {self._qps_scheduler.pending_count()} 个请求在等待令牌"
574+
)
575+
576+
return request
550577
else:
551578
# 原始模式:直接从MemoryDB获取
552579
return self._memory_db.get()

feapder/core/schedulers/qps_scheduler.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ def __init__(
9595
'ready': 0, # 已就绪的请求数
9696
}
9797

98+
# 状态日志控制
99+
self._last_status_log_time = 0
100+
self._status_log_interval = 10 # 每10秒输出一次状态
101+
98102
def start(self) -> None:
99103
"""
100104
@summary: 启动调度器,启动后台调度线程开始处理请求的QPS限流
@@ -150,6 +154,7 @@ def submit(self, request: Any, block: bool = True, timeout: float = None) -> boo
150154
# 检查是否需要背压
151155
if self.max_prefetch > 0:
152156
start_time = time.time()
157+
logged_backpressure = False
153158
while True:
154159
with self._count_lock:
155160
if self._domain_pending_count[domain] < self.max_prefetch:
@@ -162,6 +167,13 @@ def submit(self, request: Any, block: bool = True, timeout: float = None) -> boo
162167
if timeout is not None and (time.time() - start_time) >= timeout:
163168
return False
164169

170+
# 背压阻塞日志(只输出一次,避免刷屏)
171+
if not logged_backpressure:
172+
log.debug(
173+
f"QPS背压: 域名 {domain} 待处理数达到上限 {self.max_prefetch},等待释放..."
174+
)
175+
logged_backpressure = True
176+
165177
# 等待一小段时间后重试
166178
time.sleep(0.01)
167179
else:
@@ -223,6 +235,9 @@ def _scheduler_loop(self) -> None:
223235
# 处理延迟堆(将到期的请求移入就绪队列)
224236
moved = self._process_delay_heap()
225237

238+
# 定期输出状态日志(DEBUG模式下)
239+
self._log_status_if_needed()
240+
226241
# 如果没有任何工作,短暂休眠避免CPU空转
227242
if not processed and not moved:
228243
time.sleep(0.001) # 1ms
@@ -290,6 +305,30 @@ def _process_delay_heap(self) -> int:
290305

291306
return count
292307

308+
def _log_status_if_needed(self) -> None:
309+
"""
310+
@summary: 定期输出调度器状态日志(DEBUG模式下)
311+
---------
312+
@result:
313+
"""
314+
now = time.time()
315+
if now - self._last_status_log_time >= self._status_log_interval:
316+
self._last_status_log_time = now
317+
318+
with self._heap_lock:
319+
delay_heap_size = len(self._delay_heap)
320+
321+
submit_queue_size = self._submit_queue.qsize()
322+
ready_queue_size = self._ready_queue.qsize()
323+
324+
# 只有当有请求在处理时才输出日志
325+
if delay_heap_size > 0 or submit_queue_size > 0 or ready_queue_size > 0:
326+
log.debug(
327+
f"QPS调度器状态: 提交队列={submit_queue_size}, "
328+
f"延迟堆={delay_heap_size}, 就绪队列={ready_queue_size}, "
329+
f"统计={self._stats}"
330+
)
331+
293332
def is_empty(self) -> bool:
294333
"""
295334
@summary: 检查调度器是否为空

0 commit comments

Comments
 (0)