diff --git a/components/router/pkg/store/mem.go b/components/router/pkg/store/mem.go index e9113309..611e1716 100644 --- a/components/router/pkg/store/mem.go +++ b/components/router/pkg/store/mem.go @@ -22,11 +22,66 @@ import ( "sync" ) +type DataStore struct { + mu sync.RWMutex + data map[string]Indicator // Key: name, Value: Indicator + + // Keep track of the min/max values for each metric, 0-index is min and 1-index is max. + // They will be used in score plugins. + RunningQueueSize [2]float64 + WaitingQueueSize [2]float64 + KVCacheUsage [2]float64 +} + +func (d *DataStore) Get(ctx context.Context, name string) (Indicator, error) { + d.mu.RLock() + defer d.mu.RUnlock() + + metrics, exists := d.data[name] + if !exists { + return Indicator{}, fmt.Errorf("metrics for datastore %s not found", name) + } + return metrics, nil +} + +// TODO: we should not iterate all the data which may lead to performance issue. +func (d *DataStore) FilterIterate(ctx context.Context, fn func(context.Context, Indicator) bool) (names []string) { + d.mu.RLock() + defer d.mu.RUnlock() + + for name, indicator := range d.data { + if fn(ctx, indicator) { + names = append(names, name) + } + } + return + +} + +// TODO: return multi candidates to avoid hotspot with multi instances. +func (d *DataStore) ScoreIterate(ctx context.Context, fn func(context.Context, Indicator) float32) string { + d.mu.RLock() + defer d.mu.RUnlock() + + var highestScore float32 + var candidate string + + for name, indicator := range d.data { + score := fn(ctx, indicator) + // Iterate the d.data is already in random order, so we can just pick the first one with the highest score. + if score > highestScore { + highestScore = score + candidate = name + } + } + return candidate +} + var _ Store = &MemoryStore{} type MemoryStore struct { mu sync.RWMutex - data map[string]*DataStore // Key: modelName, Value: *podWrapperStore + data map[string]*DataStore // Key: modelName } func NewMemoryStore() *MemoryStore { diff --git a/components/router/pkg/store/store.go b/components/router/pkg/store/store.go index 15216365..49a7456e 100644 --- a/components/router/pkg/store/store.go +++ b/components/router/pkg/store/store.go @@ -18,8 +18,6 @@ package store import ( "context" - "fmt" - "sync" ) type Store interface { @@ -31,57 +29,3 @@ type Store interface { // Should only used for testing. Len() int32 } - -type DataStore struct { - mu sync.RWMutex - data map[string]Indicator // Key: name, Value: Indicator - - // Keep track of the min/max values for each metric, 0-index is min and 1-index is max. - // They will be used in score plugins. - RunningQueueSize [2]float64 - WaitingQueueSize [2]float64 - KVCacheUsage [2]float64 -} - -func (d *DataStore) Get(ctx context.Context, name string) (Indicator, error) { - d.mu.RLock() - defer d.mu.RUnlock() - - metrics, exists := d.data[name] - if !exists { - return Indicator{}, fmt.Errorf("metrics for datastore %s not found", name) - } - return metrics, nil -} - -// TODO: we should not iterate all the dataStore which may lead to performance issue. -func (d *DataStore) FilterIterate(ctx context.Context, fn func(context.Context, Indicator) bool) (names []string) { - d.mu.RLock() - defer d.mu.RUnlock() - - for name, indicator := range d.data { - if fn(ctx, indicator) { - names = append(names, name) - } - } - return - -} - -func (d *DataStore) ScoreIterate(ctx context.Context, fn func(context.Context, Indicator) float32) string { - d.mu.RLock() - defer d.mu.RUnlock() - - var highestScore float32 - var candidate string - - for name, indicator := range d.data { - score := fn(ctx, indicator) - // Iterate the d.data is already in random order, so we can just pick the first one with the highest score. - if score > highestScore { - highestScore = score - candidate = name - } - } - return candidate -} diff --git a/docs/proposals/376-metric-aggregagor/README.md b/docs/proposals/376-metric-aggregagor/README.md index e7fcd86e..1ba56932 100644 --- a/docs/proposals/376-metric-aggregagor/README.md +++ b/docs/proposals/376-metric-aggregagor/README.md @@ -1,4 +1,4 @@ -# Proposal-376: Gateway Metric Aggregator +# Proposal-376: Metric Aggregator -- A simple implementation with least-latency scheduling algorithm -- Extensible with different consumers in the cluster, like the Lora autoscaler or the ai gateway -- Metrics visualization support, like Grafana +- A simple implementation with latency aware dispatching algorithm +- Extensible with different consumers in the cluster, like the HPA autoscaler or the ai gateway ### Non-Goals @@ -99,6 +98,7 @@ and make progress. - Different scheduling algorithm implementations in ai gateway, like prefix-cache aware - LoRA aware scheduling implementation, will be left to another KEP - Performance consideration in big clusters should be left to the Beta level +- How HPA consumers the metrics should be left to another KEP. ## Proposal @@ -175,41 +175,16 @@ The overall flow looks like: Let's break down the flow into several steps: - Step 1: we'll collect the metrics from the inference workloads in metrics aggregator. -- Step 2: the aggregator will parse the metrics and store them in the redis, this is for HA consideration and cache sharing. Once the instance is down, we can still retrieve the metrics from redis. And if we have multiple instances, we can share the metrics with each other via redis. Considering Envoy AI gateway already uses Redis for limit rating, we'll reuse the Redis here. -- Step 3 & 4: Traffic comes, the gateway plugin (we'll call it router later) will retrieve the metrics from Redis and make routing decisions based on different algorithms, like queue size aware scheduling. +- Step 2: the aggregator will parse the metrics and store them in the disk memory. We'll use the disk memory at first for quick starting and fast access. We may upgrade the architecture in the future, see Drawbacks section for more details. +- Step 3 & 4: Traffic comes, the gateway plugin (we'll call it router later) will retrieve the metrics from the storage and make routing decisions based on different algorithms, like latency aware scheduling. - Step 5: The router will send the request to the selected instance, and the instance will return the result to the router, return to the user finally. ### Additional components introduced: -- Metrics Aggregator (MA): MA is working as the controller plane to sync the metrics, this is also one of the reason why we want to decouple it from the router, which working as a data plane. MA has several components: - - A Pod controller to manage the Pod lifecycle, for example, once a Pod is ready, it will add it to the internal store, and each Pod will fork a background goroutine to sync the metrics continuously, 50ms interval by default. Once the Pod is deleted, the goroutine will be stopped and removed from the store. - - A internal store to parse the metric results, and store it in the backend storage, like Redis. -- Redis: a Redis instance is necessary for the metrics storage and sharing, we can use the existing Redis instance in the cluster, or deploy a new one if necessary. We should have storage interface to support different backends in the future. -- Router: a new router or [DynamicLoadBalancingBackend](https://github.com/envoyproxy/ai-gateway/blob/be2b479b04bc7a219b0c8239143bfbabebdcd615/filterapi/filterconfig.go#L199-L208) specifically in Envoy AI gateway to pick the best-fit Pod endpoints. However, we may block by the upstream issue [here](https://github.com/envoyproxy/ai-gateway/issues/604), we'll work with the Envoy AI Gateway team to resolve it ASAP. Maybe the final design will impact our implementation a bit but not much I think. - -### Data Structure - -The data structure could be varied based on the metrics we want to collect, let's take the queue size as an example: - -Because redis is a kv store, we'll use the ZSET to store the results, `LeastLatency::ModelName` as the key, Pod name as the member and the (runningQueueSize * 0.3 + waitingQueueSize * 0.7) as the score, the factor of waitingQueueSize is higher because metric is a delayed indicator. RunningQueueSize and WaitingQueueSize are two metrics most of the inference engines support. - -We'll also have another key to record the update timestamp. For example, a Pod named "default/fake-pod" with the score = 0.5, the set commands look like: - -```bash -# set the update timestamp -SET default/fake-pod "2025-05-12T06:16:27Z" - -# set the score -ZADD LeastLatency::ModelName 0.5 default/fake-pod -``` - -When collecting, we'll update the timestamp and score together. Setting the top 5 is enough for us to help reduce the storage pressure since it's a memory-based database. We don't use the expiration key here is just because most of the time, the metrics should be updated at a regular interval. - -When retrieving, we'll first query the ZSET to get the top 5 records, and iterate them one by one to verify that `currentTimestamp - recordTimestamp < 5s`, if not, skipping to the next one. This is to avoid outdated metrics. Once picked the exact endpoint, we'll reset the score with waitingQueueSize + 1 to avoid hotspot issues, especially when metrics update is blocked by some reasons. - -If all metrics are outdated, we'll fallback to the default service. - -Note: the algorithm is not the final one, we'll have more discussions with the community to find the best one. +- Metrics Aggregator (MA): MA is working as the controller plane to sync the metrics, however, it works as a data plane as well at this moment, we will revisit this once we graduate to Beta/GA. MA has several components: + - A Pod controller to manage the Pod lifecycle, for example, once a Pod is ready, it will add it to the internal store, and each Pod will fork a background goroutine to sync the metrics continuously, 100ms interval by default. Once the Pod is deleted, the goroutine will be stopped and removed from the store. + - A internal store to parse the metric results, and store it in the backend storage, right now we only support disk memory, but the interface is defined and we can extend it later. +- Router: A LLM request dispatcher to route the requests to specific Pods based on the metrics reading from the MA. However, we may block by the upstream issue [here](https://github.com/envoyproxy/ai-gateway/issues/604), we'll work with the Envoy AI Gateway team to resolve it ASAP. Maybe the final design will impact our implementation a bit but not much I think. ### Test Plan @@ -308,10 +283,8 @@ milestones with these graduation criteria: Beta: -- Other storages rather than KV store who supports only key-value pairs which might be not enough for more complex scenarios, like the prefix-cache aware scenario. -- HA support, once the metrics aggregator is down, the system should still work. -- No performance issues in big clusters, we may use daemonsets to report metrics. -- Once the picked Pod is down after the routing decision, router will fallback to the default service. Fallback mode is already supported in Envoy AI gateway. +- No performance issues in big clusters, especially we have multiple router instances there. +- The data plane and the control plane should be decoupled. ## Implementation History @@ -327,12 +300,11 @@ Major milestones might include: --> - 2025-05-08: Proposal initialized and submitted for review +- 2025-05-19: Proposal polished with the new architecture design and flow diagram. ## Drawbacks - +The biggest drawback of this proposal is that the router is now coupled with the metrics aggregator because of the shared memory store. In the future, we should optimize this either by using a database or hammer the metric report logics to the inference engines directly, which works as a event driven architecture, then the router instances will watch the events to build a local memory, together with the metrics aggregator. ## Alternatives @@ -342,4 +314,4 @@ not need to be as detailed as the proposal, but should include enough information to express the idea and why it was not acceptable. --> -- When collecting metrics from the inference workloads, `PUSH` mode will put less pressure on the gateway side, or the gateway will have iterate all the Pods which obviously will lead to performance issues. We didn't pick the approach because it will either add additional load to the inference workload and introduces more complexity to the system. The current approach will fork as much goroutines as the number of inference workloads to sync the metrics in parallel, this is feasible because goroutine is lightweight. Once the metrics aggregator becomes the bottleneck, we can consider to use `PUSH` mode at node level. \ No newline at end of file +- When collecting metrics from the inference workloads, `PUSH` mode will put less pressure on the gateway side, or the gateway will have iterate all the Pods which obviously will lead to performance issues. We didn't pick the approach because it will either add additional load to the inference workload and introduces more complexity to the system. The current approach will fork as much goroutines as the number of inference workloads to sync the metrics in parallel, this is feasible because goroutine is lightweight. Once the metrics aggregator becomes the bottleneck, we can consider to use `PUSH` mode at node level. diff --git a/docs/proposals/376-metric-aggregagor/flow.png b/docs/proposals/376-metric-aggregagor/flow.png index b0b9e83d..83cb0f23 100644 Binary files a/docs/proposals/376-metric-aggregagor/flow.png and b/docs/proposals/376-metric-aggregagor/flow.png differ diff --git a/docs/proposals/376-metric-aggregagor/proposal.yaml b/docs/proposals/376-metric-aggregagor/proposal.yaml index 8bdb5c79..f3c75f2c 100644 --- a/docs/proposals/376-metric-aggregagor/proposal.yaml +++ b/docs/proposals/376-metric-aggregagor/proposal.yaml @@ -1,4 +1,4 @@ -title: Gateway Metric Aggregator +title: Metric Aggregator proposal-number: 376 authors: - kerthcet