Skip to content

Commit 4b7c045

Browse files
1oca1h0stMecozea
authored andcommitted
faet: 尝试在SSE里面加上僵尸进程检查,解决无法获取隧道内数据的问题
1 parent bb3e9c1 commit 4b7c045

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

internal/sse/manager.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,11 @@ func (m *Manager) listenSSE(ctx context.Context, conn *EndpointConnection) {
364364
connectionTimeout := time.NewTimer(10 * time.Second)
365365
defer connectionTimeout.Stop()
366366

367+
// 空闲超时检测:定期检查是否长时间未收到事件(防止僵尸连接)
368+
idleCheckTicker := time.NewTicker(60 * time.Second) // 每60秒检查一次
369+
defer idleCheckTicker.Stop()
370+
const maxIdleTime = 5 * time.Minute // 最大空闲时间5分钟
371+
367372
for {
368373
select {
369374
case <-ctx.Done():
@@ -383,6 +388,26 @@ func (m *Manager) listenSSE(ctx context.Context, conn *EndpointConnection) {
383388
}
384389
return
385390
}
391+
case <-idleCheckTicker.C:
392+
// 检查是否长时间未收到事件(仅在连接已建立后检查)
393+
if connectionEstablished {
394+
lastEventTime := conn.GetLastEventTime()
395+
if !lastEventTime.IsZero() {
396+
idleDuration := time.Since(lastEventTime)
397+
if idleDuration > maxIdleTime {
398+
log.Warnf("[Master-%d#SSE]检测到僵尸连接:已%v未收到任何事件,主动断开重连",
399+
conn.EndpointID, idleDuration.Round(time.Second))
400+
conn.SetConnected(false)
401+
if !conn.IsManuallyDisconnected() {
402+
m.markEndpointFail(conn.EndpointID)
403+
conn.ResetLastConnectAttempt()
404+
}
405+
return
406+
}
407+
log.Debugf("[Master-%d#SSE]空闲检查:距离上次事件%v(最大允许%v)",
408+
conn.EndpointID, idleDuration.Round(time.Second), maxIdleTime)
409+
}
410+
}
386411
case ev, ok := <-events:
387412
if !ok {
388413
// 事件通道关闭,这是真正的连接断开
@@ -411,6 +436,9 @@ func (m *Manager) listenSSE(ctx context.Context, conn *EndpointConnection) {
411436
connectionTimeout.Stop()
412437
}
413438

439+
// 更新最后事件时间(用于检测僵尸连接)
440+
conn.UpdateLastEventTime()
441+
414442
log.Debugf("[Master-%d#SSE]收到SSE消息: %s", conn.EndpointID, ev.Data)
415443

416444
// 投递到全局 worker pool 异步处理

internal/sse/model.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type EndpointConnection struct {
6666
lastConnectAttempt time.Time // 最后一次连接尝试时间
6767
reconnectAttempts int // 重连尝试次数
6868
isConnected bool // 当前连接状态
69+
lastEventTime time.Time // 最后一次接收到事件的时间
6970
}
7071

7172
// SetManuallyDisconnected 设置手动断开状态
@@ -128,6 +129,22 @@ func (ec *EndpointConnection) GetReconnectAttempts() int {
128129
return ec.reconnectAttempts
129130
}
130131

132+
// UpdateLastEventTime 更新最后事件时间
133+
// TODO(性能优化): 如果单个endpoint的tunnel数量很多(100+)且事件频繁,
134+
// 可以考虑使用atomic.Int64存储Unix纳秒时间戳,或添加频率限制(30s更新一次)来减少锁竞争
135+
func (ec *EndpointConnection) UpdateLastEventTime() {
136+
ec.mu.Lock()
137+
defer ec.mu.Unlock()
138+
ec.lastEventTime = time.Now()
139+
}
140+
141+
// GetLastEventTime 获取最后事件时间
142+
func (ec *EndpointConnection) GetLastEventTime() time.Time {
143+
ec.mu.RLock()
144+
defer ec.mu.RUnlock()
145+
return ec.lastEventTime
146+
}
147+
131148
// Event 事件类型
132149
type Event struct {
133150
Type string `json:"type"`

0 commit comments

Comments
 (0)