Skip to content

Commit 3bfa348

Browse files
authored
pkg/settings/limits: fixes and enhancements (#1482)
1 parent 2c539ba commit 3bfa348

13 files changed

Lines changed: 293 additions & 104 deletions

File tree

pkg/settings/cresettings/defaults.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{
22
"WorkflowLimit": "200",
33
"WorkflowRegistrationQueueLimit": "20",
4+
"WorkflowExecutionConcurrencyLimit": "50",
45
"HTTPTrigger": {
56
"AuthRateLimit": "100rps:-1"
67
},
@@ -9,7 +10,7 @@
910
"ZeroBalancePruningTimeout": "24h0m0s"
1011
},
1112
"PerOwner": {
12-
"ExecutionConcurrencyLimit": "50"
13+
"WorkflowExecutionConcurrencyLimit": "50"
1314
},
1415
"PerWorkflow": {
1516
"TriggerLimit": "10",

pkg/settings/cresettings/defaults.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
WorkflowLimit = '200'
22
WorkflowRegistrationQueueLimit = '20'
3+
WorkflowExecutionConcurrencyLimit = '50'
34

45
[HTTPTrigger]
56
AuthRateLimit = '100rps:-1'
@@ -9,7 +10,7 @@ WorkflowDeploymentRateLimit = 'every1m0s:1'
910
ZeroBalancePruningTimeout = '24h0m0s'
1011

1112
[PerOwner]
12-
ExecutionConcurrencyLimit = '50'
13+
WorkflowExecutionConcurrencyLimit = '50'
1314

1415
[PerWorkflow]
1516
TriggerLimit = '10'

pkg/settings/cresettings/settings.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,20 @@ import (
1212
)
1313

1414
func init() {
15-
err := InitConfig(&Config)
15+
err := InitConfig(&Default)
1616
if err != nil {
1717
log.Fatalf("failed to initialize keys: %v", err)
1818
}
19+
Config = Default
1920
}
2021

2122
// Deprecated: use Default
22-
var Config = Default
23+
var Config Schema
2324

2425
var Default = Schema{
25-
WorkflowLimit: Int(200),
26-
WorkflowRegistrationQueueLimit: Int(20),
26+
WorkflowLimit: Int(200),
27+
WorkflowRegistrationQueueLimit: Int(20),
28+
WorkflowExecutionConcurrencyLimit: Int(50),
2729

2830
HTTPTrigger: httpTriggerGlobal{
2931
AuthRateLimit: Rate(100, -1), //TODO
@@ -34,7 +36,7 @@ var Default = Schema{
3436
ZeroBalancePruningTimeout: Duration(24 * time.Hour),
3537
},
3638
PerOwner: Owners{
37-
ExecutionConcurrencyLimit: Int(50),
39+
WorkflowExecutionConcurrencyLimit: Int(50),
3840
},
3941
PerWorkflow: Workflows{
4042
TriggerLimit: Int(10),
@@ -97,8 +99,9 @@ var Default = Schema{
9799
}
98100

99101
type Schema struct {
100-
WorkflowLimit Setting[int] `unit:"{workflow}"`
101-
WorkflowRegistrationQueueLimit Setting[int] `unit:"{workflow}"`
102+
WorkflowLimit Setting[int] `unit:"{workflow}"`
103+
WorkflowRegistrationQueueLimit Setting[int] `unit:"{workflow}"`
104+
WorkflowExecutionConcurrencyLimit Setting[int] `unit:"{workflow}"`
102105

103106
HTTPTrigger httpTriggerGlobal
104107

@@ -112,7 +115,7 @@ type Orgs struct {
112115
}
113116

114117
type Owners struct {
115-
ExecutionConcurrencyLimit Setting[int] `unit:"{workflow}"`
118+
WorkflowExecutionConcurrencyLimit Setting[int] `unit:"{workflow}"`
116119
}
117120

118121
type Workflows struct {

pkg/settings/cresettings/settings_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ var (
2727
defaultsTOML string
2828
)
2929

30-
func TestConfig(t *testing.T) {
30+
func TestDefault(t *testing.T) {
3131
t.Run("json", func(t *testing.T) {
32-
b, err := json.MarshalIndent(Config, "", "\t")
32+
b, err := json.MarshalIndent(Default, "", "\t")
3333
if err != nil {
3434
log.Fatal(err)
3535
}
@@ -41,7 +41,7 @@ func TestConfig(t *testing.T) {
4141
})
4242

4343
t.Run("toml", func(t *testing.T) {
44-
b, err := toml.Marshal(Config)
44+
b, err := toml.Marshal(Default)
4545
if err != nil {
4646
log.Fatal(err)
4747
}

pkg/settings/limits/factory.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (f Factory) NewRateLimiter(rate settings.Setting[config.Rate]) (RateLimiter
3434
// - rate.*.limit - float gauge
3535
// - rate.*.burst - int gauge
3636
// - rate.*.usage - int counter
37-
// opt: .denied
37+
// - rate.*.denied - int histogram
3838
func (f Factory) MakeRateLimiter(rate settings.Setting[config.Rate]) (RateLimiter, error) {
3939
if rate.Scope == settings.ScopeGlobal {
4040
return f.globalRateLimiter(rate)
@@ -68,7 +68,7 @@ func NewResourcePoolLimiter[N Number](f Factory, limit settings.Setting[N]) (Res
6868
// - resource.*.limit - gauge
6969
// - resource.*.usage - gauge
7070
// - resource.*.amount - histogram
71-
// opt: .denied
71+
// - queue.*.denied - histogram
7272
func MakeResourcePoolLimiter[N Number](f Factory, limit settings.Setting[N]) (ResourcePoolLimiter[N], error) {
7373
if limit.Scope == settings.ScopeGlobal {
7474
return newGlobalResourcePoolLimiter(f, limit)
@@ -87,9 +87,9 @@ func MakeBoundLimiter[N Number](f Factory, bound settings.Setting[N]) (BoundLimi
8787

8888
// MakeQueueLimiter returns a QueueLimiter for the given limit and configured by the Factory.
8989
// If Meter is set, the following metrics will be emitted
90-
// - queue.*limit - gauge
91-
// - queue.*usage - gauge
92-
// opt: .denied
90+
// - queue.*.limit - int gauge
91+
// - queue.*.usage - int gauge
92+
// - queue.*.denied - int histogram
9393
func MakeQueueLimiter[T any](f Factory, limit settings.Setting[int]) (QueueLimiter[T], error) {
9494
if limit.Scope == settings.ScopeGlobal {
9595
return newUnscopedQueue[T](f, limit)

pkg/settings/limits/limits.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// - [ResourceLimiter]/[ResourcePoolLimiter]: for allocating resources
44
// - [TimeLimiter]: for enforcing timeouts
55
// - [BoundLimiter]: for enforcing bounds
6+
// - [QueueLimiter]: for limited capacity queues
67
//
78
// Every limit requires a default value. Additional features like Otel metrics and dynamic updates are available by
89
// using the [settings.Setting] variants.

pkg/settings/limits/metrics_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ func (e *exporter) lastResourceFirstScopeMetric(t *testing.T) metrics {
131131
type metrics []metricdata.Metrics
132132

133133
func redactHistogramVals[N int64 | float64](t *testing.T, ms metrics, name string) {
134+
t.Helper()
134135
i := ms.forName(name)
135136
if i < 0 {
136137
t.Fatalf("failed to find histogram named: %s", name)

pkg/settings/limits/queue.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,10 @@ type QueueLimiter[T any] interface {
3131
// NewQueueLimiter returns a simple static QueueLimiter.
3232
func NewQueueLimiter[T any](capacity int) QueueLimiter[T] {
3333
q := &queue[T]{
34-
cap: capacity,
35-
recordLimit: func(context.Context, int) {},
36-
recordUsage: func(context.Context, int) {},
34+
cap: capacity,
35+
recordLimit: func(context.Context, int) {},
36+
recordUsage: func(context.Context, int) {},
37+
recordDenied: func(context.Context, int) {},
3738
}
3839
q.cond.L = &q.mu
3940
return q
@@ -46,8 +47,9 @@ type queue[T any] struct {
4647
scope settings.Scope // optional
4748
tenant string // optional
4849

49-
recordLimit func(context.Context, int)
50-
recordUsage func(context.Context, int)
50+
recordLimit func(context.Context, int)
51+
recordUsage func(context.Context, int)
52+
recordDenied func(context.Context, int)
5153

5254
cap int
5355
list list.List
@@ -81,8 +83,9 @@ func (q *queue[T]) Put(ctx context.Context, t T) error {
8183
q.mu.Lock()
8284
defer q.mu.Unlock()
8385

84-
if q.list.Len() >= q.cap {
85-
return ErrorQueueFull{Key: q.key, Scope: q.scope, Tenant: q.tenant, Limit: q.cap}
86+
if c := q.cap; q.list.Len() >= c {
87+
q.recordDenied(ctx, 1)
88+
return ErrorQueueFull{Key: q.key, Scope: q.scope, Tenant: q.tenant, Limit: c}
8689
}
8790
q.list.PushBack(t)
8891
q.cond.Signal()
@@ -158,12 +161,23 @@ func newUnscopedQueue[T any](f Factory, limit settings.Setting[int]) (QueueLimit
158161
if err != nil {
159162
return nil, err
160163
}
164+
deniedHist, err := f.Meter.Int64Histogram("queue."+limit.Key+".denied", metric.WithUnit(limit.Unit))
165+
if err != nil {
166+
return nil, err
167+
}
161168
q.recordLimit = func(ctx context.Context, i int) {
162169
limitGauge.Record(ctx, int64(i))
163170
}
164171
q.recordUsage = func(ctx context.Context, i int) {
165172
usageGauge.Record(ctx, int64(i))
166173
}
174+
q.recordDenied = func(ctx context.Context, i int) {
175+
deniedHist.Record(ctx, int64(i))
176+
}
177+
} else {
178+
q.recordLimit = func(context.Context, int) {}
179+
q.recordUsage = func(context.Context, int) {}
180+
q.recordDenied = func(context.Context, int) {}
167181
}
168182

169183
if f.Settings != nil {
@@ -174,7 +188,6 @@ func newUnscopedQueue[T any](f Factory, limit settings.Setting[int]) (QueueLimit
174188
q.subFn = func(ctx context.Context) (updates <-chan settings.Update[int], cancelSub func()) {
175189
return limit.Subscribe(ctx, registry)
176190
}
177-
178191
}
179192
}
180193

@@ -207,6 +220,10 @@ func newScopedQueue[T any](f Factory, limit settings.Setting[int]) (QueueLimiter
207220
if err != nil {
208221
return nil, err
209222
}
223+
q.deniedHist, err = f.Meter.Int64Histogram("queue."+limit.Key+".denied", metric.WithUnit(limit.Unit))
224+
if err != nil {
225+
return nil, err
226+
}
210227
}
211228

212229
if f.Settings != nil {
@@ -237,8 +254,9 @@ type scopedQueue[T any] struct {
237254

238255
key string // optional
239256

240-
limitGauge metric.Int64Gauge // optional
241-
usageGauge metric.Int64Gauge // optional
257+
limitGauge metric.Int64Gauge // optional
258+
usageGauge metric.Int64Gauge // optional
259+
deniedHist metric.Int64Histogram // optional
242260

243261
// opt: reap after period of non-use
244262
queues sync.Map // map[string]*queue
@@ -344,6 +362,11 @@ func (s *scopedQueue[T]) newQueue(tenant string) *queue[T] {
344362
s.usageGauge.Record(ctx, int64(c), withScope(ctx, s.scope))
345363
}
346364
},
365+
recordDenied: func(ctx context.Context, c int) {
366+
if s.deniedHist != nil {
367+
s.deniedHist.Record(ctx, int64(c), withScope(ctx, s.scope))
368+
}
369+
},
347370
}
348371
q.cond.L = &q.mu
349372
q.updater.recordLimit = q.setCap

pkg/settings/limits/queue_test.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestMakeQueueLimiter(t *testing.T) {
9595
limit := settings.Int(2)
9696
limit.Key = "foo.bar"
9797
limit.Scope = tt.scope
98-
limit.Unit = "task"
98+
limit.Unit = "{task}"
9999
ql, err := MakeQueueLimiter[string](f, limit)
100100
require.NoError(t, err)
101101
t.Cleanup(func() { assert.NoError(t, ql.Close()) })
@@ -111,24 +111,40 @@ func TestMakeQueueLimiter(t *testing.T) {
111111
require.Equal(t, "foo", v)
112112

113113
ms := mc.lastResourceFirstScopeMetric(t)
114+
redactHistogramVals[int64](t, ms, "queue.foo.bar.denied")
114115
attrs := attribute.NewSet(kvsFromScope(ctx, tt.scope)...)
115116
require.Equal(t, metrics{
116117
metricdata.Metrics{
117118
Name: "queue.foo.bar.limit",
118-
Unit: "task",
119+
Unit: "{task}",
119120
Data: metricdata.Gauge[int64]{
120121
DataPoints: []metricdata.DataPoint[int64]{
121122
{Attributes: attrs, Value: 2}},
122123
},
123124
},
124125
metricdata.Metrics{
125126
Name: "queue.foo.bar.usage",
126-
Unit: "task",
127+
Unit: "{task}",
127128
Data: metricdata.Gauge[int64]{
128129
DataPoints: []metricdata.DataPoint[int64]{
129130
{Attributes: attrs, Value: 1}},
130131
},
131132
},
133+
{
134+
Name: "queue.foo.bar.denied",
135+
Unit: "{task}",
136+
Data: metricdata.Histogram[int64]{
137+
DataPoints: []metricdata.HistogramDataPoint[int64]{
138+
{
139+
Attributes: attrs,
140+
Count: 1,
141+
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
142+
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
143+
},
144+
},
145+
Temporality: metricdata.CumulativeTemporality,
146+
},
147+
},
132148
}, ms)
133149
})
134150
}

0 commit comments

Comments
 (0)