Skip to content

Commit 6544115

Browse files
committed
perf(bridge): push pipe 改为 per-client outbound channel + 单 writer goroutine
旧设计每次广播都 go func() 写入 push pipe,slow/dead client 让广播 goroutine 无限堆积。
pprof 曾观测到 725 个 goroutine 堵在 pipeWriter.Write 的 mutex 上,长跑后服务进程 Private Bytes 持续上涨。

bab5aa2 已经修了死链主动监听这一面,但活客户端读得慢 / 多客户端场景下广播写入仍可能堆 goroutine。

本提交重构 push 写路径:
1. pipeWriter 增加 outbound chan []byte (容量 16) + shutdown()/enqueueBroadcast()
2. push client 注册时启动单个 pushWriterLoop 串行消费 outbound
3. PushStateToAllClients / pushSyncConfigToAllClients 改为非阻塞 enqueue:队列满则直接 drop+log(状态/配置同步幂等,下次推就是最新值)
4. cleanupPushHandle / RestartService 关闭 outbound 让 writer 自然退出
5. 定向 push(PushCommitText/PushClearComposition/PushUpdateComposition)保持直接 WriteMessage——不能丢

收益:每个 push client 至多 1 个 writer goroutine,不会随广播次数线性增长。slow client 不再影响其他 client。

隐患遗留:pipeWriter.Write 仍同步 WriteFile,活但读得慢的 client 会让单个 writer goroutine 卡死,待后续 overlapped I/O + 超时重构。
1 parent 3f7f84b commit 6544115

2 files changed

Lines changed: 118 additions & 35 deletions

File tree

wind_input/internal/bridge/server.go

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -410,12 +410,39 @@ func (r *pipeReader) release() {
410410

411411
// pipeWriter wraps windows.Handle for io.Writer.
412412
// mu serializes concurrent WriteFile calls on the same handle:
413-
// broadcast goroutines (PushStateToAllClients) may write to the
414-
// same client handle in parallel, and Windows named-pipe writes
415-
// are not guaranteed to be thread-safe without serialization.
413+
// the per-client push writer goroutine (drains outbound) and targeted
414+
// sync sends (PushCommitText 等) can both write to the same handle.
415+
// Windows 命名管道写入未保证线程安全,必须 Mutex 互斥。
416+
//
417+
// outbound 仅 push pipe 客户端非 nil。它把"广播"路径变成
418+
// per-client 单 writer goroutine:
419+
// - 旧设计每次广播都 go func(),slow client 会导致 goroutine 堆到数百个
420+
// (历史 pprof 见 725 个 stuck),且无法 drop。
421+
// - 新设计每个 push client 仅一个 writer goroutine。enqueueBroadcast 满则丢弃
422+
// (状态/配置同步语义幂等,下次推就是最新值,丢一条无害)。
423+
//
424+
// TODO(隐患1): WriteFile 当前仍是同步阻塞。slow client(活着但读得慢)会让
425+
// 该 client 的 writer goroutine 卡死在内核里。后续需改成 overlapped I/O +
426+
// GetOverlappedResultEx 超时 + CancelIoEx,前提是 push pipe 改用
427+
// FILE_FLAG_OVERLAPPED。
416428
type pipeWriter struct {
417-
handle windows.Handle
418-
mu sync.Mutex
429+
handle windows.Handle
430+
mu sync.Mutex
431+
outbound chan []byte
432+
closeOnce sync.Once
433+
}
434+
435+
// pushOutboundBufferSize: per-client 广播队列容量。
436+
// 状态推送/配置同步在快速 toggle 场景下可能短时连发;16 给一个不易满的窗口,
437+
// 真挂 client 时也能快速识别为"持续 drop"并丢弃,不会无限制堆积。
438+
const pushOutboundBufferSize = 16
439+
440+
// newPushPipeWriter creates a pipeWriter for push pipe clients with an outbound queue.
441+
func newPushPipeWriter(h windows.Handle) *pipeWriter {
442+
return &pipeWriter{
443+
handle: h,
444+
outbound: make(chan []byte, pushOutboundBufferSize),
445+
}
419446
}
420447

421448
func (w *pipeWriter) Write(p []byte) (int, error) {
@@ -428,3 +455,27 @@ func (w *pipeWriter) Write(p []byte) (int, error) {
428455
}
429456
return int(bytesWritten), nil
430457
}
458+
459+
// enqueueBroadcast 非阻塞地把一条广播消息丢到该 client 的 outbound 队列。
460+
// 返回 false 表示该 client 队列已满(client 卡顿或已死),调用方应当 drop+log。
461+
// 调用方不需要持有任何锁。
462+
func (w *pipeWriter) enqueueBroadcast(msg []byte) bool {
463+
if w == nil || w.outbound == nil {
464+
return false
465+
}
466+
select {
467+
case w.outbound <- msg:
468+
return true
469+
default:
470+
return false
471+
}
472+
}
473+
474+
// shutdown 关闭 outbound 队列,writer goroutine 在 drain 完后 range 退出。
475+
// 多次调用安全(closeOnce)。bridge pipe 写入器(outbound 为 nil)调用为 no-op。
476+
func (w *pipeWriter) shutdown() {
477+
if w == nil || w.outbound == nil {
478+
return
479+
}
480+
w.closeOnce.Do(func() { close(w.outbound) })
481+
}

wind_input/internal/bridge/server_push.go

Lines changed: 62 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (s *Server) startPushPipeListener() {
6363
continue
6464
}
6565

66-
writer := &pipeWriter{handle: handle}
66+
writer := newPushPipeWriter(handle)
6767

6868
// Get the client's process ID for targeted push
6969
pushProcessID, err := getNamedPipeClientProcessId(handle)
@@ -89,6 +89,8 @@ func (s *Server) startPushPipeListener() {
8989
s.logger.Info("Push pipe client connected", "clientID", clientID, "processID", pushProcessID)
9090

9191
// Notify the newly-connected TSF client that the service is ready.
92+
// 在启动 writer goroutine 之前同步发送,确保 SERVICE_READY 是该 client
93+
// 收到的第一条消息(不会被后续 enqueueBroadcast 抢前面去)。
9294
encoded := s.codec.EncodeServiceReady()
9395
if err := s.codec.WriteMessage(writer, encoded); err != nil {
9496
s.logger.Warn("Failed to send CMD_SERVICE_READY to new push client",
@@ -97,6 +99,11 @@ func (s *Server) startPushPipeListener() {
9799
s.logger.Debug("CMD_SERVICE_READY sent to new push client", "clientID", clientID)
98100
}
99101

102+
// Per-client writer goroutine:消费 outbound 队列,把广播路径从
103+
// "每次都 go func()"改成"单 worker 串行"。slow client 不会再让
104+
// goroutine 堆积。outbound 关闭后 range 退出,writer 自然终止。
105+
go s.pushWriterLoop(handle, writer, clientID, pushProcessID)
106+
100107
// 单 goroutine 完成两件事:
101108
// 1) 阻塞读 8 字节 token 握手
102109
// 2) 握手后继续阻塞 ReadFile,专门用于检测对端关闭(死链监听)
@@ -165,6 +172,35 @@ func (s *Server) startPushPipeListener() {
165172
}
166173
}
167174

175+
// pushWriterLoop 是 per-client 广播 worker。范围迭代 outbound,串行写入;
176+
// 写失败时清理 handle 并退出。outbound 被 shutdown() 关闭后 range 自然退出。
177+
//
178+
// 不再像旧设计那样"每次广播都 go func()"——pprof 曾观测到 725 个 goroutine
179+
// 堵在 sync.Mutex.Lock 上,slow/dead client 把广播 goroutine 无限堆积。
180+
// 新设计下每个 client 至多 1 个 writer goroutine。
181+
func (s *Server) pushWriterLoop(h windows.Handle, writer *pipeWriter, cid int, pid uint32) {
182+
for msg := range writer.outbound {
183+
if err := s.codec.WriteMessage(writer, msg); err != nil {
184+
if isPipeClosed(err) {
185+
s.logger.Debug("Push pipe writer exiting on peer close",
186+
"clientID", cid, "processID", pid, "error", err)
187+
} else {
188+
s.logger.Warn("Push pipe writer aborting on write error",
189+
"clientID", cid, "processID", pid, "error", err)
190+
}
191+
// Phase-2 reader 多数情况下已经清理过了;cleanupPushHandle 用返回值
192+
// 做并发安全的"二选一",CloseHandle 不会被双关。
193+
s.pushMu.Lock()
194+
removed := s.cleanupPushHandle(h)
195+
s.pushMu.Unlock()
196+
if removed {
197+
windows.CloseHandle(h)
198+
}
199+
return
200+
}
201+
}
202+
}
203+
168204
// removePushHandleFromPIDIndex 在写失败清理时维护 pushClientsByPID 的一致性。
169205
// 当被移除的 handle 恰好是该 PID 的最新记录时,尝试从 pushHandleToPID 中为同 PID
170206
// 找另一个存活 handle 作替代;若无其他 handle 则删除该条目。
@@ -189,7 +225,8 @@ func (s *Server) removePushHandleFromPIDIndex(pid uint32, removedHandle windows.
189225
// (removePushHandleFromPIDIndex 需要先读 pushHandleToPID 找替代 handle,
190226
// 因此 pushHandleToPID 的实际删除放在最后。)
191227
func (s *Server) cleanupPushHandle(handle windows.Handle) bool {
192-
if _, exists := s.pushClients[handle]; !exists {
228+
w, exists := s.pushClients[handle]
229+
if !exists {
193230
return false
194231
}
195232
pid := s.pushHandleToPID[handle]
@@ -200,6 +237,11 @@ func (s *Server) cleanupPushHandle(handle windows.Handle) bool {
200237
delete(s.tokenToPushHandle, token)
201238
delete(s.pushHandleToToken, handle)
202239
}
240+
// 关闭 outbound 让 writer goroutine 退出;CloseHandle 在调用方完成。
241+
// shutdown() 多次调用安全(closeOnce)。
242+
if w != nil {
243+
w.shutdown()
244+
}
203245
return true
204246
}
205247

@@ -240,22 +282,15 @@ func (s *Server) PushStateToAllClients(status *StatusUpdateData) {
240282
"fullWidth", status.FullWidth,
241283
"capsLock", status.CapsLock)
242284

243-
// 每个客户端独立 goroutine 写入,避免某个 client 的 pipe buffer 满/阻塞
244-
// 导致后续 client(如 Notepad 第二个 CLangBar 实例)永远收不到推送。
245-
// Go map 随机迭代顺序会使阻塞点前后的 client 每次不同,造成状态同步时好时坏。
285+
// 把消息丢到每个 client 的 outbound 队列;per-client writer goroutine 串行消费。
286+
// 队列满表示该 client 卡顿——状态推送语义幂等,丢弃即可(下次推就是最新值)。
287+
// 旧设计每次广播都 go func(),slow client 让 goroutine 堆到数百个;新设计下
288+
// 每个 client 仅一个 writer goroutine,不会无限增长。
246289
for _, client := range clients {
247-
c := client
248-
go func() {
249-
if err := s.codec.WriteMessage(c.writer, encoded); err != nil {
250-
s.logger.Warn("Failed to push state to client", "processID", c.processID, "error", err)
251-
s.pushMu.Lock()
252-
removed := s.cleanupPushHandle(c.handle)
253-
s.pushMu.Unlock()
254-
if removed {
255-
windows.CloseHandle(c.handle)
256-
}
257-
}
258-
}()
290+
if !client.writer.enqueueBroadcast(encoded) {
291+
s.logger.Warn("Push state dropped: outbound queue full",
292+
"processID", client.processID)
293+
}
259294
}
260295
}
261296

@@ -495,19 +530,13 @@ func (s *Server) pushSyncConfigToAllClients(key string, value []byte, logName st
495530
return
496531
}
497532

533+
// 同 PushStateToAllClients:丢到 per-client outbound 队列,满则 drop。
534+
// 配置同步幂等——下次 push 自带最新 value。
498535
for _, client := range clients {
499-
c := client
500-
go func() {
501-
if err := s.codec.WriteMessage(c.writer, encoded); err != nil {
502-
s.logger.Debug("Failed to push config", "config", logName, "error", err)
503-
s.pushMu.Lock()
504-
removed := s.cleanupPushHandle(c.handle)
505-
s.pushMu.Unlock()
506-
if removed {
507-
windows.CloseHandle(c.handle)
508-
}
509-
}
510-
}()
536+
if !client.writer.enqueueBroadcast(encoded) {
537+
s.logger.Warn("Push config dropped: outbound queue full",
538+
"config", logName)
539+
}
511540
}
512541
}
513542

@@ -544,7 +573,10 @@ func (s *Server) RestartService() {
544573
// Close all push pipe clients and clear all mappings
545574
s.pushMu.Lock()
546575
pushClientCount := len(s.pushClients)
547-
for h := range s.pushClients {
576+
for h, w := range s.pushClients {
577+
if w != nil {
578+
w.shutdown() // 关 outbound 让 writer goroutine 退出
579+
}
548580
windows.CloseHandle(h)
549581
}
550582
// 重置所有 map(比逐条 delete 更高效)

0 commit comments

Comments
 (0)