Skip to content

Commit c227974

Browse files
ZhengYa-0110SongZhen0704
authored andcommitted
feat: use stream-based query for big prometheus data
1 parent f8aa0ca commit c227974

5 files changed

Lines changed: 114 additions & 66 deletions

File tree

server/controller/prometheus/cache/label.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -114,19 +114,25 @@ func (l *label) Add(batch []*controller.PrometheusLabel) {
114114
}
115115

116116
func (l *label) refresh(args ...interface{}) error {
117-
data, err := l.load()
117+
rows, err := l.org.DB.Model(&metadbmodel.PrometheusLabel{}).Select("id", "name", "value").Rows()
118118
if err != nil {
119119
return err
120120
}
121-
l.processLoadedData(data)
122-
return nil
123-
}
124-
125-
func (l *label) processLoadedData(data []*metadbmodel.PrometheusLabel) {
126-
newActive := make(map[LabelKey]int, len(data))
127-
for _, item := range data {
128-
k := NewLabelKey(item.Name, item.Value)
129-
newActive[k] = item.ID
121+
defer rows.Close()
122+
123+
newActive := make(map[LabelKey]int)
124+
for rows.Next() {
125+
var id int
126+
var name, value string
127+
if scanErr := rows.Scan(&id, &name, &value); scanErr != nil {
128+
log.Errorf("stream scan prometheus_label interrupted: %v", scanErr, l.org.LogPrefix)
129+
return scanErr
130+
}
131+
newActive[NewLabelKey(name, value)] = id
132+
}
133+
if err := rows.Err(); err != nil {
134+
log.Errorf("stream read prometheus_label error: %v", err, l.org.LogPrefix)
135+
return err
130136
}
131137

132138
l.mu.Lock()
@@ -138,10 +144,5 @@ func (l *label) processLoadedData(data []*metadbmodel.PrometheusLabel) {
138144
l.mu.Unlock()
139145

140146
l.replaceActive(newActive)
141-
}
142-
143-
func (l *label) load() ([]*metadbmodel.PrometheusLabel, error) {
144-
var labels []*metadbmodel.PrometheusLabel
145-
err := l.org.DB.Select("id", "name", "value").Find(&labels).Error
146-
return labels, err
147+
return nil
147148
}

server/controller/prometheus/cache/label_value.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,26 @@ func (lv *labelValue) Add(batch []*controller.PrometheusLabelValue) {
9898
}
9999

100100
func (lv *labelValue) refresh(args ...interface{}) error {
101-
data, err := lv.load()
101+
rows, err := lv.org.DB.Model(&metadbmodel.PrometheusLabelValue{}).Select("id", "value").Rows()
102102
if err != nil {
103103
return err
104104
}
105-
106-
newActive := lv.processLoadedData(data)
105+
defer rows.Close()
106+
107+
newActive := make(map[string]int)
108+
for rows.Next() {
109+
var id int
110+
var value string
111+
if scanErr := rows.Scan(&id, &value); scanErr != nil {
112+
log.Errorf("stream scan prometheus_label_value interrupted: %v", scanErr, lv.org.LogPrefix)
113+
return scanErr
114+
}
115+
newActive[value] = id
116+
}
117+
if err := rows.Err(); err != nil {
118+
log.Errorf("stream read prometheus_label_value error: %v", err, lv.org.LogPrefix)
119+
return err
120+
}
107121

108122
lv.mu.Lock()
109123
pending := lv.pending
@@ -116,17 +130,3 @@ func (lv *labelValue) refresh(args ...interface{}) error {
116130
lv.replaceActive(newActive)
117131
return nil
118132
}
119-
120-
func (lv *labelValue) processLoadedData(data []*metadbmodel.PrometheusLabelValue) map[string]int {
121-
newActive := make(map[string]int, len(data))
122-
for _, item := range data {
123-
newActive[item.Value] = item.ID
124-
}
125-
return newActive
126-
}
127-
128-
func (lv *labelValue) load() ([]*metadbmodel.PrometheusLabelValue, error) {
129-
var labelValues []*metadbmodel.PrometheusLabelValue
130-
err := lv.org.DB.Select("id", "value").Find(&labelValues).Error
131-
return labelValues, err
132-
}

server/controller/prometheus/encoder/label.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,25 @@ func (l *label) refresh(args ...interface{}) error {
6262
}
6363
l.lock.Unlock()
6464

65-
var items []*metadbmodel.PrometheusLabel
66-
if err := l.org.DB.Select("id", "name", "value").Find(&items).Error; err != nil {
65+
rows, err := l.org.DB.Model(&metadbmodel.PrometheusLabel{}).Select("id", "name", "value").Rows()
66+
if err != nil {
6767
return err
6868
}
69-
newMap := make(map[cache.LabelKey]int, len(items))
70-
for _, item := range items {
71-
newMap[cache.NewLabelKey(item.Name, item.Value)] = item.ID
69+
defer rows.Close()
70+
71+
newMap := make(map[cache.LabelKey]int)
72+
for rows.Next() {
73+
var id int
74+
var name, value string
75+
if scanErr := rows.Scan(&id, &name, &value); scanErr != nil {
76+
log.Errorf("db stream scan %s interrupted: %v", l.resourceType, scanErr, l.org.LogPrefix)
77+
return scanErr
78+
}
79+
newMap[cache.NewLabelKey(name, value)] = id
80+
}
81+
if err := rows.Err(); err != nil {
82+
log.Errorf("db stream %s error: %v", l.resourceType, err, l.org.LogPrefix)
83+
return err
7284
}
7385

7486
l.lock.Lock()

server/controller/prometheus/encoder/label_value.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,26 @@ func (lv *labelValue) refresh(args ...interface{}) error {
5252
}
5353
lv.lock.Unlock()
5454

55-
var items []*metadbmodel.PrometheusLabelValue
56-
if err := lv.org.DB.Select("id", "value").Find(&items).Error; err != nil {
55+
rows, err := lv.org.DB.Model(&metadbmodel.PrometheusLabelValue{}).Select("id", "value").Rows()
56+
if err != nil {
5757
log.Errorf("db query %s failed: %v", lv.resourceType, err, lv.org.LogPrefix)
5858
return err
5959
}
60-
newMap := make(map[string]int, len(items))
61-
for _, item := range items {
62-
newMap[item.Value] = item.ID
60+
defer rows.Close()
61+
62+
newMap := make(map[string]int)
63+
for rows.Next() {
64+
var id int
65+
var value string
66+
if scanErr := rows.Scan(&id, &value); scanErr != nil {
67+
log.Errorf("db stream scan %s interrupted: %v", lv.resourceType, scanErr, lv.org.LogPrefix)
68+
return scanErr
69+
}
70+
newMap[value] = id
71+
}
72+
if err := rows.Err(); err != nil {
73+
log.Errorf("db stream %s error: %v", lv.resourceType, err, lv.org.LogPrefix)
74+
return err
6375
}
6476

6577
lv.lock.Lock()

server/controller/tagrecorder/ch_app_label.go

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,6 @@ func NewChAPPLabel() *ChAPPLabel {
4040

4141
func (l *ChAPPLabel) generateNewData(db *metadb.DB) (map[PrometheusAPPLabelKey]metadbmodel.ChAPPLabel, bool) {
4242
log.Infof("generate data for %s", l.resourceTypeName, db.LogPrefixORGID)
43-
var prometheusLabels []metadbmodel.PrometheusLabel
44-
err := db.Unscoped().Find(&prometheusLabels).Error
45-
46-
if err != nil {
47-
log.Errorf(dbQueryResourceFailed(l.resourceTypeName, err), db.LogPrefixORGID)
48-
return nil, false
49-
}
5043

5144
appLabelSlice, ok := l.generateAPPLabelData(db)
5245

@@ -55,12 +48,23 @@ func (l *ChAPPLabel) generateNewData(db *metadb.DB) (map[PrometheusAPPLabelKey]m
5548
return nil, false
5649
}
5750

51+
labelRows, err := db.Unscoped().Model(&metadbmodel.PrometheusLabel{}).Select("id", "name", "value").Rows()
52+
if err != nil {
53+
log.Errorf(dbQueryResourceFailed(l.resourceTypeName, err), db.LogPrefixORGID)
54+
return nil, false
55+
}
56+
defer labelRows.Close()
57+
5858
keyToItem := make(map[PrometheusAPPLabelKey]metadbmodel.ChAPPLabel)
59-
for _, prometheusLabel := range prometheusLabels {
60-
labelName := prometheusLabel.Name
59+
for labelRows.Next() {
60+
var id int
61+
var labelName, labelValue string
62+
if scanErr := labelRows.Scan(&id, &labelName, &labelValue); scanErr != nil {
63+
log.Errorf("stream scan %s prometheus_label interrupted: %v", l.resourceTypeName, scanErr, db.LogPrefixORGID)
64+
return nil, false
65+
}
6166
if slices.Contains(appLabelSlice, labelName) {
6267
labelNameID, nameOK := labelNameIDMap[labelName]
63-
labelValue := prometheusLabel.Value
6468
labelValueID, valueOK := valueNameIDMap[labelValue]
6569
if !nameOK || !valueOK {
6670
log.Warningf("label name or value not found in db, labelName: %s, labelValue: %s", labelName, labelValue)
@@ -72,7 +76,10 @@ func (l *ChAPPLabel) generateNewData(db *metadb.DB) (map[PrometheusAPPLabelKey]m
7276
LabelValueID: labelValueID,
7377
}
7478
}
75-
79+
}
80+
if err := labelRows.Err(); err != nil {
81+
log.Errorf("stream read %s prometheus_label error: %v", l.resourceTypeName, err, db.LogPrefixORGID)
82+
return nil, false
7683
}
7784
return keyToItem, true
7885
}
@@ -111,29 +118,45 @@ func (l *ChAPPLabel) generateAPPLabelData(db *metadb.DB) ([]string, bool) {
111118
func (l *ChAPPLabel) generateNameIDData(db *metadb.DB) (map[string]int, map[string]int, bool) {
112119
labelNameIDMap := make(map[string]int)
113120
valueNameIDMap := make(map[string]int)
114-
var prometheusLabelNames []metadbmodel.PrometheusLabelName
115-
var prometheusLabelValues []metadbmodel.PrometheusLabelValue
116-
117-
err := db.Unscoped().Find(&prometheusLabelNames).Error
118121

122+
nameRows, err := db.Unscoped().Model(&metadbmodel.PrometheusLabelName{}).Select("id", "name").Rows()
119123
if err != nil {
120124
log.Errorf(dbQueryResourceFailed(l.resourceTypeName, err), db.LogPrefixORGID)
121125
return nil, nil, false
122126
}
127+
defer nameRows.Close()
128+
for nameRows.Next() {
129+
var id int
130+
var name string
131+
if scanErr := nameRows.Scan(&id, &name); scanErr != nil {
132+
log.Errorf("stream scan %s prometheus_label_name interrupted: %v", l.resourceTypeName, scanErr, db.LogPrefixORGID)
133+
return nil, nil, false
134+
}
135+
labelNameIDMap[name] = id
136+
}
137+
if err := nameRows.Err(); err != nil {
138+
log.Errorf("stream read %s prometheus_label_name error: %v", l.resourceTypeName, err, db.LogPrefixORGID)
139+
return nil, nil, false
140+
}
123141

124-
err = db.Unscoped().Find(&prometheusLabelValues).Error
125-
142+
valueRows, err := db.Unscoped().Model(&metadbmodel.PrometheusLabelValue{}).Select("id", "value").Rows()
126143
if err != nil {
127144
log.Errorf(dbQueryResourceFailed(l.resourceTypeName, err), db.LogPrefixORGID)
128145
return nil, nil, false
129146
}
130-
131-
for _, prometheusLabelName := range prometheusLabelNames {
132-
labelNameIDMap[prometheusLabelName.Name] = prometheusLabelName.ID
147+
defer valueRows.Close()
148+
for valueRows.Next() {
149+
var id int
150+
var value string
151+
if scanErr := valueRows.Scan(&id, &value); scanErr != nil {
152+
log.Errorf("stream scan %s prometheus_label_value interrupted: %v", l.resourceTypeName, scanErr, db.LogPrefixORGID)
153+
return nil, nil, false
154+
}
155+
valueNameIDMap[value] = id
133156
}
134-
135-
for _, prometheusLabelValue := range prometheusLabelValues {
136-
valueNameIDMap[prometheusLabelValue.Value] = prometheusLabelValue.ID
157+
if err := valueRows.Err(); err != nil {
158+
log.Errorf("stream read %s prometheus_label_value error: %v", l.resourceTypeName, err, db.LogPrefixORGID)
159+
return nil, nil, false
137160
}
138161
return labelNameIDMap, valueNameIDMap, true
139162
}

0 commit comments

Comments
 (0)