Skip to content

Commit a3f4a5e

Browse files
yhl25vigith
andauthored
feat: Integration of Distributed Throttler With Numaflow Data-plane (numaproj#2904)
Signed-off-by: Vigith Maurice <vigith@gmail.com> Signed-off-by: Yashash H L <yashashhl25@gmail.com> Co-authored-by: Vigith Maurice <vigith@gmail.com>
1 parent 48cfbe9 commit a3f4a5e

58 files changed

Lines changed: 4856 additions & 1314 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

api/json-schema/schema.json

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21066,6 +21066,10 @@
2106621066
},
2106721067
"io.numaproj.numaflow.v1alpha1.MonoVertexLimits": {
2106821068
"properties": {
21069+
"rateLimit": {
21070+
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit",
21071+
"description": "RateLimit is used to define the rate limit for the mono vertex."
21072+
},
2106921073
"readBatchSize": {
2107021074
"description": "Read batch size from the source.",
2107121075
"format": "int64",
@@ -21439,6 +21443,10 @@
2143921443
"format": "int64",
2144021444
"type": "integer"
2144121445
},
21446+
"rateLimit": {
21447+
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit",
21448+
"description": "RateLimit is used to define the rate limit for all the vertices in the pipeline, it could be overridden by the vertex's limit settings."
21449+
},
2144221450
"readBatchSize": {
2144321451
"description": "Read batch size for all the vertices in the pipeline, can be overridden by the vertex's limit settings.",
2144421452
"format": "int64",
@@ -21707,6 +21715,57 @@
2170721715
],
2170821716
"type": "object"
2170921717
},
21718+
"io.numaproj.numaflow.v1alpha1.RateLimit": {
21719+
"properties": {
21720+
"max": {
21721+
"description": "Max is the maximum TPS that this vertex can process give a distributed `Store` is configured. Otherwise, it will be the maximum TPS for a single replica.",
21722+
"format": "int64",
21723+
"type": "integer"
21724+
},
21725+
"min": {
21726+
"description": "Minimum TPS allowed during initial bootup. This value will be distributed across all the replicas if a distributed `Store` is configured. Otherwise, it will be the minimum TPS for a single replica.",
21727+
"format": "int64",
21728+
"type": "integer"
21729+
},
21730+
"rampUpDuration": {
21731+
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
21732+
"description": "RampUpDuration is the duration to reach the maximum TPS from the minimum TPS. The min unit of ramp up is 1 in 1 second."
21733+
},
21734+
"store": {
21735+
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimiterStore",
21736+
"description": "Store is used to define the Distributed Store for the rate limiting. We also support in-memory store if no store is configured. This means that every replica will have its own rate limit and the actual TPS will be the sum of all the replicas."
21737+
}
21738+
},
21739+
"type": "object"
21740+
},
21741+
"io.numaproj.numaflow.v1alpha1.RateLimiterInMemoryStore": {
21742+
"type": "object"
21743+
},
21744+
"io.numaproj.numaflow.v1alpha1.RateLimiterRedisStore": {
21745+
"properties": {
21746+
"url": {
21747+
"description": "URL of the persistent store to write the rate limit data.",
21748+
"type": "string"
21749+
}
21750+
},
21751+
"required": [
21752+
"url"
21753+
],
21754+
"type": "object"
21755+
},
21756+
"io.numaproj.numaflow.v1alpha1.RateLimiterStore": {
21757+
"properties": {
21758+
"inMemoryStore": {
21759+
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimiterInMemoryStore",
21760+
"description": "InMemoryStore is used to define the in-memory store for the rate limit."
21761+
},
21762+
"redisStore": {
21763+
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimiterRedisStore",
21764+
"description": "RedisStore is used to define the redis store for the rate limit."
21765+
}
21766+
},
21767+
"type": "object"
21768+
},
2171021769
"io.numaproj.numaflow.v1alpha1.RetryStrategy": {
2171121770
"description": "The RetryStrategy struct defines the configuration for handling operation retries in case of failures. It incorporates an Exponential BackOff strategy to control retry timing and specifies the actions to take upon failure.",
2171221771
"properties": {
@@ -22606,6 +22665,10 @@
2260622665
"format": "int64",
2260722666
"type": "integer"
2260822667
},
22668+
"rateLimit": {
22669+
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit",
22670+
"description": "RateLimit is used to define the rate limit for the vertex, it overrides the settings from pipeline limits."
22671+
},
2260922672
"readBatchSize": {
2261022673
"description": "Read batch size from the source or buffer. It overrides the settings from pipeline limits.",
2261122674
"format": "int64",

api/openapi-spec/swagger.json

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21062,6 +21062,10 @@
2106221062
"io.numaproj.numaflow.v1alpha1.MonoVertexLimits": {
2106321063
"type": "object",
2106421064
"properties": {
21065+
"rateLimit": {
21066+
"description": "RateLimit is used to define the rate limit for the mono vertex.",
21067+
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit"
21068+
},
2106521069
"readBatchSize": {
2106621070
"description": "Read batch size from the source.",
2106721071
"type": "integer",
@@ -21426,6 +21430,10 @@
2142621430
"type": "integer",
2142721431
"format": "int64"
2142821432
},
21433+
"rateLimit": {
21434+
"description": "RateLimit is used to define the rate limit for all the vertices in the pipeline, it could be overridden by the vertex's limit settings.",
21435+
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit"
21436+
},
2142921437
"readBatchSize": {
2143021438
"description": "Read batch size for all the vertices in the pipeline, can be overridden by the vertex's limit settings.",
2143121439
"type": "integer",
@@ -21693,6 +21701,57 @@
2169321701
}
2169421702
}
2169521703
},
21704+
"io.numaproj.numaflow.v1alpha1.RateLimit": {
21705+
"type": "object",
21706+
"properties": {
21707+
"max": {
21708+
"description": "Max is the maximum TPS that this vertex can process give a distributed `Store` is configured. Otherwise, it will be the maximum TPS for a single replica.",
21709+
"type": "integer",
21710+
"format": "int64"
21711+
},
21712+
"min": {
21713+
"description": "Minimum TPS allowed during initial bootup. This value will be distributed across all the replicas if a distributed `Store` is configured. Otherwise, it will be the minimum TPS for a single replica.",
21714+
"type": "integer",
21715+
"format": "int64"
21716+
},
21717+
"rampUpDuration": {
21718+
"description": "RampUpDuration is the duration to reach the maximum TPS from the minimum TPS. The min unit of ramp up is 1 in 1 second.",
21719+
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
21720+
},
21721+
"store": {
21722+
"description": "Store is used to define the Distributed Store for the rate limiting. We also support in-memory store if no store is configured. This means that every replica will have its own rate limit and the actual TPS will be the sum of all the replicas.",
21723+
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimiterStore"
21724+
}
21725+
}
21726+
},
21727+
"io.numaproj.numaflow.v1alpha1.RateLimiterInMemoryStore": {
21728+
"type": "object"
21729+
},
21730+
"io.numaproj.numaflow.v1alpha1.RateLimiterRedisStore": {
21731+
"type": "object",
21732+
"required": [
21733+
"url"
21734+
],
21735+
"properties": {
21736+
"url": {
21737+
"description": "URL of the persistent store to write the rate limit data.",
21738+
"type": "string"
21739+
}
21740+
}
21741+
},
21742+
"io.numaproj.numaflow.v1alpha1.RateLimiterStore": {
21743+
"type": "object",
21744+
"properties": {
21745+
"inMemoryStore": {
21746+
"description": "InMemoryStore is used to define the in-memory store for the rate limit.",
21747+
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimiterInMemoryStore"
21748+
},
21749+
"redisStore": {
21750+
"description": "RedisStore is used to define the redis store for the rate limit.",
21751+
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimiterRedisStore"
21752+
}
21753+
}
21754+
},
2169621755
"io.numaproj.numaflow.v1alpha1.RetryStrategy": {
2169721756
"description": "The RetryStrategy struct defines the configuration for handling operation retries in case of failures. It incorporates an Exponential BackOff strategy to control retry timing and specifies the actions to take upon failure.",
2169821757
"type": "object",
@@ -22584,6 +22643,10 @@
2258422643
"type": "integer",
2258522644
"format": "int64"
2258622645
},
22646+
"rateLimit": {
22647+
"description": "RateLimit is used to define the rate limit for the vertex, it overrides the settings from pipeline limits.",
22648+
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit"
22649+
},
2258722650
"readBatchSize": {
2258822651
"description": "Read batch size from the source or buffer. It overrides the settings from pipeline limits.",
2258922652
"type": "integer",

config/base/crds/full/numaflow.numaproj.io_monovertices.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2546,6 +2546,32 @@ spec:
25462546
type: object
25472547
limits:
25482548
properties:
2549+
rateLimit:
2550+
properties:
2551+
max:
2552+
default: 10
2553+
format: int64
2554+
type: integer
2555+
min:
2556+
default: 1
2557+
format: int64
2558+
type: integer
2559+
rampUpDuration:
2560+
default: 1s
2561+
type: string
2562+
store:
2563+
properties:
2564+
inMemoryStore:
2565+
type: object
2566+
redisStore:
2567+
properties:
2568+
url:
2569+
type: string
2570+
required:
2571+
- url
2572+
type: object
2573+
type: object
2574+
type: object
25492575
readBatchSize:
25502576
default: 500
25512577
format: int64

config/base/crds/full/numaflow.numaproj.io_pipelines.yaml

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,32 @@ spec:
152152
default: 80
153153
format: int32
154154
type: integer
155+
rateLimit:
156+
properties:
157+
max:
158+
default: 10
159+
format: int64
160+
type: integer
161+
min:
162+
default: 1
163+
format: int64
164+
type: integer
165+
rampUpDuration:
166+
default: 1s
167+
type: string
168+
store:
169+
properties:
170+
inMemoryStore:
171+
type: object
172+
redisStore:
173+
properties:
174+
url:
175+
type: string
176+
required:
177+
- url
178+
type: object
179+
type: object
180+
type: object
155181
readBatchSize:
156182
default: 500
157183
format: int64
@@ -7005,6 +7031,32 @@ spec:
70057031
bufferUsageLimit:
70067032
format: int32
70077033
type: integer
7034+
rateLimit:
7035+
properties:
7036+
max:
7037+
default: 10
7038+
format: int64
7039+
type: integer
7040+
min:
7041+
default: 1
7042+
format: int64
7043+
type: integer
7044+
rampUpDuration:
7045+
default: 1s
7046+
type: string
7047+
store:
7048+
properties:
7049+
inMemoryStore:
7050+
type: object
7051+
redisStore:
7052+
properties:
7053+
url:
7054+
type: string
7055+
required:
7056+
- url
7057+
type: object
7058+
type: object
7059+
type: object
70087060
readBatchSize:
70097061
format: int64
70107062
type: integer

config/base/crds/full/numaflow.numaproj.io_servingpipelines.yaml

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,32 @@ spec:
131131
default: 80
132132
format: int32
133133
type: integer
134+
rateLimit:
135+
properties:
136+
max:
137+
default: 10
138+
format: int64
139+
type: integer
140+
min:
141+
default: 1
142+
format: int64
143+
type: integer
144+
rampUpDuration:
145+
default: 1s
146+
type: string
147+
store:
148+
properties:
149+
inMemoryStore:
150+
type: object
151+
redisStore:
152+
properties:
153+
url:
154+
type: string
155+
required:
156+
- url
157+
type: object
158+
type: object
159+
type: object
134160
readBatchSize:
135161
default: 500
136162
format: int64
@@ -6984,6 +7010,32 @@ spec:
69847010
bufferUsageLimit:
69857011
format: int32
69867012
type: integer
7013+
rateLimit:
7014+
properties:
7015+
max:
7016+
default: 10
7017+
format: int64
7018+
type: integer
7019+
min:
7020+
default: 1
7021+
format: int64
7022+
type: integer
7023+
rampUpDuration:
7024+
default: 1s
7025+
type: string
7026+
store:
7027+
properties:
7028+
inMemoryStore:
7029+
type: object
7030+
redisStore:
7031+
properties:
7032+
url:
7033+
type: string
7034+
required:
7035+
- url
7036+
type: object
7037+
type: object
7038+
type: object
69877039
readBatchSize:
69887040
format: int64
69897041
type: integer

0 commit comments

Comments
 (0)