Skip to content

Commit ab2cf3b

Browse files
committed
refactor(bridge): bridge pipe 也迁移到 go-winio, 统一 IO 模型
继承 push pipe 迁移 (d1561f7), 这一步把 bridge pipe (请求-响应 RPC 通道) 也搬到 go-winio overlapped I/O. 之前 bridge 走自写的 sync ReadFile/WriteFile, 功能正常但代码风格不统一, 而且仍残留"同 handle 上 sync read+write 串行化" 的潜在 bug 风险 (push pipe 就是栽在这上面). 主要改动: - Start() 用 winio.ListenPipe + listener.Accept() 替代 CreateNamedPipe + ConnectNamedPipe 手工循环 - handleClient(handle, ...) → handleClient(conn net.Conn, ...) - handleBatchEvents(... writer *pipeWriter ...) → 接 io.Writer (conn 直接传) - activeHandles map[Handle]*pipeWriter → activeConns map[net.Conn]struct{} (只用作集合 + 计数, value 从来没读出来过) - 删除 pipeReader (74 行, winio MessageMode 内置 ERROR_MORE_DATA 处理 + 消息边界, 我们自己拼接逻辑全部下线) - 删除 pipeWriter (15 行) - 删除 readBufPool (winio 自己管 buffer) - AGENTS.md 加入"不要回退到 sync ReadFile/WriteFile"的告诫 净删除 131 行 (server.go: -209 / +78). 整个 internal/bridge/ 现在只有 一种 IO 模型 (winio overlapped + net.Conn), 同一份 fdGetter 接口提取 HANDLE 给 GetNamedPipeClientProcessId 用. 实测 (1.5 小时, PID 25784): - 4926 bridge 请求, 0 错误 - 788 push enqueued / 788 completed, 100% 送达 - 写入耗时 723/788 是 μs 级 (log 显示 0s), 最长 798μs - 22 次断开都被 phase-2 reader 即时捕获 (overlapped 并发安全)
1 parent d1561f7 commit ab2cf3b

4 files changed

Lines changed: 68 additions & 199 deletions

File tree

wind_input/internal/bridge/AGENTS.md

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,21 @@ Named Pipe IPC 服务端,负责与 C++ TSF(文本服务框架)桥接层进
1515
| File | Description |
1616
|------|-------------|
1717
| `protocol.go` | 协议类型定义(ResponseType、KeyEventData、StatusUpdateData 等) |
18-
| `server.go` | Named Pipe 服务端主体(连接管理、消息读写、pipeReader/pipeWriter|
18+
| `server.go` | Named Pipe 服务端主体(基于 go-winio overlapped I/O;bridge pipe 走请求-响应 RPC,push pipe 走单向广播;net.Conn 接口统一读写|
1919
| `server_handler.go` | 消息分发:解码二进制消息并路由到 MessageHandler 各方法 |
20-
| `server_push.go` | 推送管道管理(per-client outbound channel + 单 writer goroutine;所有 push 仅触达 active client,`pushToActiveClient` 是统一入口) |
20+
| `server_push.go` | 推送管道管理(per-client outbound channel + 单 writer goroutine + phase-2 死链监听;所有 push 仅触达 active client,`pushToActiveClient` 是统一入口) |
2121
| `host_render.go` | `HostRenderManager`:管理白名单进程的宿主渲染状态;`HostRenderState` 持有每个进程的共享内存引用;通过 `OpenProcess`/`QueryFullProcessImageNameW` 识别进程名称 |
2222
| `shared_memory.go` | `SharedMemory`:命名共享内存 + 命名事件对;`WriteFrame` 将 RGBA→BGRA 转换后写入位图并信令通知;`WriteHide` 发送隐藏命令;安全描述符包含 AppContainer 低完整性标记(`S:(ML;;NW;;;LW)`)以支持 UWP 进程访问 |
2323

2424
## For AI Agents
2525

2626
### Working In This Directory
27-
- 管道使用 MESSAGE 模式(`PIPE_TYPE_MESSAGE|PIPE_READMODE_MESSAGE`),每次 ReadFile 返回完整消息
27+
- 管道用 `github.com/Microsoft/go-winio` 起 listener (`winio.ListenPipe`),配置 `MessageMode: true` 保证消息边界
2828
- 缓冲区大小 64KB(与 Weasel 一致)
29-
- 安全描述符允许 Everyone/SYSTEM/Administrators 访问(SDDL: `D:P(A;;GA;;;WD)(A;;GA;;;SY)(A;;GA;;;BA)`
29+
- 安全描述符允许 Everyone/SYSTEM/Administrators 访问(SDDL: `D:P(A;;GA;;;WD)(A;;GA;;;SY)(A;;GA;;;BA)(A;;GA;;;AC)`),含 `S:(ML;;NW;;;LW)` 支持 UWP/AppContainer
30+
- **关键:不要回到自写的同步 `windows.ReadFile` + `WriteFile`**——同一 handle 上 sync read park + sync write 会被内核串行化,writer 会被永久卡住。go-winio 内部用 overlapped I/O + IOCP 避免这个问题
31+
- 客户端 PID 通过 `conn.(fdGetter).Fd()` 拿到底层 HANDLE 后调 `GetNamedPipeClientProcessId`
32+
- Push pipe `pushClient``Disconnect()` + `Close()` 主动断开 client;只用 `Close()` 不会通知 client 端
3033
- 推送管道按进程 ID(PID)跟踪客户端,`activeProcessID` 标识当前有焦点的进程,安全推送只发给活跃客户端
3134
- 请求处理带 1000ms 超时(`RequestProcessTimeout`),覆盖高负载下的调度抖动
3235
- 异步请求(`IsAsyncRequest`)不发送响应

wind_input/internal/bridge/server.go

Lines changed: 53 additions & 188 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313
"unsafe"
1414

15+
"github.com/Microsoft/go-winio"
1516
"github.com/huanfeng/wind_input/internal/ipc"
1617
"github.com/huanfeng/wind_input/pkg/buildvariant"
1718
"golang.org/x/sys/windows"
@@ -32,14 +33,6 @@ func isPipeClosed(err error) bool {
3233
errors.Is(err, windows.ERROR_PIPE_NOT_CONNECTED)
3334
}
3435

35-
// readBufPool 复用 64KB 管道读取缓冲区,避免每次消息读取都 make([]byte, 64KB)。
36-
var readBufPool = sync.Pool{
37-
New: func() any {
38-
buf := make([]byte, PipeBufferSize)
39-
return &buf
40-
},
41-
}
42-
4336
var (
4437
kernel32 = windows.NewLazySystemDLL("kernel32.dll")
4538
procGetNamedPipeClientProcessId = kernel32.NewProc("GetNamedPipeClientProcessId")
@@ -79,9 +72,11 @@ type Server struct {
7972
handler MessageHandler
8073
codec *ipc.BinaryCodec
8174

82-
mu sync.RWMutex
83-
clientCount int
84-
activeHandles map[windows.Handle]*pipeWriter // Map handle to writer for broadcasting
75+
mu sync.RWMutex
76+
clientCount int
77+
// activeConns 跟踪当前活跃的 bridge pipe 连接(请求-响应通道)。
78+
// 仅作为"集合 + 计数"使用——RestartService 时遍历 Close。
79+
activeConns map[net.Conn]struct{}
8580

8681
// Push pipe clients (for proactive state push)
8782
pushMu sync.RWMutex
@@ -113,7 +108,7 @@ func NewServer(handler MessageHandler, logger *slog.Logger) *Server {
113108
handler: handler,
114109
logger: logger,
115110
codec: ipc.NewBinaryCodec(),
116-
activeHandles: make(map[windows.Handle]*pipeWriter),
111+
activeConns: make(map[net.Conn]struct{}),
117112
pushClients: make(map[windows.Handle]*pushClient),
118113
pushClientsByPID: make(map[uint32]windows.Handle),
119114
pushHandleToPID: make(map[windows.Handle]uint32),
@@ -156,7 +151,11 @@ func (s *Server) GetActiveHostRender() (writeFrame func(img *image.RGBA, x, y in
156151
return shm.WriteFrame, shm.WriteHide
157152
}
158153

159-
// Start begins listening for connections from C++ Bridge
154+
// Start begins listening for connections from C++ Bridge.
155+
//
156+
// Bridge pipe(请求-响应 RPC 通道)也迁移到 go-winio overlapped I/O,统一架构。
157+
// 与 push pipe 同样用 winio.ListenPipe + listener.Accept,conn 是 net.Conn,
158+
// 读写走 codec.ReadHeader / WriteMessage(已经是 io.Reader/Writer 接口)。
160159
func (s *Server) Start() error {
161160
s.logger.Info("Starting Bridge IPC server (binary protocol)", "pipe", BridgePipeName)
162161

@@ -166,112 +165,81 @@ func (s *Server) Start() error {
166165
// Allow desktop clients plus AppContainer/modern hosts (e.g. Start menu search).
167166
// S:(ML;;NW;;;LW) = Mandatory Label: Low integrity — required for UWP/AppContainer
168167
// processes (Microsoft Store, Start Menu) which run at low integrity level.
169-
// Without this, the mandatory integrity check blocks access before DACL evaluation.
170-
// D: = DACL: WD=Everyone, SY=SYSTEM, BA=Administrators, AC=ALL APPLICATION PACKAGES
171-
sddl := "D:P(A;;GA;;;WD)(A;;GA;;;SY)(A;;GA;;;BA)(A;;GA;;;AC)S:(ML;;NW;;;LW)"
172-
sd, err := windows.SecurityDescriptorFromString(sddl)
173-
if err != nil {
174-
s.logger.Error("Failed to create security descriptor", "error", err)
175-
sd = nil
168+
pipeConfig := &winio.PipeConfig{
169+
SecurityDescriptor: "D:P(A;;GA;;;WD)(A;;GA;;;SY)(A;;GA;;;BA)(A;;GA;;;AC)S:(ML;;NW;;;LW)",
170+
MessageMode: true,
171+
InputBufferSize: int32(PipeBufferSize),
172+
OutputBufferSize: int32(PipeBufferSize),
176173
}
177-
178-
var sa *windows.SecurityAttributes
179-
if sd != nil {
180-
sa = &windows.SecurityAttributes{
181-
Length: uint32(unsafe.Sizeof(windows.SecurityAttributes{})),
182-
SecurityDescriptor: sd,
183-
}
174+
listener, err := winio.ListenPipe(BridgePipeName, pipeConfig)
175+
if err != nil {
176+
return fmt.Errorf("failed to listen bridge pipe: %w", err)
184177
}
185178

186179
for {
187-
pipePath, err := windows.UTF16PtrFromString(BridgePipeName)
180+
conn, err := listener.Accept()
188181
if err != nil {
189-
return fmt.Errorf("failed to convert pipe path: %w", err)
190-
}
191-
192-
handle, err := windows.CreateNamedPipe(
193-
pipePath,
194-
windows.PIPE_ACCESS_DUPLEX,
195-
// Use MESSAGE mode like Weasel for more reliable message boundaries
196-
windows.PIPE_TYPE_MESSAGE|windows.PIPE_READMODE_MESSAGE|windows.PIPE_WAIT,
197-
windows.PIPE_UNLIMITED_INSTANCES,
198-
PipeBufferSize, // 64KB like Weasel
199-
PipeBufferSize,
200-
0,
201-
sa,
202-
)
203-
204-
if err != nil {
205-
return fmt.Errorf("failed to create named pipe: %w", err)
206-
}
207-
208-
s.logger.Debug("Waiting for C++ Bridge connection...")
209-
210-
err = windows.ConnectNamedPipe(handle, nil)
211-
if err != nil && err != windows.ERROR_PIPE_CONNECTED {
212-
windows.CloseHandle(handle)
182+
if errors.Is(err, net.ErrClosed) {
183+
s.logger.Info("Bridge pipe listener closed")
184+
return nil
185+
}
186+
s.logger.Error("Bridge pipe accept error", "error", err)
187+
time.Sleep(200 * time.Millisecond)
213188
continue
214189
}
215190

216-
// Create pipe writer for this client
217-
writer := &pipeWriter{handle: handle}
218-
219191
s.mu.Lock()
220192
s.clientCount++
221193
clientID := s.clientCount
222-
s.activeHandles[handle] = writer
194+
s.activeConns[conn] = struct{}{}
223195
s.mu.Unlock()
224196

225197
s.logger.Info("C++ Bridge connected", "clientID", clientID)
226198

227-
// Handle client in a separate goroutine to allow concurrent connections
228-
go func(h windows.Handle, id int) {
229-
pid := s.handleClient(h, id)
199+
go func(c net.Conn, id int) {
200+
pid := s.handleClient(c, id)
230201

231202
// Capture the current setup sequence BEFORE acquiring the main lock.
232-
// This prevents a race where the old connection's cleanup goroutine
233-
// destroys a newer connection's SharedMemory for the same PID.
203+
// 防止旧连接的 cleanup goroutine 销毁同 PID 新连接的 SharedMemory。
234204
var setupSeq uint64
235205
if s.hostRender != nil && pid != 0 {
236206
setupSeq = s.hostRender.GetSetupSeq(pid)
237207
}
238208

239209
s.mu.Lock()
240-
delete(s.activeHandles, h)
241-
activeCount := len(s.activeHandles)
210+
delete(s.activeConns, c)
211+
activeCount := len(s.activeConns)
242212
s.mu.Unlock()
243213

244-
// Clean up host render resources only if the generation matches
245214
if s.hostRender != nil && pid != 0 && setupSeq != 0 {
246215
s.hostRender.CleanupClient(pid, setupSeq)
247216
}
248217

249-
// Notify handler that a client disconnected
250218
s.handler.HandleClientDisconnected(activeCount)
251-
}(handle, clientID)
219+
}(conn, clientID)
252220
}
253221
}
254222

255-
func (s *Server) handleClient(handle windows.Handle, clientID int) uint32 {
256-
defer windows.CloseHandle(handle)
223+
func (s *Server) handleClient(conn net.Conn, clientID int) uint32 {
224+
defer conn.Close()
257225

258-
// Get the client's process ID for tracking active client
259-
processID, err := getNamedPipeClientProcessId(handle)
260-
if err != nil {
261-
s.logger.Warn("Failed to get client process ID", "clientID", clientID, "error", err)
262-
processID = 0 // Continue without process ID tracking
263-
} else {
264-
s.logger.Debug("Handling client", "clientID", clientID, "processID", processID)
226+
// Get the client's process ID for tracking active client.
227+
// winio 的 net.Conn 底层 win32File 暴露 Fd()——取出 handle 调用 GetNamedPipeClientProcessId。
228+
var processID uint32
229+
if g, ok := conn.(fdGetter); ok {
230+
var err error
231+
processID, err = getNamedPipeClientProcessId(windows.Handle(g.Fd()))
232+
if err != nil {
233+
s.logger.Warn("Failed to get client process ID", "clientID", clientID, "error", err)
234+
processID = 0
235+
} else {
236+
s.logger.Debug("Handling client", "clientID", clientID, "processID", processID)
237+
}
265238
}
266239

267-
// Create a pipe reader wrapper
268-
reader := &pipeReader{handle: handle}
269-
defer reader.release()
270-
writer := &pipeWriter{handle: handle}
271-
272240
for {
273-
// Read header
274-
header, err := s.codec.ReadHeader(reader)
241+
// Read header (winio 在 MessageMode 下 conn.Read 自带消息边界 + ERROR_MORE_DATA 处理)
242+
header, err := s.codec.ReadHeader(conn)
275243
if err != nil {
276244
if isPipeClosed(err) {
277245
s.logger.Debug("Bridge pipe closed by peer", "clientID", clientID, "error", err)
@@ -282,7 +250,7 @@ func (s *Server) handleClient(handle windows.Handle, clientID int) uint32 {
282250
}
283251

284252
// Read payload
285-
payload, err := s.codec.ReadPayload(reader, header.Length)
253+
payload, err := s.codec.ReadPayload(conn, header.Length)
286254
if err != nil {
287255
if isPipeClosed(err) {
288256
s.logger.Debug("Bridge pipe closed by peer during payload read", "clientID", clientID, "error", err)
@@ -297,7 +265,7 @@ func (s *Server) handleClient(handle windows.Handle, clientID int) uint32 {
297265

298266
// Handle batch events
299267
if header.Command == ipc.CmdBatchEvents {
300-
s.handleBatchEvents(header, payload, writer, clientID, processID)
268+
s.handleBatchEvents(header, payload, conn, clientID, processID)
301269
continue
302270
}
303271

@@ -311,7 +279,7 @@ func (s *Server) handleClient(handle windows.Handle, clientID int) uint32 {
311279
}
312280

313281
// Write response
314-
if err := s.codec.WriteMessage(writer, response); err != nil {
282+
if err := s.codec.WriteMessage(conn, response); err != nil {
315283
if isPipeClosed(err) {
316284
s.logger.Debug("Bridge pipe closed by peer during response write", "clientID", clientID, "error", err)
317285
} else {
@@ -325,109 +293,6 @@ func (s *Server) handleClient(handle windows.Handle, clientID int) uint32 {
325293
return processID
326294
}
327295

328-
// pipeReader wraps windows.Handle for io.Reader
329-
// In MESSAGE mode, each ReadFile returns a complete message
330-
type pipeReader struct {
331-
handle windows.Handle
332-
msgBuffer []byte // Buffer for current message (slice of poolBuf or heap)
333-
msgOffset int // Current read offset in msgBuffer
334-
poolBuf *[]byte // Pool buffer held until current message is fully consumed
335-
}
336-
337-
func (r *pipeReader) Read(p []byte) (int, error) {
338-
// If we have buffered data from a previous message read, return that first
339-
if r.msgOffset < len(r.msgBuffer) {
340-
n := copy(p, r.msgBuffer[r.msgOffset:])
341-
r.msgOffset += n
342-
return n, nil
343-
}
344-
345-
// Current message fully consumed; return pool buffer before acquiring a new one
346-
if r.poolBuf != nil {
347-
readBufPool.Put(r.poolBuf)
348-
r.poolBuf = nil
349-
r.msgBuffer = nil
350-
}
351-
352-
// Acquire a reusable 64KB buffer from the pool
353-
bufPtr := readBufPool.Get().(*[]byte)
354-
readBuf := *bufPtr
355-
var bytesRead uint32
356-
357-
err := windows.ReadFile(r.handle, readBuf, &bytesRead, nil)
358-
if err != nil {
359-
// Handle ERROR_MORE_DATA - message is larger than 64KB (should not happen in practice)
360-
if err == windows.ERROR_MORE_DATA {
361-
// Copy partial data out BEFORE returning pool buffer to avoid race with other goroutines.
362-
accum := make([]byte, bytesRead)
363-
copy(accum, readBuf[:bytesRead])
364-
readBufPool.Put(bufPtr)
365-
for {
366-
tmpPtr := readBufPool.Get().(*[]byte)
367-
tmp := *tmpPtr
368-
err = windows.ReadFile(r.handle, tmp, &bytesRead, nil)
369-
accum = append(accum, tmp[:bytesRead]...)
370-
readBufPool.Put(tmpPtr)
371-
if err == nil {
372-
break
373-
}
374-
if err != windows.ERROR_MORE_DATA {
375-
return 0, err
376-
}
377-
}
378-
r.msgBuffer = accum
379-
r.msgOffset = 0
380-
n := copy(p, r.msgBuffer)
381-
r.msgOffset = n
382-
return n, nil
383-
}
384-
readBufPool.Put(bufPtr)
385-
return 0, err
386-
}
387-
388-
if bytesRead == 0 {
389-
readBufPool.Put(bufPtr)
390-
return 0, io.EOF
391-
}
392-
393-
// Hold the pool buffer until this entire message is consumed
394-
r.poolBuf = bufPtr
395-
r.msgBuffer = readBuf[:bytesRead]
396-
r.msgOffset = 0
397-
398-
n := copy(p, r.msgBuffer)
399-
r.msgOffset = n
400-
return n, nil
401-
}
402-
403-
// release returns any held pool buffer back to the pool. Must be called when the reader is done.
404-
func (r *pipeReader) release() {
405-
if r.poolBuf != nil {
406-
readBufPool.Put(r.poolBuf)
407-
r.poolBuf = nil
408-
}
409-
r.msgBuffer = nil
410-
}
411-
412-
// pipeWriter is the synchronous bridge-pipe writer (request-response RPC).
413-
// 仅用于 bridge pipe;push pipe 已迁移到 net.Conn 基于 winio 的 pushClient。
414-
// mu 串行化并发 WriteFile(Windows 命名管道未保证 thread-safe)。
415-
type pipeWriter struct {
416-
handle windows.Handle
417-
mu sync.Mutex
418-
}
419-
420-
func (w *pipeWriter) Write(p []byte) (int, error) {
421-
w.mu.Lock()
422-
defer w.mu.Unlock()
423-
var bytesWritten uint32
424-
err := windows.WriteFile(w.handle, p, &bytesWritten, nil)
425-
if err != nil {
426-
return 0, err
427-
}
428-
return int(bytesWritten), nil
429-
}
430-
431296
// pushOutboundBufferSize: per-client push 广播队列容量。
432297
// 状态/配置推送 idempotent,队列满则 drop 最新(下次 push 自带最新 value)。
433298
const pushOutboundBufferSize = 16

wind_input/internal/bridge/server_handler.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/binary"
66
"fmt"
7+
"io"
78
"runtime/debug"
89

910
"github.com/huanfeng/wind_input/internal/ipc"
@@ -422,7 +423,7 @@ func (s *Server) handleModeNotify(payload []byte, clientID int) []byte {
422423
}
423424

424425
// handleBatchEvents processes a batch of events and sends responses for sync events only
425-
func (s *Server) handleBatchEvents(header *ipc.IpcHeader, payload []byte, writer *pipeWriter, clientID int, processID uint32) {
426+
func (s *Server) handleBatchEvents(header *ipc.IpcHeader, payload []byte, w io.Writer, clientID int, processID uint32) {
426427
events, err := s.codec.DecodeBatchEvents(payload)
427428
if err != nil {
428429
s.logger.Error("Failed to decode batch events", "clientID", clientID, "error", err)
@@ -450,7 +451,7 @@ func (s *Server) handleBatchEvents(header *ipc.IpcHeader, payload []byte, writer
450451
// Send batch response if there are any sync events
451452
if len(responses) > 0 {
452453
batchResponse := s.codec.EncodeBatchResponse(responses)
453-
if err := s.codec.WriteMessage(writer, batchResponse); err != nil {
454+
if err := s.codec.WriteMessage(w, batchResponse); err != nil {
454455
s.logger.Error("Failed to write batch response to Bridge", "clientID", clientID, "error", err)
455456
}
456457
}

0 commit comments

Comments
 (0)