Skip to content

Commit 551b1f4

Browse files
authored
preserving the capacity of the Symbols slice during resets (#7404)
* preserving the capacity of the Symbols slice during resets Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * delete req.Free() due to http path Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * Add benchmark Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * fix lint Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> --------- Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 8edbeb8 commit 551b1f4

5 files changed

Lines changed: 196 additions & 10 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog
22

33
## master / unreleased
4+
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160
45
* [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Supports Grafana Explore, Perses, and other UIs. #7302
56
* [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
67
* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
@@ -10,11 +11,11 @@
1011
* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359
1112
* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357
1213
* [ENHANCEMENT] Update build image and Go version to 1.26. #7437
13-
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160
1414
* [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363
1515
* [ENHANCEMENT] Query Scheduler: Add `cortex_query_scheduler_tracked_requests` metric to track the current number of requests held by the scheduler. #7355
1616
* [ENHANCEMENT] Compactor: Prevent partition compaction to compact any blocks marked for deletion. #7391
1717
* [ENHANCEMENT] Distributor: Optimize memory allocations by reusing the existing capacity of these pooled slices in the Prometheus Remote Write 2.0 path. #7392
18+
* [ENHANCEMENT] Distributor: Optimize memory allocations by pooling PreallocWriteRequestV2 and preserving the capacity of the Symbols slice during resets. #7404
1819
* [BUGFIX] Alertmanager: Fix disappearing user config and state when ring is temporarily unreachable. #7372
1920
* [BUGFIX] Fix nil when ingester_query_max_attempts > 1. #7369
2021
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370

pkg/cortexpb/timeseriesv2.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,14 @@ func PreallocWriteRequestV2FromPool() *PreallocWriteRequestV2 {
130130
return writeRequestPoolV2.Get().(*PreallocWriteRequestV2)
131131
}
132132

133+
// Reset implements proto.Message and preserves the capacity of the Symbols slice.
134+
func (p *PreallocWriteRequestV2) Reset() {
135+
savedSymbols := p.Symbols
136+
p.WriteRequestV2.Reset()
137+
p.Symbols = savedSymbols[:0]
138+
p.data = nil
139+
}
140+
133141
// PreallocTimeseriesV2SliceFromPool retrieves a slice of PreallocTimeseriesV2 from a sync.Pool.
134142
// ReuseSliceV2 should be called once done.
135143
func PreallocTimeseriesV2SliceFromPool() []PreallocTimeseriesV2 {

pkg/cortexpb/timeseriesv2_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,71 @@ func TestReuseWriteRequestV2(t *testing.T) {
122122
})
123123
}
124124

125+
func TestPreallocWriteRequestV2Reset(t *testing.T) {
126+
t.Run("preserves Symbols capacity", func(t *testing.T) {
127+
const symbolsCap = 100
128+
req := &PreallocWriteRequestV2{
129+
WriteRequestV2: WriteRequestV2{
130+
Symbols: make([]string, 0, symbolsCap),
131+
},
132+
}
133+
req.Symbols = append(req.Symbols, "a", "b", "c")
134+
135+
ptrBefore := &req.Symbols[:cap(req.Symbols)][0]
136+
137+
req.Reset()
138+
139+
assert.Equal(t, 0, len(req.Symbols), "Symbols length should be 0 after Reset")
140+
assert.Equal(t, symbolsCap, cap(req.Symbols), "Symbols capacity should be preserved after Reset")
141+
assert.Same(t, ptrBefore, &req.Symbols[:cap(req.Symbols)][0], "Symbols backing array should be reused after Reset")
142+
})
143+
144+
t.Run("clears non-Symbols WriteRequestV2 fields", func(t *testing.T) {
145+
b := []byte{1, 2, 3}
146+
req := &PreallocWriteRequestV2{
147+
WriteRequestV2: WriteRequestV2{
148+
Source: RULE,
149+
SkipLabelNameValidation: true,
150+
Timeseries: []PreallocTimeseriesV2{{TimeSeriesV2: &TimeSeriesV2{}}},
151+
},
152+
data: &b,
153+
}
154+
155+
req.Reset()
156+
157+
assert.Equal(t, SourceEnum(0), req.Source)
158+
assert.False(t, req.SkipLabelNameValidation)
159+
assert.Nil(t, req.Timeseries)
160+
assert.Nil(t, req.data)
161+
})
162+
163+
t.Run("Unmarshal after Reset reuses Symbols backing array", func(t *testing.T) {
164+
const symbolsCount = 50
165+
symbols := make([]string, symbolsCount)
166+
for i := range symbols {
167+
symbols[i] = fmt.Sprintf("symbol_%04d", i)
168+
}
169+
data, err := (&WriteRequestV2{Symbols: symbols}).Marshal()
170+
require.NoError(t, err)
171+
172+
req := &PreallocWriteRequestV2{
173+
WriteRequestV2: WriteRequestV2{
174+
Symbols: make([]string, 0, symbolsCount*2),
175+
},
176+
}
177+
178+
// Simulate Reset in util.ParseProtoReader()
179+
req.Reset()
180+
ptrAfterReset := &req.Symbols[:cap(req.Symbols)][0]
181+
capAfterReset := cap(req.Symbols)
182+
183+
require.NoError(t, req.WriteRequestV2.Unmarshal(data))
184+
assert.Equal(t, symbolsCount, len(req.Symbols))
185+
assert.Equal(t, capAfterReset, cap(req.Symbols), "capacity should not change: Unmarshal reused the existing backing array")
186+
assert.Same(t, ptrAfterReset, &req.Symbols[:cap(req.Symbols)][0], "backing array pointer should be identical: no new allocation occurred")
187+
})
188+
}
189+
125190
func BenchmarkMarshallWriteRequestV2(b *testing.B) {
126191
ts := PreallocTimeseriesV2SliceFromPool()
127192

pkg/util/push/push.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,11 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool,
9494
return
9595
}
9696

97-
var req cortexpb.PreallocWriteRequestV2
97+
req := cortexpb.PreallocWriteRequestV2FromPool()
9898
// v1 request is put back into the pool by the Distributor.
99-
defer func() {
100-
cortexpb.ReuseWriteRequestV2(&req)
101-
req.Free()
102-
}()
99+
defer cortexpb.ReuseWriteRequestV2(req)
103100

104-
err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
101+
err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, req, util.RawSnappy)
105102
if err != nil {
106103
level.Error(logger).Log("err", err.Error())
107104
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -113,7 +110,7 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool,
113110
req.Source = cortexpb.API
114111
}
115112

116-
v1Req, err := convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID))
113+
v1Req, err := convertV2RequestToV1(req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID))
117114
if err != nil {
118115
level.Error(logger).Log("err", err.Error())
119116
http.Error(w, err.Error(), http.StatusBadRequest)

pkg/util/push/push_test.go

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@ import (
2424
"github.com/weaveworks/common/user"
2525

2626
"github.com/cortexproject/cortex/pkg/cortexpb"
27+
"github.com/cortexproject/cortex/pkg/util"
2728
"github.com/cortexproject/cortex/pkg/util/flagext"
2829
"github.com/cortexproject/cortex/pkg/util/validation"
2930
)
3031

32+
// benchMaxRecvMsgSize is the max message size used in benchmarks.
33+
const benchMaxRecvMsgSize = 100 * 1024 * 1024
34+
3135
var (
3236
testHistogram = histogram.Histogram{
3337
Schema: 2,
@@ -42,6 +46,51 @@ var (
4246
}
4347
)
4448

49+
// makeV2ReqWithSeriesAndSymbols builds a PRW2 request with the given number of
50+
// series and symbols
51+
func makeV2ReqWithSeriesAndSymbols(seriesNum, symbolCount int) *cortexpb.PreallocWriteRequestV2 {
52+
const baseSymbols = 5 // "", "__name__", "bench_metric", "help text", "unit"
53+
if symbolCount < baseSymbols {
54+
symbolCount = baseSymbols
55+
}
56+
57+
symbols := make([]string, 0, symbolCount)
58+
symbols = append(symbols, "", "__name__", "bench_metric", "help text", "unit")
59+
60+
extraPairs := (symbolCount - baseSymbols) / 2
61+
for i := range extraPairs {
62+
symbols = append(symbols, fmt.Sprintf("lbl_%d", i), fmt.Sprintf("val_%d", i))
63+
}
64+
65+
labelsRefs := []uint32{1, 2} // __name__ = "bench_metric"
66+
for i := range extraPairs {
67+
nameIdx := uint32(baseSymbols + i*2)
68+
labelsRefs = append(labelsRefs, nameIdx, nameIdx+1)
69+
}
70+
71+
ts := make([]cortexpb.PreallocTimeseriesV2, 0, seriesNum)
72+
for range seriesNum {
73+
ts = append(ts, cortexpb.PreallocTimeseriesV2{
74+
TimeSeriesV2: &cortexpb.TimeSeriesV2{
75+
LabelsRefs: labelsRefs,
76+
Metadata: cortexpb.MetadataV2{
77+
Type: cortexpb.METRIC_TYPE_GAUGE,
78+
HelpRef: 3,
79+
UnitRef: 4,
80+
},
81+
Samples: []cortexpb.Sample{{Value: 1, TimestampMs: 10}},
82+
},
83+
})
84+
}
85+
86+
return &cortexpb.PreallocWriteRequestV2{
87+
WriteRequestV2: cortexpb.WriteRequestV2{
88+
Symbols: symbols,
89+
Timeseries: ts,
90+
},
91+
}
92+
}
93+
4594
func makeV2ReqWithSeries(num int) *cortexpb.PreallocWriteRequestV2 {
4695
ts := make([]cortexpb.PreallocTimeseriesV2, 0, num)
4796
symbols := []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}
@@ -50,8 +99,7 @@ func makeV2ReqWithSeries(num int) *cortexpb.PreallocWriteRequestV2 {
5099
TimeSeriesV2: &cortexpb.TimeSeriesV2{
51100
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
52101
Metadata: cortexpb.MetadataV2{
53-
Type: cortexpb.METRIC_TYPE_GAUGE,
54-
102+
Type: cortexpb.METRIC_TYPE_GAUGE,
55103
HelpRef: 15,
56104
UnitRef: 16,
57105
},
@@ -178,6 +226,73 @@ func Benchmark_convertV2RequestToV1(b *testing.B) {
178226
}
179227
}
180228

229+
func makeEncodedPRW2Body(b *testing.B, seriesNum, symbolCount int) (body []byte, contentLength int) {
230+
b.Helper()
231+
series := makeV2ReqWithSeriesAndSymbols(seriesNum, symbolCount)
232+
protobuf, err := series.Marshal()
233+
if err != nil {
234+
b.Fatal(err)
235+
}
236+
encoded := snappy.Encode(nil, protobuf)
237+
return encoded, len(encoded)
238+
}
239+
240+
// runPRW2HandleFromPool simulates handlePRW2 using the sync.Pool
241+
func runPRW2HandleFromPool(ctx context.Context, body []byte, contentLength int, overrides *validation.Overrides, userID string) error {
242+
req := cortexpb.PreallocWriteRequestV2FromPool()
243+
defer cortexpb.ReuseWriteRequestV2(req)
244+
245+
if err := util.ParseProtoReader(ctx, bytes.NewReader(body), contentLength, benchMaxRecvMsgSize, req, util.RawSnappy); err != nil {
246+
return err
247+
}
248+
_, err := convertV2RequestToV1(req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID))
249+
return err
250+
}
251+
252+
// runPRW2HandleFromScratch simulates handlePRW2 without using the sync.Pool.
253+
func runPRW2HandleFromScratch(ctx context.Context, body []byte, contentLength int, overrides *validation.Overrides, userID string) error {
254+
var req cortexpb.PreallocWriteRequestV2
255+
defer cortexpb.ReuseWriteRequestV2(&req)
256+
257+
if err := util.ParseProtoReader(ctx, bytes.NewReader(body), contentLength, benchMaxRecvMsgSize, &req, util.RawSnappy); err != nil {
258+
return err
259+
}
260+
_, err := convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID))
261+
return err
262+
}
263+
264+
// Benchmark_HandlePRW2_PoolVsScratch compares two allocation strategies for the PRW2 parse path.
265+
// - pool: req := cortexpb.PreallocWriteRequestV2FromPool() + defer ReuseWriteRequestV2(req)
266+
// - scratch: var req cortexpb.PreallocWriteRequestV2 + defer ReuseWriteRequestV2(&req)
267+
func Benchmark_HandlePRW2_PoolVsScratch(b *testing.B) {
268+
var limits validation.Limits
269+
flagext.DefaultValues(&limits)
270+
overrides := validation.NewOverrides(limits, nil)
271+
272+
userID := "bench-user"
273+
seriesNum := 100
274+
ctx := user.InjectOrgID(context.Background(), userID)
275+
276+
for _, symCount := range []int{32, 128, 512, 2048, 4096} {
277+
body, contentLength := makeEncodedPRW2Body(b, seriesNum, symCount)
278+
name := fmt.Sprintf("symbols=%d", symCount)
279+
280+
b.Run("pool/"+name, func(b *testing.B) {
281+
b.ReportAllocs()
282+
for b.Loop() {
283+
require.NoError(b, runPRW2HandleFromPool(ctx, body, contentLength, overrides, userID))
284+
}
285+
})
286+
287+
b.Run("scratch/"+name, func(b *testing.B) {
288+
b.ReportAllocs()
289+
for b.Loop() {
290+
require.NoError(b, runPRW2HandleFromScratch(ctx, body, contentLength, overrides, userID))
291+
}
292+
})
293+
}
294+
}
295+
181296
func Test_convertV2RequestToV1_WithEnableTypeAndUnitLabels(t *testing.T) {
182297
symbols := []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes", "__type__", "exist type", "__unit__", "exist unit"}
183298
samples := []cortexpb.Sample{

0 commit comments

Comments
 (0)