Skip to content

Commit 789d735

Browse files
merge caches
1 parent 34b62be commit 789d735

22 files changed

Lines changed: 777 additions & 529 deletions

server/controller/prometheus/cache/cache.go

Lines changed: 137 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"golang.org/x/sync/errgroup"
2626

2727
"github.com/deepflowio/deepflow/message/controller"
28+
metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model"
2829
"github.com/deepflowio/deepflow/server/controller/prometheus/common"
2930
"github.com/deepflowio/deepflow/server/libs/logger"
3031
)
@@ -40,8 +41,11 @@ type Cache struct {
4041
org *common.ORG
4142
ctx context.Context
4243

43-
canRefresh chan bool
4444
refreshInterval time.Duration
45+
lastRefresh time.Time
46+
47+
refreshing bool
48+
refreshCond *sync.Cond
4549

4650
MetricName *metricName
4751
LabelName *labelName
@@ -62,52 +66,160 @@ func newCache(orgID int) (*Cache, error) {
6266
lv := newLabelValue(org)
6367
c := &Cache{
6468
org: org,
65-
canRefresh: make(chan bool, 1),
69+
refreshCond: sync.NewCond(&sync.Mutex{}),
6670
MetricName: mn,
6771
LabelName: ln,
6872
LabelValue: lv,
6973
MetricAndAPPLabelLayout: newMetricAndAPPLabelLayout(org),
7074
Label: newLabel(org, ln, lv),
7175
}
72-
c.canRefresh <- true
7376
return c, nil
7477
}
7578

7679
// GetORG returns the *common.ORG stored on this cache.
func (c *Cache) GetORG() *common.ORG {
7780
return c.org
7881
}
7982

80-
func (c *Cache) Refresh() (err error) {
81-
LOOP:
82-
for {
83-
select {
84-
case <-c.canRefresh:
85-
err = c.refresh()
86-
c.canRefresh <- true
87-
break LOOP
88-
default:
89-
time.Sleep(time.Second)
90-
log.Infof("last refresh cache not completed now", c.org.LogPrefix)
83+
// GetMetricNameID forwards to MetricName.GetID and returns its (int, bool)
// result unchanged for the given metric name.
func (c *Cache) GetMetricNameID(name string) (int, bool) {
84+
return c.MetricName.GetID(name)
85+
}
86+
87+
// SetMetricNameID forwards name and id to MetricName.setID.
// NOTE(review): presumably records the name-to-ID mapping — confirm in metricName.setID.
func (c *Cache) SetMetricNameID(name string, id int) {
88+
c.MetricName.setID(name, id)
89+
}
90+
91+
// GetLabelNameID forwards to LabelName.GetID and returns its (int, bool)
// result unchanged for the given label name.
func (c *Cache) GetLabelNameID(name string) (int, bool) {
92+
return c.LabelName.GetID(name)
93+
}
94+
95+
// GetLabelValueID forwards to LabelValue.GetID and returns its (int, bool)
// result unchanged for the given label value.
func (c *Cache) GetLabelValueID(value string) (int, bool) {
96+
return c.LabelValue.GetID(value)
97+
}
98+
99+
// GetLabelID builds a LabelKey from name and value with NewLabelKey and
// returns the (int, bool) result of Label.GetIDByKey for that key.
func (c *Cache) GetLabelID(name, value string) (int, bool) {
100+
return c.Label.GetIDByKey(NewLabelKey(name, value))
101+
}
102+
103+
// GetLabelKeyToID returns the LabelKey-to-int map produced by Label.GetKeyToID.
// NOTE(review): whether the returned map is a copy or the live internal map
// is decided by Label.GetKeyToID — confirm before mutating it.
func (c *Cache) GetLabelKeyToID() map[LabelKey]int {
104+
return c.Label.GetKeyToID()
105+
}
106+
107+
// GetLabelNameByID forwards to LabelName.GetNameByID and returns its
// (string, bool) result unchanged for the given ID.
func (c *Cache) GetLabelNameByID(id int) (string, bool) {
108+
return c.LabelName.GetNameByID(id)
109+
}
110+
111+
// GetLabelValueByID forwards to LabelValue.GetValueByID and returns its
// (string, bool) result unchanged for the given ID.
func (c *Cache) GetLabelValueByID(id int) (string, bool) {
112+
return c.LabelValue.GetValueByID(id)
113+
}
114+
115+
// GetMetricNameToID returns the string-to-int map produced by
// MetricName.GetNameToID.
// NOTE(review): whether the returned map is a copy or the live internal map
// is decided by MetricName.GetNameToID — confirm before mutating it.
func (c *Cache) GetMetricNameToID() map[string]int {
116+
return c.MetricName.GetNameToID()
117+
}
118+
119+
// GetMetricAndAPPLabelLayout returns the LayoutKey-to-uint8 map produced by
// MetricAndAPPLabelLayout.GetLayoutKeyToIndex.
func (c *Cache) GetMetricAndAPPLabelLayout() map[LayoutKey]uint8 {
120+
return c.MetricAndAPPLabelLayout.GetLayoutKeyToIndex()
121+
}
122+
123+
// GetMetricAndAPPLabelLayoutIndex forwards to
// MetricAndAPPLabelLayout.GetIndexByKey and returns its (uint8, bool) result
// unchanged for the given layout key.
func (c *Cache) GetMetricAndAPPLabelLayoutIndex(key LayoutKey) (uint8, bool) {
124+
return c.MetricAndAPPLabelLayout.GetIndexByKey(key)
125+
}
126+
127+
// AddMetricAndAPPLabelLayoutsFromGrpc passes a batch of
// controller.PrometheusMetricAPPLabelLayout messages to
// MetricAndAPPLabelLayout.AddFromGrpc.
func (c *Cache) AddMetricAndAPPLabelLayoutsFromGrpc(batch []*controller.PrometheusMetricAPPLabelLayout) {
128+
c.MetricAndAPPLabelLayout.AddFromGrpc(batch)
129+
}
130+
131+
// SetLabelNameID forwards name and id to LabelName.setID.
// NOTE(review): presumably records the name-to-ID mapping — confirm in labelName.setID.
func (c *Cache) SetLabelNameID(name string, id int) {
132+
c.LabelName.setID(name, id)
133+
}
134+
135+
// SetLabelValueID forwards value and id to LabelValue.setID.
// NOTE(review): presumably records the value-to-ID mapping — confirm in labelValue.setID.
func (c *Cache) SetLabelValueID(value string, id int) {
136+
c.LabelValue.setID(value, id)
137+
}
138+
139+
// AddMetricNames passes a batch of metadb PrometheusMetricName model records
// to MetricName.Add.
func (c *Cache) AddMetricNames(batch []*metadbmodel.PrometheusMetricName) {
140+
c.MetricName.Add(batch)
141+
}
142+
143+
// AddMetricNamesFromGrpc passes a batch of controller.PrometheusMetricName
// messages to MetricName.AddFromGrpc.
func (c *Cache) AddMetricNamesFromGrpc(batch []*controller.PrometheusMetricName) {
144+
c.MetricName.AddFromGrpc(batch)
145+
}
146+
147+
// AddLabelNames passes a batch of metadb PrometheusLabelName model records
// to LabelName.Add.
func (c *Cache) AddLabelNames(batch []*metadbmodel.PrometheusLabelName) {
148+
c.LabelName.Add(batch)
149+
}
150+
151+
// AddLabelNamesFromGrpc passes a batch of controller.PrometheusLabelName
// messages to LabelName.AddFromGrpc.
func (c *Cache) AddLabelNamesFromGrpc(batch []*controller.PrometheusLabelName) {
152+
c.LabelName.AddFromGrpc(batch)
153+
}
154+
155+
// AddLabelValues passes a batch of metadb PrometheusLabelValue model records
// to LabelValue.Add.
func (c *Cache) AddLabelValues(batch []*metadbmodel.PrometheusLabelValue) {
156+
c.LabelValue.Add(batch)
157+
}
158+
159+
// AddLabelValuesFromGrpc passes a batch of controller.PrometheusLabelValue
// messages to LabelValue.AddFromGrpc.
func (c *Cache) AddLabelValuesFromGrpc(batch []*controller.PrometheusLabelValue) {
160+
c.LabelValue.AddFromGrpc(batch)
161+
}
162+
163+
// AddLabels passes a batch of metadb PrometheusLabel model records to
// Label.Add.
func (c *Cache) AddLabels(batch []*metadbmodel.PrometheusLabel) {
164+
c.Label.Add(batch)
165+
}
166+
167+
// AddLabelsFromGrpc passes a batch of controller.PrometheusLabel messages to
// Label.AddFromGrpc.
func (c *Cache) AddLabelsFromGrpc(batch []*controller.PrometheusLabel) {
168+
c.Label.AddFromGrpc(batch)
169+
}
170+
171+
func (c *Cache) Refresh(wait bool) error {
172+
c.refreshCond.L.Lock()
173+
if c.refreshing {
174+
if wait {
175+
// wait for refresh to complete
176+
for c.refreshing {
177+
c.refreshCond.Wait()
178+
}
179+
c.refreshCond.L.Unlock()
180+
return nil
91181
}
182+
c.refreshCond.L.Unlock()
183+
return nil
92184
}
93-
return
185+
186+
if !wait && c.refreshInterval > 0 && !c.lastRefresh.IsZero() && time.Since(c.lastRefresh) < c.refreshInterval {
187+
c.refreshCond.L.Unlock()
188+
return nil
189+
}
190+
191+
c.refreshing = true
192+
c.refreshCond.L.Unlock()
193+
194+
err := c.doRefresh()
195+
196+
c.refreshCond.L.Lock()
197+
c.refreshing = false
198+
c.refreshCond.Broadcast()
199+
c.refreshCond.L.Unlock()
200+
return err
201+
}
202+
203+
func (c *Cache) doRefresh() error {
204+
err := c.refresh()
205+
if err == nil {
206+
c.lastRefresh = time.Now()
207+
}
208+
return err
94209
}
95210

96211
func (c *Cache) refresh() error {
97212
log.Infof("refresh cache started", c.org.LogPrefix)
98-
// LabelName and LabelValue must be refreshed before Label,
99-
// because Label.refresh() converts name/value strings to IDs.
100-
egRunAhead := &errgroup.Group{}
101-
common.AppendErrGroup(egRunAhead, c.MetricName.refresh)
102-
common.AppendErrGroup(egRunAhead, c.LabelName.refresh)
103-
common.AppendErrGroup(egRunAhead, c.LabelValue.refresh)
104-
if err := egRunAhead.Wait(); err != nil {
105-
return err
106-
}
107213
eg := &errgroup.Group{}
108-
common.AppendErrGroup(eg, c.Label.refresh)
214+
common.AppendErrGroup(eg, c.MetricName.refresh)
215+
common.AppendErrGroup(eg, c.LabelName.refresh)
216+
common.AppendErrGroup(eg, c.LabelValue.refresh)
109217
common.AppendErrGroup(eg, c.MetricAndAPPLabelLayout.refresh)
110218
err := eg.Wait()
219+
if err != nil {
220+
return err
221+
}
222+
err = c.Label.refresh()
111223
log.Infof("refresh cache completed", c.org.LogPrefix)
112224
return err
113225

server/controller/prometheus/cache/cache_db_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ func generateDBLabels(n int) []metadbmodel.PrometheusLabel {
158158
now := time.Now()
159159
for i := 0; i < n; i++ {
160160
items[i] = metadbmodel.PrometheusLabel{
161+
PrometheusAutoIncID: metadbmodel.PrometheusAutoIncID{ID: i + 1},
161162
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
162163
Name: fmt.Sprintf("n_%d", i),
163164
Value: fmt.Sprintf("v_%d", i),
@@ -252,11 +253,33 @@ func TestSelect_Label_LoadOnlyIDNameValue(t *testing.T) {
252253
db := setupTestDB(t)
253254
defer cleanupTestDB(t)
254255

256+
nameItems := make([]metadbmodel.PrometheusLabelName, 5000)
257+
valueItems := make([]metadbmodel.PrometheusLabelValue, 5000)
258+
now := time.Now()
259+
for i := 0; i < 5000; i++ {
260+
nameItems[i] = metadbmodel.PrometheusLabelName{
261+
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
262+
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
263+
Name: fmt.Sprintf("n_%d", i),
264+
}
265+
valueItems[i] = metadbmodel.PrometheusLabelValue{
266+
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
267+
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
268+
Value: fmt.Sprintf("v_%d", i),
269+
}
270+
}
271+
require.NoError(t, batchInsert(db, nameItems, 1000))
272+
require.NoError(t, batchInsert(db, valueItems, 1000))
273+
255274
items := generateDBLabels(5000)
256275
require.NoError(t, batchInsert(db, items, 1000))
257276

258277
l := newTestLabel()
259278
l.org = newTestORG(db)
279+
l.labelName.org = l.org
280+
l.labelValue.org = l.org
281+
require.NoError(t, l.labelName.refresh())
282+
require.NoError(t, l.labelValue.refresh())
260283

261284
err := l.refresh()
262285
require.NoError(t, err)
@@ -298,12 +321,34 @@ func TestSelect_Label_RefreshDiscardsDeletedRows(t *testing.T) {
298321
db := setupTestDB(t)
299322
defer cleanupTestDB(t)
300323

324+
nameItems := make([]metadbmodel.PrometheusLabelName, 200)
325+
valueItems := make([]metadbmodel.PrometheusLabelValue, 200)
326+
now := time.Now()
327+
for i := 0; i < 200; i++ {
328+
nameItems[i] = metadbmodel.PrometheusLabelName{
329+
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
330+
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
331+
Name: fmt.Sprintf("n_%d", i),
332+
}
333+
valueItems[i] = metadbmodel.PrometheusLabelValue{
334+
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
335+
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
336+
Value: fmt.Sprintf("v_%d", i),
337+
}
338+
}
339+
require.NoError(t, batchInsert(db, nameItems, 100))
340+
require.NoError(t, batchInsert(db, valueItems, 100))
341+
301342
// 第一轮:插入 200 条并 refresh
302343
items := generateDBLabels(200)
303344
require.NoError(t, batchInsert(db, items, 100))
304345

305346
l := newTestLabel()
306347
l.org = newTestORG(db)
348+
l.labelName.org = l.org
349+
l.labelValue.org = l.org
350+
require.NoError(t, l.labelName.refresh())
351+
require.NoError(t, l.labelValue.refresh())
307352
require.NoError(t, l.refresh())
308353
assert.Equal(t, 200, countLabelConcurrentMap(l.GetKeyToID()))
309354

@@ -439,10 +484,34 @@ func TestSelect_Label_500K_Refresh(t *testing.T) {
439484
insertStart := time.Now()
440485
items := generateDBLabels(N)
441486
require.NoError(t, batchInsert(db, items, 5000))
487+
488+
// Insert unique label names and values
489+
labelNames := make([]metadbmodel.PrometheusLabelName, N)
490+
labelValues := make([]metadbmodel.PrometheusLabelValue, N)
491+
now := time.Now()
492+
for i := 0; i < N; i++ {
493+
labelNames[i] = metadbmodel.PrometheusLabelName{
494+
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
495+
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
496+
Name: fmt.Sprintf("n_%d", i),
497+
}
498+
labelValues[i] = metadbmodel.PrometheusLabelValue{
499+
PrometheusID: metadbmodel.PrometheusID{ID: i + 1},
500+
PrometheusOperatedTime: metadbmodel.PrometheusOperatedTime{CreatedAt: now, SyncedAt: now},
501+
Value: fmt.Sprintf("v_%d", i),
502+
}
503+
}
504+
require.NoError(t, batchInsert(db, labelNames, 5000))
505+
require.NoError(t, batchInsert(db, labelValues, 5000))
506+
442507
t.Logf("insert completed in %v", time.Since(insertStart))
443508

444509
l := newTestLabel()
445510
l.org = newTestORG(db)
511+
l.labelName.org = l.org
512+
l.labelValue.org = l.org
513+
require.NoError(t, l.labelName.refresh())
514+
require.NoError(t, l.labelValue.refresh())
446515

447516
refreshStart := time.Now()
448517
err := l.refresh()

0 commit comments

Comments
 (0)