Skip to content

Commit bab5aa2

Browse files
committed
fix(bridge): push pipe 死链主动监听 + token 重连去重
长时间运行(18h)后服务进程出现内存膨胀(>300MB 触发 heap dump) 和 TSF/UI 状态分裂。日志显示: - 35 次 push pipe 连接但无任何 disconnect 事件 - 同 PID 多次重连(PID 59996 七次、24964 四次) - "Failed to push state to client error=The handle is invalid" 120 次 - 用户在 UI 反复点 toggle mode 9 次试图修复状态 根因:服务端没有主动检测客户端断开,旧 handle 漂在 pushClients map 里直到下一次广播写失败才被"惰性发现"。同 PID 重连只覆盖 pushClientsByPID 索引,旧 handle 仍参与广播。 修复用单 goroutine 同时承担握手与死链监听: 1. Phase 1 阻塞读 8 字节 token;同 token 已映射其他 handle 时 主动 cleanupPushHandle+CloseHandle 旧 handle(同 token = 同 CTextService 实例,旧的必无效)。按 token 而非 PID,避免误清 explorer.exe 等宿主里多个合法 CLangBar 实例。 2. Phase 2 继续阻塞 ReadFile,协议规定 token 之后客户端不再发 数据,goroutine park 在内核无 CPU 消耗;对端 close pipe 时 OS 立即唤醒,defer 路径 0 延迟清理。 合并原 2-goroutine + 500ms timeout 设计,顺带修旧版内层 goroutine 在 legacy 客户端不发 token 时永久泄漏的隐患。 cleanupPushHandle 返回 bool 已具备并发安全,新清理路径与广播 失败清理路径共用同一锁,不会双关 handle。
1 parent 0f26a15 commit bab5aa2

1 file changed

Lines changed: 58 additions & 26 deletions

File tree

wind_input/internal/bridge/server_push.go

Lines changed: 58 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -97,39 +97,71 @@ func (s *Server) startPushPipeListener() {
9797
s.logger.Debug("CMD_SERVICE_READY sent to new push client", "clientID", clientID)
9898
}
9999

100-
// 异步读取 token 握手(8 字节),不阻塞主循环。
101-
// 内层 goroutine 做实际 ReadFile(可能永久阻塞于旧版客户端),
102-
// 外层 goroutine 持有 500ms 超时并在超时后退出(内层 goroutine 在 handle 关闭时自然退出)。
100+
// 单 goroutine 完成两件事:
101+
// 1) 阻塞读 8 字节 token 握手
102+
// 2) 握手后继续阻塞 ReadFile,专门用于检测对端关闭(死链监听)
103+
// 协议规定客户端发完 token 后不再发任何消息,所以 ReadFile 在握手后会
104+
// 永久 park 在内核等待,不消耗 CPU;客户端 close pipe 时 OS 立即唤醒
105+
// 并返回错误,我们走 defer 路径清理 handle —— 不再等到下一次广播写失败
106+
// 才"惰性发现"死链。
107+
//
108+
// 同 token 重连:旧 handle 必然失效,必须按 token 主动清理。
109+
// 不能按 PID 清理:同 PID 可能有多个合法实例(如 explorer.exe 的多个
110+
// CLangBar 宿主),它们各自持有不同 token,按 PID 误清会破坏正常推送。
103111
go func(h windows.Handle, pid uint32, cid int) {
104-
tokenCh := make(chan uint64, 1)
105-
go func() {
106-
var buf [8]byte
107-
var n uint32
108-
if err := windows.ReadFile(h, buf[:], &n, nil); err == nil && n == 8 {
109-
tokenCh <- binary.LittleEndian.Uint64(buf[:])
112+
defer func() {
113+
s.pushMu.Lock()
114+
removed := s.cleanupPushHandle(h)
115+
s.pushMu.Unlock()
116+
if removed {
117+
windows.CloseHandle(h)
110118
}
111119
}()
112-
select {
113-
case token := <-tokenCh:
114-
if token == 0 {
115-
return
120+
121+
// Phase 1: token 握手
122+
var buf [8]byte
123+
var n uint32
124+
if err := windows.ReadFile(h, buf[:], &n, nil); err != nil || n == 0 {
125+
s.logger.Info("Push pipe disconnected before token handshake",
126+
"clientID", cid, "processID", pid, "error", err)
127+
return
128+
}
129+
130+
var registeredToken uint64
131+
if n >= 8 {
132+
token := binary.LittleEndian.Uint64(buf[:])
133+
if token != 0 {
134+
s.pushMu.Lock()
135+
if oldH, ok := s.tokenToPushHandle[token]; ok && oldH != h {
136+
if s.cleanupPushHandle(oldH) {
137+
windows.CloseHandle(oldH)
138+
s.logger.Info("Push pipe: stale handle replaced by token reconnect",
139+
"clientID", cid, "processID", pid, "token", token)
140+
}
141+
}
142+
if _, exists := s.pushClients[h]; exists {
143+
s.tokenToPushHandle[token] = h
144+
s.pushHandleToToken[h] = token
145+
registeredToken = token
146+
}
147+
s.pushMu.Unlock()
148+
s.logger.Debug("Push pipe: token registered",
149+
"clientID", cid, "processID", pid, "token", token)
116150
}
117-
s.pushMu.Lock()
118-
if _, exists := s.pushClients[h]; exists {
119-
s.tokenToPushHandle[token] = h
120-
s.pushHandleToToken[h] = token
151+
}
152+
153+
// Phase 2: 死链监听 —— ReadFile 在客户端 close pipe 时立刻返回错误
154+
var probe [16]byte
155+
for {
156+
err := windows.ReadFile(h, probe[:], &n, nil)
157+
if err != nil || n == 0 {
158+
s.logger.Info("Push pipe client disconnected",
159+
"clientID", cid, "processID", pid, "token", registeredToken, "error", err)
160+
return
121161
}
122-
s.pushMu.Unlock()
123-
s.logger.Debug("Push pipe: token registered", "clientID", cid, "processID", pid, "token", token)
124-
case <-time.After(500 * time.Millisecond):
125-
s.logger.Debug("Push pipe: token handshake timed out (old client?)", "clientID", cid, "processID", pid)
162+
// 协议不允许 token 之后再有数据,但万一发生只丢弃并继续监听。
126163
}
127164
}(handle, pushProcessID, clientID)
128-
129-
// Note: We don't actively monitor disconnection here.
130-
// Client disconnection is detected when write fails in PushCommitTextToActiveClient
131-
// or PushStateToAllClients. This avoids false positives from GetNamedPipeHandleState
132-
// which can return "Access is denied" on valid pipes.
133165
}
134166
}
135167

0 commit comments

Comments
 (0)