-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathapp_lifecycle.py
More file actions
146 lines (104 loc) · 4.45 KB
/
Copy pathapp_lifecycle.py
File metadata and controls
146 lines (104 loc) · 4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import ctypes
from ctypes import wintypes
import _thread
import threading
import atexit
import time
from logger import get_logger
logger = get_logger(__name__)
# ----------------- 全局生命周期状态 -----------------
# 退出信号:一旦被 set(),表示程序进入死亡倒计时
_exit_event = threading.Event()
# 暂停信号:一旦被 set(),表示程序业务应当挂起
_pause_event = threading.Event()
# Windows API 句柄引用防回收
_win_handler_ref = None
_cleanup_done_event = threading.Event()
# ----------------- 信号控制 API -----------------
def trigger_exit(reason:str):
"""触发全局退出信号,并打断主线程"""
if _exit_event.is_set():
return
if reason:
logger.warning(f"准备退出程序,原因: {reason} 。")
_exit_event.set()
if threading.current_thread() is not threading.main_thread():
_thread.interrupt_main()
def toggle_pause():
"""切换程序的暂停/恢复状态"""
if _pause_event.is_set():
logger.warning("暂停/恢复热键被按下,Bot 已恢复。")
_pause_event.clear()
else:
logger.warning("暂停/恢复热键被按下,Bot 即将暂停。按 CTRL+F9 恢复。")
_pause_event.set()
def is_exiting() -> bool:
return _exit_event.is_set()
def is_paused() -> bool:
return _pause_event.is_set()
# ----------------- 睡眠函数封装族 -----------------
def sleep_stoppable(duration: float) -> bool:
"""
仅带退出信号检查的休眠。
不需要响应暂停的线程或原子方法应当使用该方法替换 `time.sleep()` 。
该计时器精度低于 `time.sleep()` 。
:return: True 表示正常休眠结束,False 表示被退出信号打断
"""
# event.wait() 返回 True 表示事件被 set 了 (即收到了退出信号)
is_interrupted = _exit_event.wait(timeout=duration)
return not is_interrupted
def sleep_smart(duration: float) -> bool:
"""
同时带有退出信号检查和暂停检查的 sleep 方法。
如果在休眠中发生暂停,计时器不走,直到恢复后继续补足剩余的休眠时间。
需要响应暂停的线程应当使用该方法替换 `time.sleep()` 。
该计时器精度低于 `sleep_stoppable()` 。
:return: True 表示正常休眠结束,False 表示被退出信号打断
"""
remaining = duration
while remaining > 0:
# 检查是否需要退出
if _exit_event.is_set():
return False
# 检查是否被暂停
if _pause_event.is_set():
# 同样等待退出信号
_exit_event.wait(0.5)
continue # 暂停期间,remaining 计时器不减少
# 正常休眠:分块睡眠,避免长时间阻塞
chunk = min(remaining, 0.5)
start = time.monotonic()
_exit_event.wait(timeout=chunk)
elapsed = time.monotonic() - start
remaining -= elapsed
return not _exit_event.is_set()
# ----------------- Windows 事件拦截 -----------------
def _mark_cleanup_done():
"""
注册在 atexit 的最后,一旦执行到这里,说明其他 atexit 和 finally 都已经跑完了。
注意 atexit 是 FILO ,因此该方法应当第一个注册到 atexit 。
"""
_cleanup_done_event.set()
def _console_ctrl_handler(ctrl_type):
"""拦截关闭窗口事件,调用退出方法以释放资源,而不是直接关闭程序"""
CTRL_CLOSE_EVENT = 2
if ctrl_type == CTRL_CLOSE_EVENT:
# logger.warning("接收到窗口关闭信号...")
trigger_exit("接收到系统关闭信号 (WM_CLOSE)")
# 给主线程争取清理时间 (最大等 4.5 秒,避免 5 秒后被系统直接杀死)
_cleanup_done_event.wait(timeout=4.5)
return True
return False
def init_lifecycle_manager():
"""
初始化生命周期管理器,注册系统回调。
注意早于该方法执行的 atexit 回调在接收到 WM_CLOSE 信号将不会被执行。
"""
global _win_handler_ref
# 将 Windows 的等待锁的释放注册到 atexit 链中
# 因为 atexit 执行回调时是 FILO 的,所以在 WM_CLOSE 时只有晚于该语句注册的回调才能执行
atexit.register(_mark_cleanup_done)
# 注册拦截关闭事件
HandlerRoutine = ctypes.WINFUNCTYPE(wintypes.BOOL, wintypes.DWORD)
_win_handler_ref = HandlerRoutine(_console_ctrl_handler)
ctypes.windll.kernel32.SetConsoleCtrlHandler(_win_handler_ref, True)