@@ -130,14 +130,8 @@ func (hw *HistoryWorker) dataProcessWorker() {
130130
131131// processMonitoringData 处理单个监控数据点
132132// 重要:每个实例独立处理,互不影响
133- // 每个实例维护自己的30个数据点累积数组和流量差值计算状态
133+ // 每个实例维护自己的数据点累积数组和流量差值计算状态
134134func (hw * HistoryWorker ) processMonitoringData (data MonitoringData ) {
135- // 数据验证
136- if data .EndpointID <= 0 || data .InstanceID == "" {
137- log .Warnf ("[HistoryWorker]无效的监控数据: 端点ID=%d, 实例ID=%s" , data .EndpointID , data .InstanceID )
138- return
139- }
140-
141135 // 构建实例唯一键:endpointID_instanceID
142136 key := hw .buildDataKey (data .EndpointID , data .InstanceID )
143137
@@ -147,10 +141,9 @@ func (hw *HistoryWorker) processMonitoringData(data MonitoringData) {
147141 hw .mu .RUnlock ()
148142
149143 if ! exists {
150- // 为该实例创建独立的状态数据容器
151- // 每个实例都有自己的30个数据点累积数组
144+ // 为该实例创建独立的状态数据容器,每个实例都有自己的数据点累积数组
152145 currentStatus = & ServiceCurrentStatus {
153- Result : make ([]MonitoringData , 0 , _CurrentStatusSize ), // 独立的30个数据点数组
146+ Result : make ([]MonitoringData , 0 , _CurrentStatusSize ), // 独立的数据点数组
154147 }
155148
156149 hw .mu .Lock ()
@@ -182,10 +175,9 @@ func (hw *HistoryWorker) processMonitoringData(data MonitoringData) {
182175
183176 log .Debugf ("[HistoryWorker]实例 %s 累积数据点: %d/%d" , key , resultLength , _CurrentStatusSize )
184177
185- // 检查该实例是否独立达到累积阈值(30个数据点)
178+ // 检查该实例是否独立达到累积阈值
186179 // 重要:每个实例独立计算,不同实例之间互不影响
187180 if resultLength >= _CurrentStatusSize {
188- log .Infof ("[HistoryWorker]实例 %s 达到累积阈值,触发独立批量写入" , key )
189181 hw .triggerBatchWrite (key , currentStatus )
190182 }
191183}
@@ -227,7 +219,7 @@ func (hw *HistoryWorker) aggregateAndWrite(dataPoints []MonitoringData) {
227219 lastPoint := dataPoints [len (dataPoints )- 1 ]
228220
229221 // 初始化聚合结果
230- aggregated := & models.ServiceHistory {
222+ historyModel := & models.ServiceHistory {
231223 EndpointID : firstPoint .EndpointID ,
232224 InstanceID : firstPoint .InstanceID ,
233225 RecordTime : time .Now ().Truncate (time .Minute ), // 按分钟取整
@@ -236,11 +228,10 @@ func (hw *HistoryWorker) aggregateAndWrite(dataPoints []MonitoringData) {
236228 }
237229
238230 // 1. 存储最后一个数据点的累计值(保持原始int64类型)
239- aggregated .DeltaTCPIn = lastPoint .TCPIn
240- aggregated .DeltaTCPOut = lastPoint .TCPOut
241- aggregated .DeltaUDPIn = lastPoint .UDPIn
242- aggregated .DeltaUDPOut = lastPoint .UDPOut
243-
231+ historyModel .DeltaTCPIn = lastPoint .TCPIn
232+ historyModel .DeltaTCPOut = lastPoint .TCPOut
233+ historyModel .DeltaUDPIn = lastPoint .UDPIn
234+ historyModel .DeltaUDPOut = lastPoint .UDPOut
244235 // 2. 计算时间跨度
245236 totalTime := lastPoint .Timestamp .Sub (firstPoint .Timestamp ).Seconds ()
246237
@@ -270,8 +261,8 @@ func (hw *HistoryWorker) aggregateAndWrite(dataPoints []MonitoringData) {
270261 udpOutDelta = float64 (lastPoint .UDPOut )
271262 }
272263
273- aggregated .AvgSpeedIn = (tcpInDelta + udpInDelta ) / totalTime
274- aggregated .AvgSpeedOut = (tcpOutDelta + udpOutDelta ) / totalTime
264+ historyModel .AvgSpeedIn = (tcpInDelta + udpInDelta ) / totalTime
265+ historyModel .AvgSpeedOut = (tcpOutDelta + udpOutDelta ) / totalTime
275266 } else {
276267 // 异常情况:时间差为0,使用理论时间间隔
277268 log .Warnf ("[HistoryWorker]时间差为0,使用理论时间间隔计算速度" )
@@ -282,11 +273,11 @@ func (hw *HistoryWorker) aggregateAndWrite(dataPoints []MonitoringData) {
282273 udpInDelta := float64 (lastPoint .UDPIn - firstPoint .UDPIn )
283274 udpOutDelta := float64 (lastPoint .UDPOut - firstPoint .UDPOut )
284275
285- aggregated .AvgSpeedIn = (tcpInDelta + udpInDelta ) / theoreticalTime
286- aggregated .AvgSpeedOut = (tcpOutDelta + udpOutDelta ) / theoreticalTime
276+ historyModel .AvgSpeedIn = (tcpInDelta + udpInDelta ) / theoreticalTime
277+ historyModel .AvgSpeedOut = (tcpOutDelta + udpOutDelta ) / theoreticalTime
287278 } else {
288- aggregated .AvgSpeedIn = 0
289- aggregated .AvgSpeedOut = 0
279+ historyModel .AvgSpeedIn = 0
280+ historyModel .AvgSpeedOut = 0
290281 }
291282 }
292283
@@ -302,35 +293,32 @@ func (hw *HistoryWorker) aggregateAndWrite(dataPoints []MonitoringData) {
302293 }
303294
304295 if pingCount > 0 {
305- aggregated .AvgPing = pingSum / float64 (pingCount )
296+ historyModel .AvgPing = pingSum / float64 (pingCount )
306297 } else {
307- aggregated .AvgPing = 0
298+ historyModel .AvgPing = 0
308299 }
309300
310301 // 5. Pool连接池:直接使用最后一个值(反映最新状态)
311302 if lastPoint .Pool != nil {
312- aggregated .AvgPool = * lastPoint .Pool
303+ historyModel .AvgPool = * lastPoint .Pool
313304 } else {
314- aggregated .AvgPool = 0
305+ historyModel .AvgPool = 0
315306 }
316307
317- log .Infof ("[HistoryWorker]聚合完成 - 端点:%d 实例:%s 数据点:%d 时间跨度:%.1fs TCP入累计:%d TCP出累计:%d UDP入累计:%d UDP出累计:%d 延迟平均:%.2fms 连接池最新:%d 入站速度:%.2f bytes/s 出站速度:%.2f bytes/s" ,
318- aggregated .EndpointID , aggregated .InstanceID , aggregated .RecordCount , totalTime ,
319- aggregated .DeltaTCPIn , aggregated .DeltaTCPOut , aggregated .DeltaUDPIn , aggregated .DeltaUDPOut ,
320- aggregated .AvgPing , int64 (aggregated .AvgPool ), aggregated .AvgSpeedIn , aggregated .AvgSpeedOut )
321-
322308 // 6. 立即写入数据库
323309 startTime := time .Now ()
324- err := hw .db .Create (aggregated ).Error
310+ err := hw .db .Create (historyModel ).Error
325311 duration := time .Since (startTime )
326312
327313 if err != nil {
328314 log .Errorf ("[HistoryWorker]立即写入失败 (端点:%d 实例:%s, 耗时%v): %v" ,
329- aggregated .EndpointID , aggregated .InstanceID , duration , err )
330- } else {
331- log .Infof ("[HistoryWorker]立即写入成功: 端点:%d 实例:%s, 数据点:%d, 耗时%v" ,
332- aggregated .EndpointID , aggregated .InstanceID , aggregated .RecordCount , duration )
315+ historyModel .EndpointID , historyModel .InstanceID , duration , err )
333316 }
317+
318+ log .Infof ("[HistoryWorker]聚合完成 - 端点:%d 实例:%s 数据点:%d 时间跨度:%.1fs TCP入累计:%d TCP出累计:%d UDP入累计:%d UDP出累计:%d 延迟平均:%.2fms 连接池最新:%d 入站速度:%.2f bytes/s 出站速度:%.2f bytes/s" ,
319+ historyModel .EndpointID , historyModel .InstanceID , historyModel .RecordCount , totalTime ,
320+ historyModel .DeltaTCPIn , historyModel .DeltaTCPOut , historyModel .DeltaUDPIn , historyModel .DeltaUDPOut ,
321+ historyModel .AvgPing , int64 (historyModel .AvgPool ), historyModel .AvgSpeedIn , historyModel .AvgSpeedOut )
334322}
335323
336324// 移除批量写入相关方法(改为立即写入)
0 commit comments