Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion components/router/pkg/store/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,66 @@ import (
"sync"
)

type DataStore struct {
mu sync.RWMutex
data map[string]Indicator // Key: name, Value: Indicator

// Keep track of the min/max values for each metric, 0-index is min and 1-index is max.
// They will be used in score plugins.
RunningQueueSize [2]float64
WaitingQueueSize [2]float64
KVCacheUsage [2]float64
}

func (d *DataStore) Get(ctx context.Context, name string) (Indicator, error) {
d.mu.RLock()
defer d.mu.RUnlock()

metrics, exists := d.data[name]
if !exists {
return Indicator{}, fmt.Errorf("metrics for datastore %s not found", name)
}
return metrics, nil
}

// TODO: we should not iterate all the data which may lead to performance issue.
func (d *DataStore) FilterIterate(ctx context.Context, fn func(context.Context, Indicator) bool) (names []string) {
d.mu.RLock()
defer d.mu.RUnlock()

for name, indicator := range d.data {
if fn(ctx, indicator) {
names = append(names, name)
}
}
return

}

// TODO: return multi candidates to avoid hotspot with multi instances.
func (d *DataStore) ScoreIterate(ctx context.Context, fn func(context.Context, Indicator) float32) string {
d.mu.RLock()
defer d.mu.RUnlock()

var highestScore float32
var candidate string

for name, indicator := range d.data {
score := fn(ctx, indicator)
// Iterate the d.data is already in random order, so we can just pick the first one with the highest score.
if score > highestScore {
highestScore = score
candidate = name
}
}
return candidate
}

var _ Store = &MemoryStore{}

type MemoryStore struct {
mu sync.RWMutex
data map[string]*DataStore // Key: modelName, Value: *podWrapperStore
data map[string]*DataStore // Key: modelName
}

func NewMemoryStore() *MemoryStore {
Expand Down
56 changes: 0 additions & 56 deletions components/router/pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package store

import (
"context"
"fmt"
"sync"
)

type Store interface {
Expand All @@ -31,57 +29,3 @@ type Store interface {
// Should only used for testing.
Len() int32
}

type DataStore struct {
mu sync.RWMutex
data map[string]Indicator // Key: name, Value: Indicator

// Keep track of the min/max values for each metric, 0-index is min and 1-index is max.
// They will be used in score plugins.
RunningQueueSize [2]float64
WaitingQueueSize [2]float64
KVCacheUsage [2]float64
}

func (d *DataStore) Get(ctx context.Context, name string) (Indicator, error) {
d.mu.RLock()
defer d.mu.RUnlock()

metrics, exists := d.data[name]
if !exists {
return Indicator{}, fmt.Errorf("metrics for datastore %s not found", name)
}
return metrics, nil
}

// TODO: we should not iterate all the dataStore which may lead to performance issue.
func (d *DataStore) FilterIterate(ctx context.Context, fn func(context.Context, Indicator) bool) (names []string) {
d.mu.RLock()
defer d.mu.RUnlock()

for name, indicator := range d.data {
if fn(ctx, indicator) {
names = append(names, name)
}
}
return

}

func (d *DataStore) ScoreIterate(ctx context.Context, fn func(context.Context, Indicator) float32) string {
d.mu.RLock()
defer d.mu.RUnlock()

var highestScore float32
var candidate string

for name, indicator := range d.data {
score := fn(ctx, indicator)
// Iterate the d.data is already in random order, so we can just pick the first one with the highest score.
if score > highestScore {
highestScore = score
candidate = name
}
}
return candidate
}
58 changes: 15 additions & 43 deletions docs/proposals/376-metric-aggregagor/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Proposal-376: Gateway Metric Aggregator
# Proposal-376: Metric Aggregator

<!--
This is the title of your Proposal. Keep it short, simple, and descriptive. A good
Expand Down Expand Up @@ -85,9 +85,8 @@ List the specific goals of the Proposal. What is it trying to achieve? How will
know that this has succeeded?
-->

- A simple implementation with least-latency scheduling algorithm
- Extensible with different consumers in the cluster, like the Lora autoscaler or the ai gateway
- Metrics visualization support, like Grafana
- A simple implementation with latency aware dispatching algorithm
- Extensible with different consumers in the cluster, like the HPA autoscaler or the ai gateway

### Non-Goals

Expand All @@ -99,6 +98,7 @@ and make progress.
- Different scheduling algorithm implementations in ai gateway, like prefix-cache aware
- LoRA aware scheduling implementation, will be left to another KEP
- Performance consideration in big clusters should be left to the Beta level
- How HPA consumers the metrics should be left to another KEP.

## Proposal

Expand Down Expand Up @@ -175,41 +175,16 @@ The overall flow looks like:
Let's break down the flow into several steps:

- Step 1: we'll collect the metrics from the inference workloads in metrics aggregator.
- Step 2: the aggregator will parse the metrics and store them in the redis, this is for HA consideration and cache sharing. Once the instance is down, we can still retrieve the metrics from redis. And if we have multiple instances, we can share the metrics with each other via redis. Considering Envoy AI gateway already uses Redis for limit rating, we'll reuse the Redis here.
- Step 3 & 4: Traffic comes, the gateway plugin (we'll call it router later) will retrieve the metrics from Redis and make routing decisions based on different algorithms, like queue size aware scheduling.
- Step 2: the aggregator will parse the metrics and store them in the disk memory. We'll use the disk memory at first for quick starting and fast access. We may upgrade the architecture in the future, see Drawbacks section for more details.
- Step 3 & 4: Traffic comes, the gateway plugin (we'll call it router later) will retrieve the metrics from the storage and make routing decisions based on different algorithms, like latency aware scheduling.
- Step 5: The router will send the request to the selected instance, and the instance will return the result to the router, return to the user finally.

### Additional components introduced:

- Metrics Aggregator (MA): MA is working as the controller plane to sync the metrics, this is also one of the reason why we want to decouple it from the router, which working as a data plane. MA has several components:
- A Pod controller to manage the Pod lifecycle, for example, once a Pod is ready, it will add it to the internal store, and each Pod will fork a background goroutine to sync the metrics continuously, 50ms interval by default. Once the Pod is deleted, the goroutine will be stopped and removed from the store.
- A internal store to parse the metric results, and store it in the backend storage, like Redis.
- Redis: a Redis instance is necessary for the metrics storage and sharing, we can use the existing Redis instance in the cluster, or deploy a new one if necessary. We should have storage interface to support different backends in the future.
- Router: a new router or [DynamicLoadBalancingBackend](https://github.com/envoyproxy/ai-gateway/blob/be2b479b04bc7a219b0c8239143bfbabebdcd615/filterapi/filterconfig.go#L199-L208) specifically in Envoy AI gateway to pick the best-fit Pod endpoints. However, we may block by the upstream issue [here](https://github.com/envoyproxy/ai-gateway/issues/604), we'll work with the Envoy AI Gateway team to resolve it ASAP. Maybe the final design will impact our implementation a bit but not much I think.

### Data Structure

The data structure could be varied based on the metrics we want to collect, let's take the queue size as an example:

Because redis is a kv store, we'll use the ZSET to store the results, `LeastLatency::ModelName` as the key, Pod name as the member and the (runningQueueSize * 0.3 + waitingQueueSize * 0.7) as the score, the factor of waitingQueueSize is higher because metric is a delayed indicator. RunningQueueSize and WaitingQueueSize are two metrics most of the inference engines support.

We'll also have another key to record the update timestamp. For example, a Pod named "default/fake-pod" with the score = 0.5, the set commands look like:

```bash
# set the update timestamp
SET default/fake-pod "2025-05-12T06:16:27Z"

# set the score
ZADD LeastLatency::ModelName 0.5 default/fake-pod
```

When collecting, we'll update the timestamp and score together. Setting the top 5 is enough for us to help reduce the storage pressure since it's a memory-based database. We don't use the expiration key here is just because most of the time, the metrics should be updated at a regular interval.

When retrieving, we'll first query the ZSET to get the top 5 records, and iterate them one by one to verify that `currentTimestamp - recordTimestamp < 5s`, if not, skipping to the next one. This is to avoid outdated metrics. Once picked the exact endpoint, we'll reset the score with waitingQueueSize + 1 to avoid hotspot issues, especially when metrics update is blocked by some reasons.

If all metrics are outdated, we'll fallback to the default service.

Note: the algorithm is not the final one, we'll have more discussions with the community to find the best one.
- Metrics Aggregator (MA): MA is working as the controller plane to sync the metrics, however, it works as a data plane as well at this moment, we will revisit this once we graduate to Beta/GA. MA has several components:
- A Pod controller to manage the Pod lifecycle, for example, once a Pod is ready, it will add it to the internal store, and each Pod will fork a background goroutine to sync the metrics continuously, 100ms interval by default. Once the Pod is deleted, the goroutine will be stopped and removed from the store.
- A internal store to parse the metric results, and store it in the backend storage, right now we only support disk memory, but the interface is defined and we can extend it later.
- Router: A LLM request dispatcher to route the requests to specific Pods based on the metrics reading from the MA. However, we may block by the upstream issue [here](https://github.com/envoyproxy/ai-gateway/issues/604), we'll work with the Envoy AI Gateway team to resolve it ASAP. Maybe the final design will impact our implementation a bit but not much I think.

### Test Plan

Expand Down Expand Up @@ -308,10 +283,8 @@ milestones with these graduation criteria:

Beta:

- Other storages rather than KV store who supports only key-value pairs which might be not enough for more complex scenarios, like the prefix-cache aware scenario.
- HA support, once the metrics aggregator is down, the system should still work.
- No performance issues in big clusters, we may use daemonsets to report metrics.
- Once the picked Pod is down after the routing decision, router will fallback to the default service. Fallback mode is already supported in Envoy AI gateway.
- No performance issues in big clusters, especially we have multiple router instances there.
- The data plane and the control plane should be decoupled.

## Implementation History

Expand All @@ -327,12 +300,11 @@ Major milestones might include:
-->

- 2025-05-08: Proposal initialized and submitted for review
- 2025-05-19: Proposal polished with the new architecture design and flow diagram.

## Drawbacks

<!--
Why should this Proposal _not_ be implemented?
-->
The biggest drawback of this proposal is that the router is now coupled with the metrics aggregator because of the shared memory store. In the future, we should optimize this either by using a database or hammer the metric report logics to the inference engines directly, which works as a event driven architecture, then the router instances will watch the events to build a local memory, together with the metrics aggregator.

## Alternatives

Expand All @@ -342,4 +314,4 @@ not need to be as detailed as the proposal, but should include enough
information to express the idea and why it was not acceptable.
-->

- When collecting metrics from the inference workloads, `PUSH` mode will put less pressure on the gateway side, or the gateway will have iterate all the Pods which obviously will lead to performance issues. We didn't pick the approach because it will either add additional load to the inference workload and introduces more complexity to the system. The current approach will fork as much goroutines as the number of inference workloads to sync the metrics in parallel, this is feasible because goroutine is lightweight. Once the metrics aggregator becomes the bottleneck, we can consider to use `PUSH` mode at node level.
- When collecting metrics from the inference workloads, `PUSH` mode will put less pressure on the gateway side, or the gateway will have iterate all the Pods which obviously will lead to performance issues. We didn't pick the approach because it will either add additional load to the inference workload and introduces more complexity to the system. The current approach will fork as much goroutines as the number of inference workloads to sync the metrics in parallel, this is feasible because goroutine is lightweight. Once the metrics aggregator becomes the bottleneck, we can consider to use `PUSH` mode at node level.
Binary file modified docs/proposals/376-metric-aggregagor/flow.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/proposals/376-metric-aggregagor/proposal.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
title: Gateway Metric Aggregator
title: Metric Aggregator
proposal-number: 376
authors:
- kerthcet
Expand Down
Loading