diff --git a/collector/config.yml b/collector/config.yml index 5d467abe06..a00c8facf5 100644 --- a/collector/config.yml +++ b/collector/config.yml @@ -64,3 +64,5 @@ hotspot: max_sample_size: 128 hotspot_partition_min_score: 3 hotspot_partition_min_qps: 100 + enable_detect_hotkey: false + occurrence_threshold: 3 diff --git a/collector/hotspot/partition_detector.go b/collector/hotspot/partition_detector.go index 1a8c6277fd..42cb6f4d2c 100644 --- a/collector/hotspot/partition_detector.go +++ b/collector/hotspot/partition_detector.go @@ -50,6 +50,8 @@ type PartitionDetectorConfig struct { MaxSampleSize int HotspotPartitionMinScore float64 HotspotPartitionMinQPS float64 + EnableDetectHotkey bool + OccurrenceThreshold int32 } func LoadPartitionDetectorConfig() *PartitionDetectorConfig { @@ -63,6 +65,8 @@ func LoadPartitionDetectorConfig() *PartitionDetectorConfig { MaxSampleSize: viper.GetInt("hotspot.max_sample_size"), HotspotPartitionMinScore: viper.GetFloat64("hotspot.hotspot_partition_min_score"), HotspotPartitionMinQPS: viper.GetFloat64("hotspot.hotspot_partition_min_qps"), + EnableDetectHotkey: viper.GetBool("hotspot.enable_detect_hotkey"), + OccurrenceThreshold: viper.GetInt32("hotspot.occurrence_threshold"), } } @@ -444,6 +448,17 @@ const ( operateHotspotDataNumber ) +func operationToString(operationType int) string { + switch operationType { + case readHotspotData: + return "read" + case writeHotspotData: + return "write" + default: + return "unknown" + } +} + // hotspotPartitionStats holds all the statistical values of each partition, used for analysis // of hotspot partitions. type hotspotPartitionStats struct { @@ -501,6 +516,8 @@ func (d *partitionDetectorImpl) analyse(appMap appStatsMap) { d.cfg.MaxSampleSize, d.cfg.HotspotPartitionMinScore, d.cfg.HotspotPartitionMinQPS, + d.cfg.EnableDetectHotkey, + d.cfg.OccurrenceThreshold, key.appID, key.partitionCount, ) @@ -518,6 +535,8 @@ func newPartitionAnalyzer( maxSampleSize int, hotspotPartitionMinScore float64, hotspotPartitionMinQPS float64, + enableDetectHotkey bool, + occurrenceThreshold int32, appID int32, partitionCount int32, ) *partitionAnalyzer { @@ -525,8 +544,11 @@ func newPartitionAnalyzer( maxSampleSize: maxSampleSize, hotspotPartitionMinScore: hotspotPartitionMinScore, hotspotPartitionMinQPS: hotspotPartitionMinQPS, + enableDetectHotkey: enableDetectHotkey, + occurrenceThreshold: occurrenceThreshold, appID: appID, partitionCount: partitionCount, + hotspotPartitionCounts: make([][operateHotspotDataNumber]int32, partitionCount), } } @@ -538,11 +560,14 @@ type partitionAnalyzer struct { maxSampleSize int hotspotPartitionMinScore float64 hotspotPartitionMinQPS float64 + enableDetectHotkey bool + occurrenceThreshold int32 appID int32 partitionCount int32 mtx sync.RWMutex expireTimestampSeconds int64 samples deque.Deque[[]hotspotPartitionStats] // Each element is a sample of all partitions of the table + hotspotPartitionCounts [][operateHotspotDataNumber]int32 } func (a *partitionAnalyzer) isExpired(currentTimestampSeconds int64) bool { @@ -589,6 +614,10 @@ func (a *partitionAnalyzer) analyseHotspots(operationType int) { // systems such as Prometheus. log.Infof("appID=%d, partitionCount=%d, operationType=%d, hotspotPartitions=%d, scores=%v", a.appID, a.partitionCount, operationType, hotspotCount, scores) + + if a.enableDetectHotkey { + a.detectHotkey(operationType, sample, scores) + } } // Calculates [Z-score](https://en.wikipedia.org/wiki/Standard_score) for each partition by @@ -650,16 +679,39 @@ func (a *partitionAnalyzer) countHotspots( scores []float64, ) (hotspotCount int) { for i, score := range scores { - if score < a.hotspotPartitionMinScore { + if !a.isHotspotPartition(operationType, sample[i], score) { continue } - if sample[i].totalQPS[operationType] < a.hotspotPartitionMinQPS { + hotspotCount++ + } + + return +} + +func (a *partitionAnalyzer) detectHotkey( + operationType int, + sample []hotspotPartitionStats, + scores []float64, +) { + for i, score := range scores { + if !a.isHotspotPartition(operationType, sample[i], score) { continue } - hotspotCount++ + a.hotspotPartitionCounts[i][operationType]++ + if a.hotspotPartitionCounts[i][operationType] < a.occurrenceThreshold { + continue + } + + log.Warningf("found a %s hotspot partition %d.%d", operationToString(operationType), a.appID, i) } +} - return +func (a *partitionAnalyzer) isHotspotPartition( + operationType int, + stats hotspotPartitionStats, + score float64, +) bool { + return score >= a.hotspotPartitionMinScore && stats.totalQPS[operationType] >= a.hotspotPartitionMinQPS }