Skip to content

Commit 86a791e

Browse files
committed
feat: 增加SSE_DEBUG_LOG变量用于是否输出sse消息到控制台,默认不输出
1 parent 445ce03 commit 86a791e

File tree

1 file changed

+63
-17
lines changed

1 file changed

+63
-17
lines changed

cmd/server/main.go

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"NodePassDash/internal/dashboard"
66
dbPkg "NodePassDash/internal/db"
77
"NodePassDash/internal/endpoint"
8+
"NodePassDash/internal/endpointcache"
89
"NodePassDash/internal/nodepass"
910
"NodePassDash/internal/router"
1011

@@ -55,7 +56,7 @@ func serveStaticFile(c *gin.Context, fsys fs.FS, fileName, contentType string) {
5556
}
5657

5758
// parseFlags 解析命令行参数并处理基础配置
58-
func parseFlags() (resetPwd bool, port, certFile, keyFile string, showVersion, disableLogin bool) {
59+
func parseFlags() (resetPwd bool, port, certFile, keyFile string, showVersion, disableLogin, sseDebugLog bool) {
5960
// 命令行参数处理
6061
resetPwdCmd := flag.Bool("resetpwd", false, "重置管理员密码")
6162
portFlag := flag.String("port", "", "HTTP 服务端口 (优先级高于环境变量 PORT),默认 3000")
@@ -67,6 +68,8 @@ func parseFlags() (resetPwd bool, port, certFile, keyFile string, showVersion, d
6768
tlsKeyFlag := flag.String("key", "", "TLS 私钥文件路径")
6869
// 禁用用户名密码登录参数
6970
disableLoginFlag := flag.Bool("disable-login", false, "禁用用户名密码登录,仅允许 OAuth2 登录")
71+
// SSE 调试日志参数
72+
sseDebugLogFlag := flag.Bool("sse-debug-log", false, "启用 SSE 消息调试日志")
7073

7174
flag.Parse()
7275

@@ -110,7 +113,16 @@ func parseFlags() (resetPwd bool, port, certFile, keyFile string, showVersion, d
110113
}
111114
}
112115

113-
return *resetPwdCmd, port, certFile, keyFile, *versionFlag || *vFlag, disableLogin
116+
// 设置 SSE 调试日志配置
117+
// 优先级:命令行参数 > 环境变量
118+
sseDebugLog = *sseDebugLogFlag
119+
if !sseDebugLog {
120+
if env := os.Getenv("SSE_DEBUG_LOG"); env == "true" || env == "1" {
121+
sseDebugLog = true
122+
}
123+
}
124+
125+
return *resetPwdCmd, port, certFile, keyFile, *versionFlag || *vFlag, disableLogin, sseDebugLog
114126
}
115127

116128
// setupStaticFiles 配置静态文件服务
@@ -180,7 +192,7 @@ func setupStaticFiles(ginRouter *gin.Engine) error {
180192
}
181193

182194
// initializeServices 初始化所有服务
183-
func initializeServices() (*gorm.DB, *auth.Service, *endpoint.Service, *tunnel.Service, *dashboard.Service, *sse.Service, *sse.Manager, *websocket.Service, error) {
195+
func initializeServices(sseDebugLog bool) (*gorm.DB, *auth.Service, *endpoint.Service, *tunnel.Service, *dashboard.Service, *sse.Service, *sse.Manager, *websocket.Service, error) {
184196
// 获取GORM数据库连接
185197
gormDB := dbPkg.GetDB()
186198
log.Info("数据库连接成功")
@@ -191,13 +203,20 @@ func initializeServices() (*gorm.DB, *auth.Service, *endpoint.Service, *tunnel.S
191203
log.Errorf("系统初始化失败: %v", err)
192204
}
193205

194-
// 初始化端点缓存
206+
// 初始化NodePass客户端缓存(用于API调用)
195207
if err := nodepass.InitializeCache(gormDB); err != nil {
196-
log.Errorf("初始化端点缓存失败: %v", err)
208+
log.Errorf("初始化NodePass客户端缓存失败: %v", err)
197209
} else {
198-
log.Infof("端点缓存初始化成功,加载了 %d 个端点", nodepass.GetCache().Count())
210+
log.Infof("NodePass客户端缓存初始化成功,加载了 %d 个端点", nodepass.GetCache().Count())
199211
}
200212

213+
// 初始化Endpoint内存缓存(类似Nezha的ServerShared)
214+
if err := endpointcache.InitShared(gormDB); err != nil {
215+
log.Errorf("初始化Endpoint内存缓存失败: %v", err)
216+
return nil, nil, nil, nil, nil, nil, nil, nil, fmt.Errorf("初始化Endpoint缓存失败: %v", err)
217+
}
218+
log.Infof("✅ Endpoint内存缓存初始化成功,已加载 %d 个端点", endpointcache.Shared.Count())
219+
201220
// 初始化其他服务
202221
endpointService := endpoint.NewService(gormDB)
203222
tunnelService := tunnel.NewService(gormDB)
@@ -210,7 +229,7 @@ func initializeServices() (*gorm.DB, *auth.Service, *endpoint.Service, *tunnel.S
210229
if err != nil {
211230
return nil, nil, nil, nil, nil, nil, nil, nil, fmt.Errorf("获取底层sql.DB失败: %v", err)
212231
}
213-
sseManager := sse.NewManager(sqlDB, sseService)
232+
sseManager := sse.NewManager(sqlDB, sseService, sseDebugLog)
214233

215234
// 设置Manager引用到Service(避免循环依赖)
216235
sseService.SetManager(sseManager)
@@ -263,6 +282,25 @@ func startBackgroundServices(gormDB *gorm.DB, sseService *sse.Service, sseManage
263282
log.Info("流量数据优化调度器已启动")
264283
}()
265284

285+
// 启动Endpoint缓存定时持久化任务(每30秒持久化一次变更)
286+
go func() {
287+
ticker := time.NewTicker(30 * time.Second)
288+
defer ticker.Stop()
289+
290+
for range ticker.C {
291+
if err := endpointcache.Shared.PersistIfNeeded(gormDB); err != nil {
292+
log.Errorf("❌ 持久化Endpoint缓存失败: %v", err)
293+
} else {
294+
stats := endpointcache.Shared.GetStats()
295+
dirtyCount := stats["dirty_count"].(int)
296+
if dirtyCount > 0 {
297+
log.Debugf("💾 持久化了 %d 个变更的端点", dirtyCount)
298+
}
299+
}
300+
}
301+
}()
302+
log.Info("Endpoint缓存定时持久化任务已启动(间隔: 30秒)")
303+
266304
// 启动SSE相关服务
267305
go func() {
268306
sseService.StartStoreWorkers(4) // 减少worker数量
@@ -285,51 +323,59 @@ func startBackgroundServices(gormDB *gorm.DB, sseService *sse.Service, sseManage
285323
}
286324

287325
// gracefulShutdown 优雅关闭服务
288-
func gracefulShutdown(server *http.Server, trafficScheduler *dashboard.TrafficScheduler, wsService *websocket.Service, sseManager *sse.Manager, sseService *sse.Service) {
326+
func gracefulShutdown(server *http.Server, gormDB *gorm.DB, trafficScheduler *dashboard.TrafficScheduler, wsService *websocket.Service, sseManager *sse.Manager, sseService *sse.Service) {
289327
// 等待中断信号
290328
quit := make(chan os.Signal, 1)
291329
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
292330
<-quit
293331

294332
// 关闭服务
295-
log.Infof("正在关闭服务器...")
333+
log.Infof("⏳ 正在关闭服务器...")
334+
335+
// 1. 优先持久化Endpoint缓存(保证数据不丢失)
336+
log.Infof("💾 正在持久化Endpoint缓存...")
337+
if err := endpointcache.Shared.Shutdown(gormDB); err != nil {
338+
log.Errorf("❌ 关闭Endpoint缓存失败: %v", err)
339+
} else {
340+
log.Infof("✅ Endpoint缓存已成功关闭并持久化")
341+
}
296342

297343
// 关闭增强系统(暂时注释掉)
298344
// if err := lifecycleManager.Shutdown(); err != nil {
299345
// log.Errorf("增强系统关闭失败: %v", err)
300346
// }
301347

302-
// 关闭流量调度器
348+
// 2. 关闭流量调度器
303349
if trafficScheduler != nil {
304350
trafficScheduler.Stop()
305351
}
306352

307-
// 关闭WebSocket系统
353+
// 3. 关闭WebSocket系统
308354
if wsService != nil {
309355
wsService.Stop()
310356
}
311357

312-
// 关闭SSE系统
358+
// 4. 关闭SSE系统
313359
if sseManager != nil {
314360
sseManager.Close()
315361
}
316362
if sseService != nil {
317363
sseService.Close()
318364
}
319365

320-
// 优雅关闭HTTP服务器
366+
// 5. 优雅关闭HTTP服务器
321367
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
322368
defer shutdownCancel()
323369

324370
if err := server.Shutdown(shutdownCtx); err != nil {
325371
log.Errorf("服务器关闭错误: %v", err)
326372
}
327373

328-
log.Infof("服务器已关闭")
374+
log.Infof("✅ 服务器已安全关闭")
329375
}
330376

331377
func main() {
332-
resetPwd, port, certFile, keyFile, showVersion, disableLogin := parseFlags()
378+
resetPwd, port, certFile, keyFile, showVersion, disableLogin, sseDebugLog := parseFlags()
333379

334380
// 如果指定了版本参数,显示版本信息后退出
335381
if showVersion {
@@ -351,7 +397,7 @@ func main() {
351397
}
352398

353399
// 初始化所有服务
354-
gormDB, authService, endpointService, tunnelService, dashboardService, sseService, sseManager, wsService, err := initializeServices()
400+
gormDB, authService, endpointService, tunnelService, dashboardService, sseService, sseManager, wsService, err := initializeServices(sseDebugLog)
355401
if err != nil {
356402
log.Errorf("服务初始化失败: %v", err)
357403
return
@@ -417,5 +463,5 @@ func main() {
417463
_ = ctx
418464

419465
// 优雅关闭服务
420-
gracefulShutdown(server, trafficScheduler, wsService, sseManager, sseService)
466+
gracefulShutdown(server, gormDB, trafficScheduler, wsService, sseManager, sseService)
421467
}

0 commit comments

Comments
 (0)