Skip to content

Commit d38e2a4

Browse files
authored
Change expectedSymbols to 2048 (#7398)
* Change expectedSymbols to 2048
  Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
* Fix changelog
  Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
* Add adaptive symbol capacity
  Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
* Update changelog
  Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
---------
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent ef39fee commit d38e2a4

File tree

3 files changed

+87
-25
lines changed

3 files changed

+87
-25
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## master / unreleased
44
* [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
55
* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
6+
* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398
67
* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359
78
* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357
89
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160

pkg/cortexpb/timeseriesv2.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,19 @@ package cortexpb
22

33
import (
44
"sync"
5+
6+
"go.uber.org/atomic"
57
)
68

9+
var dynamicSymbolsCapacity atomic.Int64
10+
11+
func init() {
12+
dynamicSymbolsCapacity.Store(int64(initialSymbolsCapacity))
13+
}
14+
715
var (
8-
expectedSymbols = 20
16+
initialSymbolsCapacity = 128
17+
maxSymbolsCapacity = int64(8192)
918

1019
slicePoolV2 = sync.Pool{
1120
New: func() any {
@@ -29,7 +38,7 @@ var (
2938
New: func() any {
3039
return &PreallocWriteRequestV2{
3140
WriteRequestV2: WriteRequestV2{
32-
Symbols: make([]string, 0, expectedSymbols),
41+
Symbols: make([]string, 0, dynamicSymbolsCapacity.Load()),
3342
},
3443
}
3544
},
@@ -78,6 +87,30 @@ func ReuseWriteRequestV2(req *PreallocWriteRequestV2) {
7887
}
7988
req.Source = 0
8089

90+
// If the underlying array has grown beyond our acceptable maximum capacity,
91+
// we discard this object instead of putting it back into the pool to let GC
92+
// reclaim it.
93+
symbolsCap := int64(cap(req.Symbols))
94+
if symbolsCap > maxSymbolsCapacity {
95+
if req.Timeseries != nil {
96+
ReuseSliceV2(req.Timeseries)
97+
req.Timeseries = nil
98+
}
99+
return
100+
}
101+
102+
// Update the dynamic symbol capacity.
103+
for {
104+
current := dynamicSymbolsCapacity.Load()
105+
if symbolsCap <= current {
106+
// break when other goroutines have already updated the capacity to a larger value
107+
break
108+
}
109+
if dynamicSymbolsCapacity.CompareAndSwap(current, symbolsCap) {
110+
break
111+
}
112+
}
113+
81114
for i := range req.Symbols {
82115
req.Symbols[i] = ""
83116
}

pkg/cortexpb/timeseriesv2_test.go

Lines changed: 51 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -59,35 +59,63 @@ func TestTimeseriesV2FromPool(t *testing.T) {
5959
}
6060

6161
func TestReuseWriteRequestV2(t *testing.T) {
62-
req := PreallocWriteRequestV2FromPool()
62+
t.Run("resets fields to default and cleans backing array", func(t *testing.T) {
63+
req := PreallocWriteRequestV2FromPool()
64+
65+
// Populate req with some data.
66+
req.Source = RULE
67+
req.Symbols = append(req.Symbols, "", "__name__", "test")
68+
69+
tsSlice := PreallocTimeseriesV2SliceFromPool()
70+
tsSlice = append(tsSlice, PreallocTimeseriesV2{TimeSeriesV2: TimeseriesV2FromPool()})
71+
req.Timeseries = tsSlice
72+
73+
// Capture backing array before reuse
74+
symbolsBackingArray := req.Symbols[:cap(req.Symbols)]
75+
require.Equal(t, "__name__", symbolsBackingArray[1])
76+
require.Equal(t, "test", symbolsBackingArray[2])
77+
78+
// Put the request back into the pool
79+
ReuseWriteRequestV2(req)
80+
81+
// Verify clearing directly on the backing array
82+
for i, s := range symbolsBackingArray[:3] {
83+
assert.Equalf(t, "", s, "symbol at index %d not cleared", i)
84+
}
85+
86+
// Source is reset to default
87+
assert.Equal(t, API, req.Source)
88+
// The symbol length is properly reset to 0.
89+
assert.Len(t, req.Symbols, 0)
90+
// Timeseries slice is nil
91+
assert.Nil(t, req.Timeseries)
92+
})
93+
t.Run("updates dynamic capacity", func(t *testing.T) {
94+
currentCap := dynamicSymbolsCapacity.Load()
95+
newCap := int(currentCap) + 100 // Increase capacity
6396

64-
// Populate req with some data.
65-
req.Source = RULE
66-
req.Symbols = append(req.Symbols, "", "__name__", "test")
97+
req := PreallocWriteRequestV2FromPool()
98+
req.Symbols = make([]string, newCap)
99+
req.Timeseries = PreallocTimeseriesV2SliceFromPool()
67100

68-
tsSlice := PreallocTimeseriesV2SliceFromPool()
69-
tsSlice = append(tsSlice, PreallocTimeseriesV2{TimeSeriesV2: TimeseriesV2FromPool()})
70-
req.Timeseries = tsSlice
101+
ReuseWriteRequestV2(req)
71102

72-
// Capture backing array before reuse
73-
symbolsBackingArray := req.Symbols[:cap(req.Symbols)]
74-
require.Equal(t, "__name__", symbolsBackingArray[1])
75-
require.Equal(t, "test", symbolsBackingArray[2])
103+
// Verify that the dynamic capacity has scaled up
104+
assert.Equal(t, int64(newCap), dynamicSymbolsCapacity.Load())
105+
})
106+
t.Run("outlier capacity does not update dynamic capacity and is discarded", func(t *testing.T) {
107+
currentCap := dynamicSymbolsCapacity.Load()
108+
outlierCap := int(maxSymbolsCapacity) + 100 // Exceeds the max limit
76109

77-
// Put the request back into the pool
78-
ReuseWriteRequestV2(req)
110+
req := PreallocWriteRequestV2FromPool()
111+
req.Symbols = make([]string, outlierCap)
112+
req.Timeseries = PreallocTimeseriesV2SliceFromPool()
79113

80-
// Verify clearing directly on the backing array
81-
for i, s := range symbolsBackingArray[:3] {
82-
assert.Equalf(t, "", s, "symbol at index %d not cleared", i)
83-
}
114+
ReuseWriteRequestV2(req)
84115

85-
// Source is reset to default
86-
assert.Equal(t, API, req.Source)
87-
// The symbol length is properly reset to 0.
88-
assert.Len(t, req.Symbols, 0)
89-
// Timeseries slice is nil
90-
assert.Nil(t, req.Timeseries)
116+
// Verify dynamic capacity didn't increase due to out-of-bound outlier
117+
assert.Equal(t, currentCap, dynamicSymbolsCapacity.Load())
118+
})
91119
}
92120

93121
func BenchmarkMarshallWriteRequestV2(b *testing.B) {

0 commit comments

Comments (0)