Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions collector/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,5 @@ hotspot:
max_sample_size: 128
hotspot_partition_min_score: 3
hotspot_partition_min_qps: 100
enable_detect_hotkey: false
occurrence_threshold: 3
60 changes: 56 additions & 4 deletions collector/hotspot/partition_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type PartitionDetectorConfig struct {
MaxSampleSize int
HotspotPartitionMinScore float64
HotspotPartitionMinQPS float64
EnableDetectHotkey bool
OccurrenceThreshold int32
}

func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
Expand All @@ -63,6 +65,8 @@ func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
MaxSampleSize: viper.GetInt("hotspot.max_sample_size"),
HotspotPartitionMinScore: viper.GetFloat64("hotspot.hotspot_partition_min_score"),
HotspotPartitionMinQPS: viper.GetFloat64("hotspot.hotspot_partition_min_qps"),
EnableDetectHotkey: viper.GetBool("hotspot.enable_detect_hotkey"),
OccurrenceThreshold: viper.GetInt32("hotspot.occurrence_threshold"),
}
}

Expand Down Expand Up @@ -444,6 +448,17 @@ const (
operateHotspotDataNumber
)

func operationToString(operationType int) string {
switch operationType {
case readHotspotData:
return "read"
case writeHotspotData:
return "write"
default:
return "unknown"
}
}

// hotspotPartitionStats holds all the statistical values of each partition, used for analysis
// of hotspot partitions.
type hotspotPartitionStats struct {
Expand Down Expand Up @@ -501,6 +516,8 @@ func (d *partitionDetectorImpl) analyse(appMap appStatsMap) {
d.cfg.MaxSampleSize,
d.cfg.HotspotPartitionMinScore,
d.cfg.HotspotPartitionMinQPS,
d.cfg.EnableDetectHotkey,
d.cfg.OccurrenceThreshold,
key.appID,
key.partitionCount,
)
Expand All @@ -518,15 +535,20 @@ func newPartitionAnalyzer(
maxSampleSize int,
hotspotPartitionMinScore float64,
hotspotPartitionMinQPS float64,
enableDetectHotkey bool,
occurrenceThreshold int32,
appID int32,
partitionCount int32,
) *partitionAnalyzer {
return &partitionAnalyzer{
maxSampleSize: maxSampleSize,
hotspotPartitionMinScore: hotspotPartitionMinScore,
hotspotPartitionMinQPS: hotspotPartitionMinQPS,
enableDetectHotkey: enableDetectHotkey,
occurrenceThreshold: occurrenceThreshold,
appID: appID,
partitionCount: partitionCount,
hotspotPartitionCounts: make([][operateHotspotDataNumber]int32, partitionCount),
}
}

Expand All @@ -538,11 +560,14 @@ type partitionAnalyzer struct {
maxSampleSize int
hotspotPartitionMinScore float64
hotspotPartitionMinQPS float64
enableDetectHotkey bool
occurrenceThreshold int32
appID int32
partitionCount int32
mtx sync.RWMutex
expireTimestampSeconds int64
samples deque.Deque[[]hotspotPartitionStats] // Each element is a sample of all partitions of the table
hotspotPartitionCounts [][operateHotspotDataNumber]int32
}

func (a *partitionAnalyzer) isExpired(currentTimestampSeconds int64) bool {
Expand Down Expand Up @@ -589,6 +614,10 @@ func (a *partitionAnalyzer) analyseHotspots(operationType int) {
// systems such as Prometheus.
log.Infof("appID=%d, partitionCount=%d, operationType=%d, hotspotPartitions=%d, scores=%v",
a.appID, a.partitionCount, operationType, hotspotCount, scores)

if a.enableDetectHotkey {
a.detectHotkey(operationType, sample, scores)
}
}

// Calculates [Z-score](https://en.wikipedia.org/wiki/Standard_score) for each partition by
Expand Down Expand Up @@ -650,16 +679,39 @@ func (a *partitionAnalyzer) countHotspots(
scores []float64,
) (hotspotCount int) {
for i, score := range scores {
if score < a.hotspotPartitionMinScore {
if !a.isHotspotPartition(operationType, sample[i], score) {
continue
}

if sample[i].totalQPS[operationType] < a.hotspotPartitionMinQPS {
hotspotCount++
}

return
}

func (a *partitionAnalyzer) detectHotkey(
operationType int,
sample []hotspotPartitionStats,
scores []float64,
) {
for i, score := range scores {
if !a.isHotspotPartition(operationType, sample[i], score) {
continue
}

hotspotCount++
a.hotspotPartitionCounts[i][operationType]++
if a.hotspotPartitionCounts[i][operationType] < a.occurrenceThreshold {
continue
}

log.Warningf("found a %s hotspot partition %d.%d", operationToString(operationType), a.appID, i)
}
}

return
func (a *partitionAnalyzer) isHotspotPartition(
operationType int,
stats hotspotPartitionStats,
score float64,
) bool {
return score >= a.hotspotPartitionMinScore && stats.totalQPS[operationType] >= a.hotspotPartitionMinQPS
}
Loading