forked from AI-FanGe/OpenAIglasses_for_Navigation
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworkflow_blindpath.py
More file actions
3262 lines (2758 loc) · 146 KB
/
workflow_blindpath.py
File metadata and controls
3262 lines (2758 loc) · 146 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# workflow_blindpath.py
# -*- coding: utf-8 -*-
"""
盲道导航工作流 - 纯净版
移除了所有 Redis、Celery 依赖,可以直接集成到任何 Python 应用中
"""
import os
import time
import cv2
import numpy as np
import logging
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
from collections import deque
import torch # 添加这行
from obstacle_detector_client import ObstacleDetectorClient
from audio_player import play_voice_text # 新增
from crosswalk_awareness import CrosswalkAwarenessMonitor, split_combined_voice # 斑马线感知
# 尝试导入 Pillow,用于中文显示
try:
from PIL import Image, ImageDraw, ImageFont
PIL_AVAILABLE = True
except ImportError:
PIL_AVAILABLE = False
Image, ImageDraw, ImageFont = None, None, None
logger = logging.getLogger(__name__)
# ========== Navigation state constants ==========
STATE_ONBOARDING = "ONBOARDING"
STATE_NAVIGATING = "NAVIGATING"
STATE_MANEUVERING_TURN = "MANEUVERING_TURN"
STATE_AVOIDING_OBSTACLE = "AVOIDING_OBSTACLE"
STATE_LOCKING_ON = "LOCKING_ON"

# Sub-steps of the ONBOARDING state
ONBOARDING_STEP_ROTATION = "ROTATION"
ONBOARDING_STEP_TRANSLATION = "TRANSLATION"

# Sub-steps of a turn maneuver
MANEUVER_STEP_1_ISSUE_COMMAND = "ISSUE_COMMAND"
MANEUVER_STEP_2_WAIT_FOR_SHIFT = "WAIT_FOR_SHIFT"
MANEUVER_STEP_3_ALIGN_ON_NEW_PATH = "ALIGN_ON_NEW_PATH"

# Visualisation colours (BGR order, as used by OpenCV)
VIS_COLORS = {
    "blind_path": (0, 255, 0),  # green
    "obstacle": (0, 0, 255),  # red
    "crosswalk": (0, 165, 255),  # orange
    "centerline": (0, 255, 255),  # yellow
    "target_point": (255, 0, 0),  # blue
    "turn_point": (128, 0, 128),  # purple
    "pulse_effect": (100, 100, 255)  # light red
}

# Obstacle class name -> Chinese name (the values are runtime text; do not translate them)
_OBSTACLE_NAME_CN = {
    'person': '人',
    'bicycle': '自行车',
    'car': '车',
    'motorcycle': '摩托车',
    'bus': '公交车',
    'truck': '卡车',
    'animal': '动物',
    'scooter': '电瓶车',
    'stroller': '婴儿车',
    'dog': '狗',
}

# Class names regarded as dynamic (stored as a set).
# NOTE(review): 'scooter' and 'stroller' are in the name map above but not here — confirm intended.
DYNAMIC_CLASS_NAMES = {'person', 'bicycle', 'car', 'motorcycle', 'bus', 'truck', 'animal', 'dog'}
@dataclass
class ProcessingResult:
    """Result of processing a single frame.

    ``state_info`` defaults to ``None`` in the signature and is replaced by a
    fresh empty dict in ``__post_init__``, so instances never share one
    mutable default and callers can always treat it as a dict.
    """
    guidance_text: str  # voice guidance text
    visualizations: List[Dict[str, Any]]  # list of visualisation elements
    annotated_image: Optional[np.ndarray] = None  # image with annotations drawn on it
    state_info: Optional[Dict[str, Any]] = None  # state information; never None after init

    def __post_init__(self):
        if self.state_info is None:
            self.state_info = {}
class BlindPathNavigator:
"""盲道导航处理器 - 无外部依赖版本"""
def __init__(self, yolo_model=None, obstacle_detector=None):
    """
    Initialise the navigator.

    Tunables named AIGLASS_* are read from environment variables once, here.
    Frame-interval settings are clamped to at least 1 because they are used
    as modulo divisors when frames are processed.

    :param yolo_model: YOLO segmentation model (optional)
    :param obstacle_detector: obstacle detector (optional)
    """
    self.yolo_model = yolo_model
    self.obstacle_detector = obstacle_detector

    # State machine variables
    self.current_state = STATE_ONBOARDING
    self.onboarding_step = ONBOARDING_STEP_ROTATION
    self.maneuver_step = MANEUVER_STEP_1_ISSUE_COMMAND
    self.maneuver_target_info = None

    # Optical-flow (Lucas-Kanade) tracking parameters
    self.lk_params = dict(
        winSize=(15, 15),
        maxLevel=2,
        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
    )

    # Feature detection parameters
    self.feature_params = dict(
        maxCorners=100,
        qualityLevel=0.05,
        minDistance=10,
        blockSize=7,
        useHarrisDetector=False,
        k=0.04
    )

    # Optical-flow tracking point caches
    self.flow_points = {}  # {mask_type: points}
    self.flow_grace = {}  # {mask_type: grace_count}
    self.FLOW_GRACE_MAX = 3  # lowered from 8 to 3 frames so stale flow results clear quickly

    # Centerline smoothing cache
    self.centerline_history = []  # historical centerline data
    self.centerline_history_max = 5  # keep the last 5 frames for smoothing

    # Polynomial coefficient smoothing cache
    self.poly_coeffs_history = []  # historical polynomial coefficients
    self.poly_coeffs_history_max = 8  # keep the last 8 frames of coefficients

    # Turn detection tracker
    self.turn_detection_tracker = {
        'direction': None,
        'consecutive_hits': 0,
        'last_seen_frame': 0,
        'corner_info': None
    }

    # Turn cooldown (in frames)
    self.turn_cooldown_frames = 0
    self.TURN_COOLDOWN_DURATION = 50

    # Obstacle avoidance
    self.avoidance_plan = None
    self.avoidance_step_index = 0
    self.lock_on_data = None

    # Crosswalk tracking
    self.crosswalk_tracker = {
        'stage': 'not_detected',
        'consecutive_frames': 0,
        'last_area_ratio': 0.0,
        'last_bottom_y_ratio': 0.0,
        'last_center_x_ratio': 0.5,
        'position_announced': False,
        'alignment_status': 'not_aligned',
        'last_seen_frame': 0,
        'last_angle': 0.0
    }

    # Frame counter
    self.frame_counter = 0

    # "Keep straight" prompt configuration (environment-overridable)
    self.guide_interval = float(os.getenv("AIGLASS_STRAIGHT_INTERVAL", "4.0"))  # announcement interval (seconds)
    self.last_guide_time = 0.0
    self.straight_continuous_mode = os.getenv("AIGLASS_STRAIGHT_CONTINUOUS", "1") == "1"  # continuous announcement mode
    self.straight_repeat_limit = int(os.getenv("AIGLASS_STRAIGHT_LIMIT", "2"))  # max repeats in limited mode
    self.straight_repeat_count = 0

    # Repeated announcement of direction commands
    self.direction_interval = float(os.getenv("AIGLASS_DIRECTION_INTERVAL", "3.0"))  # direction command interval (seconds)
    self.last_direction_time = 0.0
    self.last_direction_message = ""

    # Log the effective configuration
    logger.info(f"[BlindPath] 直行播报配置: 间隔={self.guide_interval}秒, "
                f"持续模式={self.straight_continuous_mode}, "
                f"限制次数={self.straight_repeat_limit}")
    logger.info(f"[BlindPath] 方向播报配置: 间隔={self.direction_interval}秒")

    # Per-frame caches
    self.prev_gray = None
    self.prev_blind_path_mask = None
    self.prev_crosswalk_mask = None
    self.prev_obstacle_cache = []
    self.last_guidance_message = ""
    self.last_detected_obstacles = []
    self.last_obstacle_detection_frame = 0
    self.last_any_speech_time = 0

    # Crosswalk "ready" announcement flags
    self.crosswalk_ready_announced = False
    self.crosswalk_ready_time = 0

    # Voice messages waiting to be arbitrated by process_frame.
    # Both are initialised here so later code does not depend on hasattr().
    self.pending_obstacle_voice = None
    self.pending_crosswalk_voice = None

    # Traffic light detection
    self.traffic_light_detector = None
    self.init_traffic_light_detector()
    self.traffic_light_history = deque(maxlen=8)  # used for majority voting
    self.last_traffic_light_state = "unknown"
    self.green_light_announced = False

    # Per-class confidence thresholds
    self.CLASS_CONF_THRESHOLDS = {
        1: 0.20,  # blind_path
        0: 0.30  # crosswalk
    }

    # Navigation thresholds
    self.ONBOARDING_ALIGN_THRESHOLD_RATIO = 0.1
    self.VP_FIT_ERROR_THRESHOLD = 8.0
    self.ONBOARDING_ORIENTATION_THRESHOLD_RAD = np.deg2rad(10)
    self.ONBOARDING_CENTER_OFFSET_THRESHOLD_RATIO = 0.15
    self.NAV_ORIENTATION_THRESHOLD_RAD = np.deg2rad(10)
    self.NAV_CENTER_OFFSET_THRESHOLD_RATIO = 0.15
    self.CURVATURE_PROXY_THRESHOLD = 5e-5

    # Crosswalk switching thresholds
    self.CROSSWALK_SWITCH_AREA_RATIO = 0.22
    self.CROSSWALK_SWITCH_BOTTOM_RATIO = 0.9
    self.CROSSWALK_SWITCH_CONSECUTIVE_FRAMES = 10

    # Obstacle detection tuning (environment-overridable).
    # The interval is a modulo divisor in process_frame, hence the clamp to >= 1.
    # NOTE(review): with the defaults the cache (10 frames) expires before the
    # next detection (every 15 frames), so obstacles disappear for the frames
    # in between — confirm this gap is intended.
    self.OBSTACLE_DETECTION_INTERVAL = max(1, int(os.getenv("AIGLASS_OBS_INTERVAL", "15")))  # default: detect every 15 frames
    self.OBSTACLE_CACHE_DURATION_FRAMES = int(os.getenv("AIGLASS_OBS_CACHE_FRAMES", "10"))  # default: cache for 10 frames

    # Obstacle announcement management
    self.last_obstacle_speech = ""
    self.last_obstacle_speech_time = 0
    self.obstacle_speech_cooldown = 5.0  # cooldown in seconds for obstacle announcements

    # Mask stabilisation parameters (optical-flow extrapolation is disabled)
    self.MASK_STAB_MIN_AREA = int(os.getenv("AIGLASS_MASK_MIN_AREA", "1500"))
    self.MASK_STAB_KERNEL = int(os.getenv("AIGLASS_MASK_MORPH", "3"))
    self.MASK_MISS_TTL = 0  # 0 disables optical-flow extrapolation: fully real-time
    self.blind_miss_ttl = 0
    self.cross_miss_ttl = 0

    # Optical-flow tracking
    self.flow_iou_threshold = 0.3  # flow points are re-initialised when IoU drops below this

    # Blind-path YOLO detection interval (environment-overridable, clamped to >= 1)
    self.BLINDPATH_DETECTION_INTERVAL = max(1, int(os.getenv("AIGLASS_BLINDPATH_INTERVAL", "8")))  # default: every 8 frames
    self.last_blindpath_detection_frame = 0
    self.last_blindpath_mask = None
    self.last_crosswalk_mask = None

    # Crosswalk awareness monitor
    self.crosswalk_monitor = CrosswalkAwarenessMonitor()
    logger.info("[BlindPath] 斑马线感知监控器已初始化")
    logger.info(f"[BlindPath] 盲道检测间隔: 每{self.BLINDPATH_DETECTION_INTERVAL}帧")
def init_traffic_light_detector(self):
"""初始化红绿灯检测器"""
try:
# 首先尝试使用 YOLO 模型检测红绿灯
self.traffic_light_yolo = None
# 如果你有专门的红绿灯模型,在这里加载
# self.traffic_light_yolo = YOLO('path/to/traffic_light_model.pt')
except Exception as e:
logger.info(f"未加载红绿灯YOLO模型: {e}")
def detect_traffic_light(self, image: np.ndarray) -> str:
"""检测红绿灯状态
返回: 'red', 'green', 'yellow', 'unknown'
"""
# 模拟模式(用于测试)
if os.getenv("AIGLASS_SIMULATE_TRAFFIC_LIGHT", "0") == "1":
# 根据帧数模拟红绿灯变化
cycle = (self.frame_counter // 100) % 3
if cycle == 0:
return "red"
elif cycle == 1:
return "yellow"
else:
return "green"
# 如果有 YOLO 模型,优先使用
if self.traffic_light_yolo:
try:
results = self.traffic_light_yolo.predict(image, verbose=False, conf=0.3)
# TODO: 解析 YOLO 结果,判断红绿灯颜色
pass
except:
pass
# 使用 HSV 颜色检测作为后备方案
return self._detect_traffic_light_by_color(image)
def _detect_traffic_light_by_color(self, image: np.ndarray) -> str:
    """Classify the traffic-light colour from HSV colour areas.

    The top 70% of the frame is converted to HSV. Red, green and yellow
    masks are built on both the original and a brightened copy, cleaned
    with a morphological opening, and compared by pixel count.

    :param image: BGR frame
    :return: 'green', 'red', 'yellow' or 'unknown'
    """
    frame_h, _ = image.shape[:2]
    # Traffic lights may sit at different heights, so scan the top 70%.
    roi = image[:int(frame_h * 0.7), :]
    hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)

    # Brightened copy (V channel + 30) to help with dim lights.
    hsv_bright = hsv.copy()
    hsv_bright[:, :, 2] = cv2.add(hsv_bright[:, :, 2], 30)

    # HSV bounds per colour; red needs two ranges because it wraps around hue 0.
    hsv_bounds = {
        "red": (([0, 120, 100], [10, 255, 255]), ([170, 120, 100], [180, 255, 255])),
        "green": (([40, 60, 60], [90, 255, 255]),),
        "yellow": (([15, 100, 100], [40, 255, 255]),),
    }

    # Union of all matches (original + brightened), then opening to remove noise.
    open_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    masks = {}
    for color_name, bounds in hsv_bounds.items():
        merged = None
        for lower, upper in bounds:
            lower_arr, upper_arr = np.array(lower), np.array(upper)
            for hsv_img in (hsv, hsv_bright):
                hit = cv2.inRange(hsv_img, lower_arr, upper_arr)
                merged = hit if merged is None else cv2.bitwise_or(merged, hit)
        masks[color_name] = cv2.morphologyEx(merged, cv2.MORPH_OPEN, open_kernel)

    area_red = cv2.countNonZero(masks["red"])
    area_green = cv2.countNonZero(masks["green"])
    area_yellow = cv2.countNonZero(masks["yellow"])

    # Minimum pixel count for a colour to be considered present.
    min_area = 30

    # Periodic debug output (every 30 frames).
    if hasattr(self, 'frame_counter') and self.frame_counter % 30 == 0:
        logger.info(f"[HSV检测] 红:{area_red}, 绿:{area_green}, 黄:{area_yellow}")
        # Optionally dump the ROI and masks to disk.
        if os.getenv("AIGLASS_DEBUG_TRAFFIC_LIGHT", "0") == "1":
            debug_dir = "traffic_light_debug"
            os.makedirs(debug_dir, exist_ok=True)
            cv2.imwrite(f"{debug_dir}/frame_{self.frame_counter}_roi.jpg", roi)
            cv2.imwrite(f"{debug_dir}/frame_{self.frame_counter}_red.jpg", masks["red"])
            cv2.imwrite(f"{debug_dir}/frame_{self.frame_counter}_green.jpg", masks["green"])
            cv2.imwrite(f"{debug_dir}/frame_{self.frame_counter}_yellow.jpg", masks["yellow"])

    # Decision priority: green > red > yellow. Green only has to reach 80% of
    # the red area to win.
    if area_green > min_area and area_green > area_red * 0.8:
        return "green"
    if area_red > min_area and area_red > area_green:
        return "red"
    if area_yellow > min_area:
        return "yellow"
    return "unknown"
def _get_voice_priority(self, guidance_text):
"""获取语音指令的优先级
优先级:障碍物(100) > 转向/平移(50) > 保持直行(10)
"""
if not guidance_text:
return 0
# 障碍物播报 - 最高优先级
obstacle_keywords = ['前方有', '左侧有', '右侧有', '停一下', '注意避让', '障碍物']
for keyword in obstacle_keywords:
if keyword in guidance_text:
return 100
# 转向和平移 - 中等优先级
direction_keywords = ['左转', '右转', '左移', '右移', '向左', '向右', '平移', '微调']
for keyword in direction_keywords:
if keyword in guidance_text:
return 50
# 保持直行 - 最低优先级
if '保持直行' in guidance_text or '继续前进' in guidance_text or '方向正确' in guidance_text:
return 10
# 其他指令 - 默认中等优先级
return 30
def process_frame(self, image: np.ndarray) -> ProcessingResult:
    """
    Process a single frame.

    Steps: segment blind path / crosswalk, detect (or reuse cached)
    obstacles, run the crosswalk awareness monitor, run the blind-path state
    machine, arbitrate between the candidate voice messages, play the winner
    and draw the annotated image.

    Crosswalk stage handling is disabled in this workflow, so the reported
    ``crosswalk_stage`` is always ``'not_detected'``.

    :param image: image in BGR format
    :return: processing result; ``guidance_text`` is the raw navigation
        guidance, which may differ from what was actually spoken
    """
    self.frame_counter += 1

    # Tick the turn cooldown
    if self.turn_cooldown_frames > 0:
        self.turn_cooldown_frames -= 1

    image_height, image_width = image.shape[:2]
    curr_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    frame_visualizations = []
    guidance_text = ""

    # 1. Real-time segmentation on every frame (no caching, no optical-flow
    #    extrapolation): the masks are used exactly as detected.
    blind_path_mask, crosswalk_mask = self._detect_path_and_crosswalk(image)

    # Debug output every 30 frames
    if self.frame_counter % 30 == 0:
        has_blind = blind_path_mask is not None and np.sum(blind_path_mask > 0) > 0
        has_cross = crosswalk_mask is not None and np.sum(crosswalk_mask > 0) > 0
        logger.info(f"[YOLO检测] Frame={self.frame_counter}, 盲道={'有' if has_blind else '无'}, "
                    f"斑马线={'有' if has_cross else '无'}")
        if has_cross:
            cross_area = np.sum(crosswalk_mask > 0) / crosswalk_mask.size
            logger.info(f"[YOLO检测] 斑马线原始面积: {cross_area*100:.2f}%")
        if crosswalk_mask is not None:
            # Stabilisation is disabled, so this reports the raw mask again.
            after_stab = np.sum(crosswalk_mask > 0) > 0
            logger.info(f"[掩码稳定化] 斑马线稳定化后: {'有' if after_stab else '无(被过滤)'}")

    # 2. Obstacle detection runs in every state; fresh detection every N
    #    frames, cached results in between until the cache expires.
    logger.info(f"[Frame {self.frame_counter}] 开始障碍物检测...")
    detection_interval = max(1, self.OBSTACLE_DETECTION_INTERVAL)  # guard against a 0 divisor
    if self.frame_counter % detection_interval == 0:
        detected_obstacles = self._detect_obstacles(image, blind_path_mask)
        self.last_detected_obstacles = detected_obstacles
        self.last_obstacle_detection_frame = self.frame_counter
        logger.info(f"[Frame {self.frame_counter}] 执行了新的障碍物检测,检测到 {len(detected_obstacles)} 个障碍物")
    elif self.frame_counter - self.last_obstacle_detection_frame < self.OBSTACLE_CACHE_DURATION_FRAMES:
        detected_obstacles = self.last_detected_obstacles
        logger.info(f"[Frame {self.frame_counter}] 使用缓存的障碍物数据,共 {len(detected_obstacles)} 个障碍物")
    else:
        detected_obstacles = []
        logger.info(f"[Frame {self.frame_counter}] 缓存过期,无障碍物数据")

    # Visualise every obstacle, not only the near ones
    for i, obs in enumerate(detected_obstacles):
        logger.info(f" 障碍物 {i+1}: {obs.get('name', 'unknown')}, "
                    f"bottom_y_ratio={obs.get('bottom_y_ratio', 0):.2f}, "
                    f"area_ratio={obs.get('area_ratio', 0):.3f}, "
                    f"位置=({obs.get('center_x', 0):.0f}, {obs.get('center_y', 0):.0f})")
        self._add_obstacle_visualization(obs, frame_visualizations)

    # Queue a voice warning for near obstacles
    self._check_and_set_obstacle_voice(detected_obstacles)

    # 3. Crosswalk awareness
    if crosswalk_mask is not None:
        cross_pixels = np.sum(crosswalk_mask > 0)
        if cross_pixels > 0:
            logger.info(f"[斑马线] 传入monitor: pixels={cross_pixels}, area={cross_pixels/crosswalk_mask.size*100:.2f}%")
        else:
            logger.info("[斑马线] crosswalk_mask全为0,无斑马线")
    elif self.frame_counter % 30 == 0:
        logger.info("[斑马线] crosswalk_mask为None")

    crosswalk_guidance = self.crosswalk_monitor.process_frame(crosswalk_mask, blind_path_mask)
    if crosswalk_guidance:
        logger.info(f"[斑马线感知] 检测结果: area={crosswalk_guidance.get('area', 0):.3f}, "
                    f"should_broadcast={crosswalk_guidance.get('should_broadcast', False)}, "
                    f"voice={crosswalk_guidance.get('voice_text', 'None')}")
        if crosswalk_guidance['should_broadcast']:
            # Hand the message to the voice arbitration below
            self.pending_crosswalk_voice = crosswalk_guidance
            logger.info(f"[斑马线语音] 已设置待播报语音: {crosswalk_guidance['voice_text']}, 优先级{crosswalk_guidance['priority']}")

    # Crosswalk visualisation
    if crosswalk_mask is not None:
        total_pixels = crosswalk_mask.size
        crosswalk_pixels = np.sum(crosswalk_mask > 0)
        area_ratio = crosswalk_pixels / total_pixels
        y_coords, x_coords = np.where(crosswalk_mask > 0)
        if len(y_coords) > 0:
            center_x_ratio = np.mean(x_coords) / crosswalk_mask.shape[1]
            center_y_ratio = np.mean(y_coords) / crosswalk_mask.shape[0]
            has_occlusion = self.crosswalk_monitor._check_occlusion(crosswalk_mask, blind_path_mask)
            viz_data = self.crosswalk_monitor.get_visualization_data(
                crosswalk_mask, area_ratio, center_x_ratio, center_y_ratio, has_occlusion
            )
            self._add_mask_visualization(crosswalk_mask, frame_visualizations,
                                         "crosswalk_mask", viz_data['stage_color'])
            self._add_crosswalk_info_visualization(viz_data, image_height, image_width,
                                                   frame_visualizations)

    # 4. Blind-path mask visualisation
    self._add_mask_visualization(blind_path_mask, frame_visualizations, "blind_path_mask", "rgba(0, 255, 0, 0.4)")

    # 5. Blind-path navigation. The crosswalk tracker is not updated in this
    #    workflow, so the stage is fixed and only blind-path logic runs.
    current_stage = 'not_detected'
    if blind_path_mask is None:
        guidance_text = ""
        frame_visualizations.append({
            "type": "data_panel",
            "data": {
                "状态": "等待盲道识别"
            },
            "position": (image_width - 180, 20)
        })
    else:
        guidance_text = self._execute_state_machine(
            blind_path_mask, image, frame_visualizations,
            image_height, image_width, curr_gray
        )

    # 6. Update caches
    self.prev_gray = curr_gray
    if blind_path_mask is not None:
        self.prev_blind_path_mask = blind_path_mask.copy()
    if crosswalk_mask is not None:
        self.prev_crosswalk_mask = crosswalk_mask.copy()

    # 7. Voice arbitration: collect candidates, speak the highest priority one
    current_time = time.time()
    voice_candidates = []

    if guidance_text:
        voice_candidates.append({
            'text': guidance_text,
            'priority': self._get_voice_priority(guidance_text),
            'source': 'navigation'
        })

    # Pending messages are consumed here whether or not they end up spoken.
    pending_obstacle = getattr(self, 'pending_obstacle_voice', None)
    if pending_obstacle:
        voice_candidates.append({
            'text': pending_obstacle,
            'priority': 100,  # obstacles always get the highest priority
            'source': 'obstacle'
        })
        self.pending_obstacle_voice = None

    pending_crosswalk = getattr(self, 'pending_crosswalk_voice', None)
    if pending_crosswalk:
        voice_candidates.append({
            'text': pending_crosswalk['voice_text'],
            'priority': pending_crosswalk['priority'],
            'source': 'crosswalk'
        })
        self.pending_crosswalk_voice = None

    if voice_candidates:
        voice_candidates.sort(key=lambda x: x['priority'], reverse=True)
        selected_voice = voice_candidates[0]
        selected_source = selected_voice['source']
        final_guidance_text = selected_voice['text']

        # Global cooldown so that no two messages overlap
        MIN_SPEECH_INTERVAL = 1.2  # minimum seconds between any two messages
        if hasattr(self, 'last_any_speech_time'):
            if current_time - self.last_any_speech_time < MIN_SPEECH_INTERVAL:
                final_guidance_text = ""  # too soon, skip this announcement

        if final_guidance_text == "保持直行":
            # "Keep straight" is throttled by its own interval ...
            interval_elapsed = current_time - self.last_guide_time >= self.guide_interval
            # ... and, outside continuous mode, by a repeat limit.
            under_limit = (self.straight_continuous_mode or
                           self.straight_repeat_count < self.straight_repeat_limit)
            if interval_elapsed and under_limit:
                self.last_guide_time = current_time
                self.straight_repeat_count += 1
                self.last_any_speech_time = current_time
            else:
                final_guidance_text = ""
        elif final_guidance_text and selected_source in ('obstacle', 'crosswalk'):
            # Obstacle and crosswalk messages are always spoken and bypass the
            # repeat filters below. This check must come before the generic
            # branch, otherwise crosswalk messages would be filtered as
            # ordinary navigation text.
            self.last_any_speech_time = current_time
        elif final_guidance_text:
            direction_keywords = ["左转", "右转", "左移", "右移", "向左", "向右", "平移", "微调"]
            is_direction = any(keyword in final_guidance_text for keyword in direction_keywords)
            if is_direction:
                # Direction commands may repeat, but only every direction_interval seconds
                if final_guidance_text == self.last_direction_message:
                    if current_time - self.last_direction_time >= self.direction_interval:
                        self.last_direction_time = current_time
                        self.last_any_speech_time = current_time
                        self.straight_repeat_count = 0
                    else:
                        final_guidance_text = ""  # interval not yet elapsed
                else:
                    # A new direction command is spoken immediately
                    self.last_direction_message = final_guidance_text
                    self.last_direction_time = current_time
                    self.last_any_speech_time = current_time
                    self.straight_repeat_count = 0
            else:
                # Any other message is spoken once, until the text changes
                if final_guidance_text != self.last_guidance_message:
                    self.last_guidance_message = final_guidance_text
                    self.straight_repeat_count = 0
                    self.last_any_speech_time = current_time
                else:
                    final_guidance_text = ""

        # Speak the selected message
        if final_guidance_text:
            try:
                # For combined crosswalk messages only the first part is
                # played, so the speech queue does not fall behind real time.
                if selected_voice.get('source') == 'crosswalk' and ',' in final_guidance_text:
                    voice_parts = split_combined_voice(final_guidance_text)
                    logger.info(f"[斑马线语音] 组合播报检测到{len(voice_parts)}部分,只播第一部分保持实时")
                    if voice_parts:
                        play_voice_text(voice_parts[0])
                        logger.info(f"[语音播报] 优先级{selected_voice['priority']}: {voice_parts[0]}")
                else:
                    play_voice_text(final_guidance_text)
                    logger.info(f"[语音播报] 优先级{selected_voice['priority']}: {final_guidance_text}")
            except Exception as e:
                logger.error(f"[语音播报] 播放失败: {e}")
    else:
        final_guidance_text = ""

    # 8. Build the annotated image
    if frame_visualizations:
        annotated_image = self._draw_visualizations(image.copy(), frame_visualizations)
    else:
        annotated_image = image.copy()

    # Bottom command button shows what was actually spoken this frame
    current_instruction = final_guidance_text if final_guidance_text else "等待中..."
    annotated_image = self._draw_command_button(annotated_image, current_instruction)

    # 9. Return the result
    return ProcessingResult(
        guidance_text=guidance_text,
        visualizations=frame_visualizations,
        annotated_image=annotated_image,
        state_info={
            "state": self.current_state,
            "crosswalk_stage": current_stage,
            "frame_count": self.frame_counter
        }
    )
def _detect_path_and_crosswalk(self, image: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
"""检测盲道和斑马线"""
if self.yolo_model is None:
# 【新增】没有模型时返回模拟数据用于测试
logger.warning("YOLO模型未加载,返回模拟数据")
h, w = image.shape[:2]
# 创建一个模拟的盲道掩码(垂直居中的条带)
blind_path_mask = np.zeros((h, w), dtype=np.uint8)
# 在图像中央创建一个宽度为图像宽度20%的垂直条带
strip_width = int(w * 0.2)
strip_left = (w - strip_width) // 2
blind_path_mask[int(h*0.3):, strip_left:strip_left+strip_width] = 255
return blind_path_mask, None
blind_path_mask = None
crosswalk_mask = None
try:
min_conf = min(self.CLASS_CONF_THRESHOLDS.values())
results = self.yolo_model.predict(image, verbose=False, conf=min_conf, classes=[0, 1])
if (results and results[0] and results[0].masks is not None and
results[0].boxes is not None and len(results[0].masks.data) > 0):
for mask_tensor, conf_tensor, cls_tensor in zip(
results[0].masks.data, results[0].boxes.conf, results[0].boxes.cls
):
class_id = int(cls_tensor.item())
confidence = float(conf_tensor.item())
threshold = self.CLASS_CONF_THRESHOLDS.get(class_id, 1.0)
if confidence >= threshold:
current_mask = self._tensor_to_mask(mask_tensor, image.shape[1], image.shape[0])
if class_id == 1: # 盲道
if blind_path_mask is None:
blind_path_mask = current_mask
else:
blind_path_mask = cv2.bitwise_or(blind_path_mask, current_mask)
elif class_id == 0: # 斑马线
if crosswalk_mask is None:
crosswalk_mask = current_mask
else:
crosswalk_mask = cv2.bitwise_or(crosswalk_mask, current_mask)
except Exception as e:
logger.error(f"YOLO检测失败: {e}")
# 【新增】检测失败时也返回模拟数据
h, w = image.shape[:2]
blind_path_mask = np.zeros((h, w), dtype=np.uint8)
strip_width = int(w * 0.2)
strip_left = (w - strip_width) // 2
blind_path_mask[int(h*0.3):, strip_left:strip_left+strip_width] = 255
return blind_path_mask, crosswalk_mask
def _tensor_to_mask(self, mask_tensor, out_w: int, out_h: int, binarize: bool = True) -> np.ndarray:
"""将张量掩码转换为numpy数组"""
try:
import torch
if not isinstance(mask_tensor, torch.Tensor):
arr = np.asarray(mask_tensor)
if arr.dtype != np.uint8:
arr = (arr > 0.5).astype(np.uint8) * 255 if binarize else (arr * 255.0).astype(np.uint8)
mask_u8 = arr
else:
if mask_tensor.dtype in (torch.bfloat16, torch.float16):
mask_tensor = mask_tensor.to(torch.float32)
if mask_tensor.ndim > 2:
mask_tensor = mask_tensor.squeeze()
if binarize:
mask_tensor = (mask_tensor > 0.5).to(torch.uint8).mul_(255)
mask_u8 = mask_tensor.cpu().numpy()
else:
mask_u8 = (mask_tensor.mul(255).clamp_(0, 255).to(torch.uint8)).cpu().numpy()
if mask_u8.ndim == 3:
mask_u8 = mask_u8.squeeze(-1)
if mask_u8.shape[1] != out_w or mask_u8.shape[0] != out_h:
mask_u8 = cv2.resize(mask_u8, (out_w, out_h), interpolation=cv2.INTER_NEAREST)
return mask_u8
except ImportError:
# 如果没有torch,返回空掩码
return np.zeros((out_h, out_w), dtype=np.uint8)
def _stabilize_mask(self, prev_gray, curr_gray, raw_mask, prev_stable_mask, mask_type):
"""稳定化掩码 - 使用 Lucas-Kanade 光流"""
if mask_type == 'blind_path':
ttl = self.blind_miss_ttl
min_area = self.MASK_STAB_MIN_AREA
else: # crosswalk
ttl = self.cross_miss_ttl
min_area = self.MASK_STAB_MIN_AREA
# 调用新的光流稳定化方法
stable_mask = self._stabilize_seg_mask(
prev_gray, curr_gray, raw_mask, prev_stable_mask,
(curr_gray.shape[1], curr_gray.shape[0]) if curr_gray is not None else (640, 480),
min_area_px=min_area,
morph_kernel=self.MASK_STAB_KERNEL,
mask_type=mask_type
)
if stable_mask is not None:
# 重置TTL
if mask_type == 'blind_path':
self.blind_miss_ttl = self.MASK_MISS_TTL
else:
self.cross_miss_ttl = self.MASK_MISS_TTL
return stable_mask
else:
# 减少TTL
if mask_type == 'blind_path':
self.blind_miss_ttl = max(0, self.blind_miss_ttl - 1)
else:
self.cross_miss_ttl = max(0, self.cross_miss_ttl - 1)
return None
def _stabilize_seg_mask(self, prev_gray, curr_gray, curr_mask, prev_stable_mask,
                        image_wh, min_area_px=1500, morph_kernel=3, iou_high_thr=0.4, mask_type='',
                        fast_clear=True):
    """Mask stabilisation using Lucas-Kanade optical flow.

    Decision outline:
      * no previous mask or missing gray frames -> smoothed current mask (or None);
      * current mask has at least ``min_area_px`` pixels -> current mask, or a
        weighted fusion with the flow-predicted mask when the IoU with the
        previous mask is between 0.1 and ``iou_high_thr``;
      * otherwise -> flow-predicted mask while the per-type TTL allows it,
        else None.

    :param prev_gray: previous grayscale frame (may be None)
    :param curr_gray: current grayscale frame (may be None)
    :param curr_mask: mask detected on the current frame (may be None)
    :param prev_stable_mask: stabilised mask of the previous frame (may be None)
    :param image_wh: (width, height) tuple; unpacked but not otherwise used here
    :param min_area_px: minimum number of non-zero pixels for a mask to count
    :param morph_kernel: size of the elliptical kernel used for smoothing
    :param iou_high_thr: IoU at or above which the current mask is trusted as is
    :param mask_type: 'blind_path' selects the blind-path TTL; anything else the crosswalk TTL
    :param fast_clear: when True, a TTL of 1 or less returns None without using optical flow
    :return: uint8 mask with values 0/255, or None
    """
    W, H = image_wh

    def _binarize(mask):
        # Normalise any mask to uint8 with values 0 / 255 (None passes through)
        if mask is None:
            return None
        if mask.dtype != np.uint8:
            mask = mask.astype(np.uint8)
        mask = (mask > 0).astype(np.uint8) * 255
        return mask

    def _morph_smooth(mask, kernel_size):
        # Morphological close followed by open, with an elliptical kernel
        if mask is None:
            return None
        k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE,
                                      (max(1, kernel_size), max(1, kernel_size)))
        sm = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, k, iterations=1)
        sm = cv2.morphologyEx(sm, cv2.MORPH_OPEN, k, iterations=1)
        return sm

    curr_mask_b = _binarize(curr_mask)
    prev_mask_b = _binarize(prev_stable_mask)

    # Without history, return the current mask directly
    if prev_mask_b is None or prev_gray is None or curr_gray is None:
        return _morph_smooth(curr_mask_b, morph_kernel) if curr_mask_b is not None else None

    # The current frame has a usable detection
    if curr_mask_b is not None and np.sum(curr_mask_b > 0) >= min_area_px:
        # Compute IoU against the previous frame
        if prev_mask_b is not None:
            inter = np.logical_and(curr_mask_b > 0, prev_mask_b > 0).sum()
            union = np.logical_or(curr_mask_b > 0, prev_mask_b > 0).sum()
            iou = float(inter) / float(union) if union > 0 else 0.0
            # IoU high enough: detection is stable, use the current result as is
            if iou >= iou_high_thr:
                return _morph_smooth(curr_mask_b, morph_kernel)
            # IoU low but still overlapping: weighted fusion
            elif iou > 0.1:
                # Mask predicted by optical flow
                flow_mask = self._predict_mask_with_flow(prev_mask_b, prev_gray, curr_gray)
                if flow_mask is not None:
                    # Weight depends on IoU: the lower the IoU, the more the
                    # flow prediction counts (w_curr = 0.4 + IoU, capped at 0.9)
                    w_curr = min(0.9, 0.4 + iou)
                    w_flow = 1.0 - w_curr
                    fused = (w_curr * curr_mask_b.astype(np.float32) +
                             w_flow * flow_mask.astype(np.float32))
                    fused_bin = (fused >= 128).astype(np.uint8) * 255
                    # Drop the cached flow points when IoU is too low.
                    # NOTE(review): the key is always 'blind_path', regardless
                    # of mask_type — confirm this is intended.
                    if iou < self.flow_iou_threshold:
                        self.flow_points['blind_path'] = None
                    return _morph_smooth(fused_bin, morph_kernel)
        # IoU too low (or flow prediction unavailable): use the current detection
        return _morph_smooth(curr_mask_b, morph_kernel)
    # The current frame has no usable detection: try optical-flow extrapolation
    else:
        # Pick the TTL for this mask type
        if mask_type == 'blind_path':
            ttl = self.blind_miss_ttl
        else:
            ttl = self.cross_miss_ttl
        # Fast clear: with the TTL (nearly) exhausted, return None at once
        # without using optical flow
        if fast_clear and ttl <= 1:
            return None
        if prev_mask_b is not None and np.sum(prev_mask_b > 0) >= min_area_px and ttl > 0:
            # Predict with optical flow; accept only if at least half the minimum area survives
            flow_mask = self._predict_mask_with_flow(prev_mask_b, prev_gray, curr_gray)
            if flow_mask is not None and np.sum(flow_mask > 0) >= min_area_px * 0.5:
                return _morph_smooth(flow_mask, morph_kernel)
        # Optical flow failed or the TTL was exceeded
        return None
def _predict_mask_with_flow(self, prev_mask, prev_gray, curr_gray):
"""使用Lucas-Kanade光流预测掩码位置(改进版)"""
try:
# 方法1:尝试使用凸包方法(参考yolomedia)
if hasattr(self, 'flow_points') and 'blind_path' in self.flow_points:
p0 = self.flow_points['blind_path']
if p0 is not None and len(p0) >= 5:
# 计算光流
p1, st, err = cv2.calcOpticalFlowPyrLK(prev_gray, curr_gray, p0, None, **self.lk_params)