Skip to content

Commit 7221087

Browse files
committed
preserving the capacity of the Symbols slice during resets
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 53f7ae7 commit 7221087

4 files changed

Lines changed: 79 additions & 5 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog
22

33
## master / unreleased
4+
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160
45
* [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
56
* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
67
* [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374
@@ -9,11 +10,11 @@
910
* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359
1011
* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357
1112
* [ENHANCEMENT] Update build image and Go version to 1.26. #7437
12-
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160
1313
* [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363
1414
* [ENHANCEMENT] Query Scheduler: Add `cortex_query_scheduler_tracked_requests` metric to track the current number of requests held by the scheduler. #7355
1515
* [ENHANCEMENT] Compactor: Prevent partition compaction to compact any blocks marked for deletion. #7391
1616
* [ENHANCEMENT] Distributor: Optimize memory allocations by reusing the existing capacity of these pooled slices in the Prometheus Remote Write 2.0 path. #7392
17+
* [ENHANCEMENT] Distributor: Optimize memory allocations by pooling PreallocWriteRequestV2 and preserving the capacity of the Symbols slice during resets. #7404
1718
* [BUGFIX] Alertmanager: Fix disappearing user config and state when ring is temporarily unreachable. #7372
1819
* [BUGFIX] Fix nil when ingester_query_max_attempts > 1. #7369
1920
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370

pkg/cortexpb/timeseriesv2.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,14 @@ func PreallocWriteRequestV2FromPool() *PreallocWriteRequestV2 {
130130
return writeRequestPoolV2.Get().(*PreallocWriteRequestV2)
131131
}
132132

133+
// Reset implements proto.Message and preserves the capacity of the Symbols slice.
134+
func (p *PreallocWriteRequestV2) Reset() {
135+
savedSymbols := p.Symbols
136+
p.WriteRequestV2.Reset()
137+
p.Symbols = savedSymbols[:0]
138+
p.data = nil
139+
}
140+
133141
// PreallocTimeseriesV2SliceFromPool retrieves a slice of PreallocTimeseriesV2 from a sync.Pool.
134142
// ReuseSliceV2 should be called once done.
135143
func PreallocTimeseriesV2SliceFromPool() []PreallocTimeseriesV2 {

pkg/cortexpb/timeseriesv2_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,71 @@ func TestReuseWriteRequestV2(t *testing.T) {
122122
})
123123
}
124124

125+
func TestPreallocWriteRequestV2Reset(t *testing.T) {
126+
t.Run("preserves Symbols capacity", func(t *testing.T) {
127+
const symbolsCap = 100
128+
req := &PreallocWriteRequestV2{
129+
WriteRequestV2: WriteRequestV2{
130+
Symbols: make([]string, 0, symbolsCap),
131+
},
132+
}
133+
req.Symbols = append(req.Symbols, "a", "b", "c")
134+
135+
ptrBefore := &req.Symbols[:cap(req.Symbols)][0]
136+
137+
req.Reset()
138+
139+
assert.Equal(t, 0, len(req.Symbols), "Symbols length should be 0 after Reset")
140+
assert.Equal(t, symbolsCap, cap(req.Symbols), "Symbols capacity should be preserved after Reset")
141+
assert.Same(t, ptrBefore, &req.Symbols[:cap(req.Symbols)][0], "Symbols backing array should be reused after Reset")
142+
})
143+
144+
t.Run("clears non-Symbols WriteRequestV2 fields", func(t *testing.T) {
145+
b := []byte{1, 2, 3}
146+
req := &PreallocWriteRequestV2{
147+
WriteRequestV2: WriteRequestV2{
148+
Source: RULE,
149+
SkipLabelNameValidation: true,
150+
Timeseries: []PreallocTimeseriesV2{{TimeSeriesV2: &TimeSeriesV2{}}},
151+
},
152+
data: &b,
153+
}
154+
155+
req.Reset()
156+
157+
assert.Equal(t, SourceEnum(0), req.Source)
158+
assert.False(t, req.SkipLabelNameValidation)
159+
assert.Nil(t, req.Timeseries)
160+
assert.Nil(t, req.data)
161+
})
162+
163+
t.Run("Unmarshal after Reset reuses Symbols backing array", func(t *testing.T) {
164+
const symbolsCount = 50
165+
symbols := make([]string, symbolsCount)
166+
for i := range symbols {
167+
symbols[i] = fmt.Sprintf("symbol_%04d", i)
168+
}
169+
data, err := (&WriteRequestV2{Symbols: symbols}).Marshal()
170+
require.NoError(t, err)
171+
172+
req := &PreallocWriteRequestV2{
173+
WriteRequestV2: WriteRequestV2{
174+
Symbols: make([]string, 0, symbolsCount*2),
175+
},
176+
}
177+
178+
// Simulate Reset in util.ParseProtoReader()
179+
req.Reset()
180+
ptrAfterReset := &req.Symbols[:cap(req.Symbols)][0]
181+
capAfterReset := cap(req.Symbols)
182+
183+
require.NoError(t, req.WriteRequestV2.Unmarshal(data))
184+
assert.Equal(t, symbolsCount, len(req.Symbols))
185+
assert.Equal(t, capAfterReset, cap(req.Symbols), "capacity should not change: Unmarshal reused the existing backing array")
186+
assert.Same(t, ptrAfterReset, &req.Symbols[:cap(req.Symbols)][0], "backing array pointer should be identical: no new allocation occurred")
187+
})
188+
}
189+
125190
func BenchmarkMarshallWriteRequestV2(b *testing.B) {
126191
ts := PreallocTimeseriesV2SliceFromPool()
127192

pkg/util/push/push.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,14 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool,
9494
return
9595
}
9696

97-
var req cortexpb.PreallocWriteRequestV2
97+
req := cortexpb.PreallocWriteRequestV2FromPool()
9898
// v1 request is put back into the pool by the Distributor.
9999
defer func() {
100-
cortexpb.ReuseWriteRequestV2(&req)
100+
cortexpb.ReuseWriteRequestV2(req)
101101
req.Free()
102102
}()
103103

104-
err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
104+
err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, req, util.RawSnappy)
105105
if err != nil {
106106
level.Error(logger).Log("err", err.Error())
107107
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -113,7 +113,7 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool,
113113
req.Source = cortexpb.API
114114
}
115115

116-
v1Req, err := convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID))
116+
v1Req, err := convertV2RequestToV1(req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID))
117117
if err != nil {
118118
level.Error(logger).Log("err", err.Error())
119119
http.Error(w, err.Error(), http.StatusBadRequest)

0 commit comments

Comments
 (0)