Skip to content

Commit 907d9c5

Browse files
merge caches
1 parent 34b62be commit 907d9c5

21 files changed

Lines changed: 738 additions & 468 deletions

server/controller/prometheus/cache/cache.go

Lines changed: 113 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"golang.org/x/sync/errgroup"
2626

2727
"github.com/deepflowio/deepflow/message/controller"
28+
metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model"
2829
"github.com/deepflowio/deepflow/server/controller/prometheus/common"
2930
"github.com/deepflowio/deepflow/server/libs/logger"
3031
)
@@ -42,6 +43,7 @@ type Cache struct {
4243

4344
canRefresh chan bool
4445
refreshInterval time.Duration
46+
lastRefresh time.Time
4547

4648
MetricName *metricName
4749
LabelName *labelName
@@ -77,37 +79,128 @@ func (c *Cache) GetORG() *common.ORG {
7779
return c.org
7880
}
7981

82+
func (c *Cache) GetMetricNameID(name string) (int, bool) {
83+
return c.MetricName.GetID(name)
84+
}
85+
86+
func (c *Cache) SetMetricNameID(name string, id int) {
87+
c.MetricName.setID(name, id)
88+
}
89+
90+
func (c *Cache) GetLabelNameID(name string) (int, bool) {
91+
return c.LabelName.GetID(name)
92+
}
93+
94+
func (c *Cache) GetLabelValueID(value string) (int, bool) {
95+
return c.LabelValue.GetID(value)
96+
}
97+
98+
func (c *Cache) GetLabelID(name, value string) (int, bool) {
99+
return c.Label.GetIDByKey(NewLabelKey(name, value))
100+
}
101+
102+
func (c *Cache) GetLabelKeyToID() map[LabelKey]int {
103+
return c.Label.GetKeyToID()
104+
}
105+
106+
func (c *Cache) GetLabelNameByID(id int) (string, bool) {
107+
return c.LabelName.GetNameByID(id)
108+
}
109+
110+
func (c *Cache) GetLabelValueByID(id int) (string, bool) {
111+
return c.LabelValue.GetValueByID(id)
112+
}
113+
114+
func (c *Cache) GetMetricNameToID() map[string]int {
115+
return c.MetricName.GetNameToID()
116+
}
117+
118+
func (c *Cache) GetMetricAndAPPLabelLayout() map[LayoutKey]uint8 {
119+
return c.MetricAndAPPLabelLayout.GetLayoutKeyToIndex()
120+
}
121+
122+
func (c *Cache) GetMetricAndAPPLabelLayoutIndex(key LayoutKey) (uint8, bool) {
123+
return c.MetricAndAPPLabelLayout.GetIndexByKey(key)
124+
}
125+
126+
func (c *Cache) AddMetricAndAPPLabelLayoutsFromGrpc(batch []*controller.PrometheusMetricAPPLabelLayout) {
127+
c.MetricAndAPPLabelLayout.AddFromGrpc(batch)
128+
}
129+
130+
func (c *Cache) SetLabelNameID(name string, id int) {
131+
c.LabelName.setID(name, id)
132+
}
133+
134+
func (c *Cache) SetLabelValueID(value string, id int) {
135+
c.LabelValue.setID(value, id)
136+
}
137+
138+
func (c *Cache) AddMetricNames(batch []*metadbmodel.PrometheusMetricName) {
139+
c.MetricName.Add(batch)
140+
}
141+
142+
func (c *Cache) AddMetricNamesFromGrpc(batch []*controller.PrometheusMetricName) {
143+
c.MetricName.AddFromGrpc(batch)
144+
}
145+
146+
func (c *Cache) AddLabelNames(batch []*metadbmodel.PrometheusLabelName) {
147+
c.LabelName.Add(batch)
148+
}
149+
150+
func (c *Cache) AddLabelNamesFromGrpc(batch []*controller.PrometheusLabelName) {
151+
c.LabelName.AddFromGrpc(batch)
152+
}
153+
154+
func (c *Cache) AddLabelValues(batch []*metadbmodel.PrometheusLabelValue) {
155+
c.LabelValue.Add(batch)
156+
}
157+
158+
func (c *Cache) AddLabelValuesFromGrpc(batch []*controller.PrometheusLabelValue) {
159+
c.LabelValue.AddFromGrpc(batch)
160+
}
161+
162+
func (c *Cache) AddLabels(batch []*metadbmodel.PrometheusLabel) {
163+
c.Label.Add(batch)
164+
}
165+
166+
func (c *Cache) AddLabelsFromGrpc(batch []*controller.PrometheusLabel) {
167+
c.Label.AddFromGrpc(batch)
168+
}
169+
80170
func (c *Cache) Refresh() (err error) {
81-
LOOP:
82-
for {
83-
select {
84-
case <-c.canRefresh:
85-
err = c.refresh()
171+
now := time.Now()
172+
select {
173+
case <-c.canRefresh:
174+
defer func() {
86175
c.canRefresh <- true
87-
break LOOP
88-
default:
89-
time.Sleep(time.Second)
90-
log.Infof("last refresh cache not completed now", c.org.LogPrefix)
91-
}
176+
}()
177+
default:
178+
return nil
179+
}
180+
181+
if c.refreshInterval > 0 && !c.lastRefresh.IsZero() && now.Sub(c.lastRefresh) < c.refreshInterval {
182+
return nil
183+
}
184+
185+
err = c.refresh()
186+
if err == nil {
187+
c.lastRefresh = now
92188
}
93189
return
94190
}
95191

96192
func (c *Cache) refresh() error {
97193
log.Infof("refresh cache started", c.org.LogPrefix)
98-
// LabelName and LabelValue must be refreshed before Label,
99-
// because Label.refresh() converts name/value strings to IDs.
100-
egRunAhead := &errgroup.Group{}
101-
common.AppendErrGroup(egRunAhead, c.MetricName.refresh)
102-
common.AppendErrGroup(egRunAhead, c.LabelName.refresh)
103-
common.AppendErrGroup(egRunAhead, c.LabelValue.refresh)
104-
if err := egRunAhead.Wait(); err != nil {
105-
return err
106-
}
107194
eg := &errgroup.Group{}
108-
common.AppendErrGroup(eg, c.Label.refresh)
195+
common.AppendErrGroup(eg, c.MetricName.refresh)
196+
common.AppendErrGroup(eg, c.LabelName.refresh)
197+
common.AppendErrGroup(eg, c.LabelValue.refresh)
109198
common.AppendErrGroup(eg, c.MetricAndAPPLabelLayout.refresh)
110199
err := eg.Wait()
200+
if err != nil {
201+
return err
202+
}
203+
err = c.Label.refresh()
111204
log.Infof("refresh cache completed", c.org.LogPrefix)
112205
return err
113206

server/controller/prometheus/cache/cache_db_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ func generateDBLabels(n int) []metadbmodel.PrometheusLabel {
158158
now := time.Now()
159159
for i := 0; i < n; i++ {
160160
items[i] = metadbmodel.PrometheusLabel{
161+
PrometheusAutoIncID: metadbmodel.PrometheusAutoIncID{ID: i + 1},
161162
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
162163
Name: fmt.Sprintf("n_%d", i),
163164
Value: fmt.Sprintf("v_%d", i),
@@ -252,11 +253,33 @@ func TestSelect_Label_LoadOnlyIDNameValue(t *testing.T) {
252253
db := setupTestDB(t)
253254
defer cleanupTestDB(t)
254255

256+
nameItems := make([]metadbmodel.PrometheusLabelName, 5000)
257+
valueItems := make([]metadbmodel.PrometheusLabelValue, 5000)
258+
now := time.Now()
259+
for i := 0; i < 5000; i++ {
260+
nameItems[i] = metadbmodel.PrometheusLabelName{
261+
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
262+
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
263+
Name: fmt.Sprintf("n_%d", i),
264+
}
265+
valueItems[i] = metadbmodel.PrometheusLabelValue{
266+
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
267+
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
268+
Value: fmt.Sprintf("v_%d", i),
269+
}
270+
}
271+
require.NoError(t, batchInsert(db, nameItems, 1000))
272+
require.NoError(t, batchInsert(db, valueItems, 1000))
273+
255274
items := generateDBLabels(5000)
256275
require.NoError(t, batchInsert(db, items, 1000))
257276

258277
l := newTestLabel()
259278
l.org = newTestORG(db)
279+
l.labelName.org = l.org
280+
l.labelValue.org = l.org
281+
require.NoError(t, l.labelName.refresh())
282+
require.NoError(t, l.labelValue.refresh())
260283

261284
err := l.refresh()
262285
require.NoError(t, err)
@@ -298,12 +321,34 @@ func TestSelect_Label_RefreshDiscardsDeletedRows(t *testing.T) {
298321
db := setupTestDB(t)
299322
defer cleanupTestDB(t)
300323

324+
nameItems := make([]metadbmodel.PrometheusLabelName, 200)
325+
valueItems := make([]metadbmodel.PrometheusLabelValue, 200)
326+
now := time.Now()
327+
for i := 0; i < 200; i++ {
328+
nameItems[i] = metadbmodel.PrometheusLabelName{
329+
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
330+
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
331+
Name: fmt.Sprintf("n_%d", i),
332+
}
333+
valueItems[i] = metadbmodel.PrometheusLabelValue{
334+
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
335+
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
336+
Value: fmt.Sprintf("v_%d", i),
337+
}
338+
}
339+
require.NoError(t, batchInsert(db, nameItems, 100))
340+
require.NoError(t, batchInsert(db, valueItems, 100))
341+
301342
// 第一轮:插入 200 条并 refresh
302343
items := generateDBLabels(200)
303344
require.NoError(t, batchInsert(db, items, 100))
304345

305346
l := newTestLabel()
306347
l.org = newTestORG(db)
348+
l.labelName.org = l.org
349+
l.labelValue.org = l.org
350+
require.NoError(t, l.labelName.refresh())
351+
require.NoError(t, l.labelValue.refresh())
307352
require.NoError(t, l.refresh())
308353
assert.Equal(t, 200, countLabelConcurrentMap(l.GetKeyToID()))
309354

@@ -439,10 +484,34 @@ func TestSelect_Label_500K_Refresh(t *testing.T) {
439484
insertStart := time.Now()
440485
items := generateDBLabels(N)
441486
require.NoError(t, batchInsert(db, items, 5000))
487+
488+
// Insert unique label names and values
489+
labelNames := make([]metadbmodel.PrometheusLabelName, N)
490+
labelValues := make([]metadbmodel.PrometheusLabelValue, N)
491+
now := time.Now()
492+
for i := 0; i < N; i++ {
493+
labelNames[i] = metadbmodel.PrometheusLabelName{
494+
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
495+
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
496+
Name: fmt.Sprintf("n_%d", i),
497+
}
498+
labelValues[i] = metadbmodel.PrometheusLabelValue{
499+
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
500+
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
501+
Value: fmt.Sprintf("v_%d", i),
502+
}
503+
}
504+
require.NoError(t, batchInsert(db, labelNames, 5000))
505+
require.NoError(t, batchInsert(db, labelValues, 5000))
506+
442507
t.Logf("insert completed in %v", time.Since(insertStart))
443508

444509
l := newTestLabel()
445510
l.org = newTestORG(db)
511+
l.labelName.org = l.org
512+
l.labelValue.org = l.org
513+
require.NoError(t, l.labelName.refresh())
514+
require.NoError(t, l.labelValue.refresh())
446515

447516
refreshStart := time.Now()
448517
err := l.refresh()

0 commit comments

Comments
 (0)