Skip to content

Commit a77c7b2

Browse files
Running req extractor (#75)
* Add data layer interfaces. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * add NotificationSource in datalayer for asynchronous event pipeline. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Remove unneeded starttime. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Add running request counter extractor. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Align with model interface changes. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Add plugin factory. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Add factory to source plugin. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Use datastore without handle. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Restore handle file. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Simplify notification source. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Fix doc comments. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Simplify the extractor. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Register the plugin. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Remove lock. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Adding the factory and register the extractor in runner. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Revert notification source changes. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Small fix. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Fix comments. Signed-off-by: Mohammad Nassar <mohammad.nassar@ibm.com> * Small fix. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Add unittest file. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Fix lint error. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Update after datalayer change. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Fix datastore lint error. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Change factory name. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Revert datastore changes. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Fix conflicts. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Fix conflicts. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Fix conflicts. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> --------- Signed-off-by: Mohammad <mohammad.nassar@ibm.com> Signed-off-by: Mohammad-nassar10 <79787844+Mohammad-nassar10@users.noreply.github.com> Signed-off-by: Mohammad Nassar <mohammad.nassar@ibm.com>
1 parent 9b8f4de commit a77c7b2

3 files changed

Lines changed: 363 additions & 0 deletions

File tree

cmd/runner/runner.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,12 @@ import (
3838
logutil "github.com/llm-d/llm-d-inference-payload-processor/pkg/common/observability/logging"
3939
"github.com/llm-d/llm-d-inference-payload-processor/pkg/common/observability/profiling"
4040
"github.com/llm-d/llm-d-inference-payload-processor/pkg/common/observability/tracing"
41+
"github.com/llm-d/llm-d-inference-payload-processor/pkg/datastore"
4142
"github.com/llm-d/llm-d-inference-payload-processor/pkg/framework"
4243
"github.com/llm-d/llm-d-inference-payload-processor/pkg/metrics"
4344
"github.com/llm-d/llm-d-inference-payload-processor/pkg/plugins/basemodelextractor"
4445
"github.com/llm-d/llm-d-inference-payload-processor/pkg/plugins/bodyfieldtoheader"
46+
inflightrequests "github.com/llm-d/llm-d-inference-payload-processor/pkg/plugins/datalayer/inflightrequests"
4547
notificationsource "github.com/llm-d/llm-d-inference-payload-processor/pkg/plugins/datalayer/notificationsource"
4648
runserver "github.com/llm-d/llm-d-inference-payload-processor/pkg/server"
4749
"github.com/llm-d/llm-d-inference-payload-processor/version"
@@ -172,6 +174,8 @@ func (r *Runner) Run(ctx context.Context) error {
172174

173175
handle := framework.NewHandle(ctx, mgr)
174176

177+
ds := datastore.NewStore()
178+
175179
// Register factories for all known in-tree plugins
176180
r.registerInTreePlugins()
177181

@@ -220,6 +224,18 @@ func (r *Runner) Run(ctx context.Context) error {
220224
}
221225
}
222226

227+
// Wire the inflight-requests data pipeline: extractor → notification source.
228+
// TODO: config-driven path does not yet support NotificationSource + extractors.
229+
notifSrc, err := notificationsource.New("default", inflightrequests.NewInflightRequestsExtractor(ds))
230+
if err != nil {
231+
setupLog.Error(err, "failed to create notification source")
232+
return err
233+
}
234+
if err := notifSrc.Start(ctx); err != nil {
235+
setupLog.Error(err, "failed to start notification source")
236+
return err
237+
}
238+
223239
// Setup ExtProc Server Runner.
224240
serverRunner := &runserver.ExtProcServerRunner{
225241
GrpcPort: opts.GRPCPort,
@@ -253,6 +269,7 @@ func (r *Runner) Run(ctx context.Context) error {
253269
func (r *Runner) registerInTreePlugins() {
254270
framework.Register(bodyfieldtoheader.BodyFieldToHeaderPluginType, bodyfieldtoheader.BodyFieldToHeaderPluginFactory)
255271
framework.Register(basemodelextractor.BaseModelToHeaderPluginType, basemodelextractor.BaseModelToHeaderPluginFactory)
272+
framework.Register(inflightrequests.PluginType, inflightrequests.ExtractorFactory)
256273
framework.Register(notificationsource.PluginType, notificationsource.Factory)
257274
}
258275

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
Copyright 2026 The llm-d Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package inflightrequests
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
23+
"github.com/llm-d/llm-d-inference-payload-processor/pkg/framework"
24+
"github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/datalayer"
25+
)
26+
27+
const (
28+
// PluginType is the identifier used when registering this extractor.
29+
PluginType = "inflight-requests-extractor"
30+
31+
// InflightRequestsAttributeKey is the attribute key written to each model's attribute store.
32+
InflightRequestsAttributeKey = "inflight-requests"
33+
)
34+
35+
// compile-time interface assertion
36+
var _ datalayer.Extractor = &InflightRequestsExtractor{}
37+
38+
// ExtractorFactory creates a InflightRequestsExtractor with a nil DataStore.
39+
// The factory path is limited: the DataStore is not available via framework.Handle,
40+
// so the created extractor cannot write to the store. Use NewInflightRequestsExtractor
41+
// directly when constructing for production use.
42+
func ExtractorFactory(name string, _ json.RawMessage, _ framework.Handle) (framework.Plugin, error) {
43+
return NewInflightRequestsExtractor(nil).WithName(name), nil
44+
}
45+
46+
// InflightRequestsCount holds in-flight request and token counts for one model.
47+
type InflightRequestsCount struct {
48+
Requests int64
49+
Tokens int64
50+
}
51+
52+
func (r InflightRequestsCount) Clone() datalayer.Cloneable { return r }
53+
54+
// InflightRequestsExtractor tracks in-flight request counts and token sums per model.
55+
// It writes InflightRequestsCount to each model's InflightRequestsAttributeKey attribute.
56+
//
57+
// Extract is assumed to be called from a single goroutine (the NotificationSource event loop).
58+
// If parallel dispatch is introduced, add a sync.Mutex around counters and the DataStore write.
59+
//
60+
// TODO: counters leak if a request fails without a corresponding ResponseEventType (e.g. connection
61+
// drop, upstream error, context cancellation). The call site should fire a
62+
// synthetic ResponseEventType in its error/EOF path to keep counts accurate.
63+
type InflightRequestsExtractor struct {
64+
typedName framework.TypedName
65+
dataStore datalayer.DataStore
66+
counters map[string]InflightRequestsCount
67+
}
68+
69+
func NewInflightRequestsExtractor(ds datalayer.DataStore) *InflightRequestsExtractor {
70+
return &InflightRequestsExtractor{
71+
typedName: framework.TypedName{Type: PluginType, Name: PluginType},
72+
dataStore: ds,
73+
counters: make(map[string]InflightRequestsCount),
74+
}
75+
}
76+
77+
func (e *InflightRequestsExtractor) TypedName() framework.TypedName { return e.typedName }
78+
79+
// WithName sets the instance name, used by the factory when the plugin is configured by name.
80+
func (e *InflightRequestsExtractor) WithName(name string) *InflightRequestsExtractor {
81+
e.typedName.Name = name
82+
return e
83+
}
84+
85+
func (e *InflightRequestsExtractor) Extract(_ context.Context, events []datalayer.Event) error {
86+
updated := map[string]InflightRequestsCount{}
87+
88+
for _, ev := range events {
89+
switch ev.Type {
90+
case datalayer.RequestEventType:
91+
p, ok := ev.Payload.(datalayer.RequestPayload)
92+
if !ok {
93+
continue
94+
}
95+
model, _ := p.Request.Body["model"].(string)
96+
if model == "" {
97+
continue
98+
}
99+
maxTokens, _ := p.Request.Body["max_tokens"].(float64)
100+
c := e.counters[model]
101+
c.Requests++
102+
c.Tokens += int64(maxTokens)
103+
e.counters[model] = c
104+
updated[model] = c
105+
106+
case datalayer.ResponseEventType:
107+
p, ok := ev.Payload.(datalayer.ResponsePayload)
108+
if !ok {
109+
continue
110+
}
111+
model, _ := p.Request.Body["model"].(string)
112+
if model == "" {
113+
continue
114+
}
115+
maxTokens, _ := p.Request.Body["max_tokens"].(float64)
116+
c := e.counters[model]
117+
floorDecrement(&c.Requests, 1)
118+
floorDecrement(&c.Tokens, int64(maxTokens))
119+
e.counters[model] = c
120+
updated[model] = c
121+
}
122+
}
123+
124+
for model, c := range updated {
125+
e.dataStore.GetOrCreateModel(model).GetAttributes().Put(InflightRequestsAttributeKey, c)
126+
}
127+
return nil
128+
}
129+
130+
// floorDecrement decrements v by delta, flooring at zero.
131+
func floorDecrement(v *int64, delta int64) {
132+
*v -= delta
133+
if *v < 0 {
134+
*v = 0
135+
}
136+
}
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
Copyright 2026 The llm-d Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package inflightrequests
18+
19+
import (
20+
"context"
21+
"sync"
22+
"testing"
23+
"time"
24+
25+
"github.com/llm-d/llm-d-inference-payload-processor/pkg/framework"
26+
"github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/datalayer"
27+
)
28+
29+
// fakeDataStore is an in-memory DataStore for tests.
30+
type fakeDataStore struct {
31+
mu sync.Mutex
32+
models map[string]datalayer.Model
33+
}
34+
35+
func newFakeDataStore() *fakeDataStore {
36+
return &fakeDataStore{models: make(map[string]datalayer.Model)}
37+
}
38+
39+
func (f *fakeDataStore) GetOrCreateModel(name string) datalayer.Model {
40+
f.mu.Lock()
41+
defer f.mu.Unlock()
42+
if m, ok := f.models[name]; ok {
43+
return m
44+
}
45+
m := datalayer.NewModel(name)
46+
f.models[name] = m
47+
return m
48+
}
49+
50+
// makeRequestEvent creates a RequestEventType event with model and max_tokens.
51+
func makeRequestEvent(model string, maxTokens float64) datalayer.Event {
52+
req := framework.NewInferenceRequest()
53+
req.Body["model"] = model
54+
req.Body["max_tokens"] = maxTokens
55+
return datalayer.Event{
56+
Type: datalayer.RequestEventType,
57+
Payload: datalayer.RequestPayload{Request: req},
58+
}
59+
}
60+
61+
// makeResponseEvent creates a ResponseEventType event with model, duration, and max_tokens.
62+
// maxTokens mirrors the original request's max_tokens so the extractor can decrement correctly.
63+
func makeResponseEvent(model string, durationMs int, maxTokens float64) datalayer.Event {
64+
req := framework.NewInferenceRequest()
65+
req.Body["model"] = model
66+
req.Body["max_tokens"] = maxTokens
67+
return datalayer.Event{
68+
Type: datalayer.ResponseEventType,
69+
Payload: datalayer.ResponsePayload{
70+
Request: req,
71+
Response: framework.NewInferenceResponse(),
72+
Duration: time.Duration(durationMs) * time.Millisecond,
73+
},
74+
}
75+
}
76+
77+
// getInflightRequests asserts the inflight-requests attribute exists for model and returns it.
78+
func getInflightRequests(t testing.TB, ds *fakeDataStore, model string) InflightRequestsCount {
79+
t.Helper()
80+
val, ok := ds.GetOrCreateModel(model).GetAttributes().Get(InflightRequestsAttributeKey)
81+
if !ok {
82+
t.Fatalf("expected %q attribute for model %q", InflightRequestsAttributeKey, model)
83+
}
84+
rc, ok := val.(InflightRequestsCount)
85+
if !ok {
86+
t.Fatalf("expected InflightRequestsCount for model %q", model)
87+
}
88+
return rc
89+
}
90+
91+
func newInflightRequestsTest(t *testing.T) (*InflightRequestsExtractor, *fakeDataStore) {
92+
t.Helper()
93+
ds := newFakeDataStore()
94+
return NewInflightRequestsExtractor(ds), ds
95+
}
96+
97+
func TestRequestIncrementsCounter(t *testing.T) {
98+
ext, ds := newInflightRequestsTest(t)
99+
100+
batch := []datalayer.Event{makeRequestEvent("m1", 100)}
101+
if err := ext.Extract(context.Background(), batch); err != nil {
102+
t.Fatalf("Extract failed: %v", err)
103+
}
104+
105+
rc := getInflightRequests(t, ds, "m1")
106+
if rc.Requests != 1 {
107+
t.Errorf("expected Requests=1, got %d", rc.Requests)
108+
}
109+
if rc.Tokens != 100 {
110+
t.Errorf("expected Tokens=100, got %d", rc.Tokens)
111+
}
112+
}
113+
114+
func TestResponseDecrementsCounter(t *testing.T) {
115+
ext, ds := newInflightRequestsTest(t)
116+
117+
// Response carries the original request's max_tokens so the extractor can decrement correctly.
118+
batch := []datalayer.Event{
119+
makeRequestEvent("m1", 100),
120+
makeResponseEvent("m1", 50, 100),
121+
}
122+
if err := ext.Extract(context.Background(), batch); err != nil {
123+
t.Fatalf("Extract failed: %v", err)
124+
}
125+
126+
rc := getInflightRequests(t, ds, "m1")
127+
if rc.Requests != 0 {
128+
t.Errorf("expected Requests=0, got %d", rc.Requests)
129+
}
130+
if rc.Tokens != 0 {
131+
t.Errorf("expected Tokens=0, got %d", rc.Tokens)
132+
}
133+
}
134+
135+
func TestCounterFloorsAtZero(t *testing.T) {
136+
ext, ds := newInflightRequestsTest(t)
137+
138+
// Response with no prior request — both counters must floor at zero.
139+
batch := []datalayer.Event{makeResponseEvent("m1", 50, 100)}
140+
if err := ext.Extract(context.Background(), batch); err != nil {
141+
t.Fatalf("Extract failed: %v", err)
142+
}
143+
144+
rc := getInflightRequests(t, ds, "m1")
145+
if rc.Requests != 0 {
146+
t.Errorf("expected Requests=0, got %d", rc.Requests)
147+
}
148+
if rc.Tokens != 0 {
149+
t.Errorf("expected Tokens=0, got %d", rc.Tokens)
150+
}
151+
}
152+
153+
func TestInflightRequestsMultipleModels(t *testing.T) {
154+
ext, ds := newInflightRequestsTest(t)
155+
156+
batch := []datalayer.Event{
157+
makeRequestEvent("m1", 10),
158+
makeRequestEvent("m2", 20),
159+
}
160+
if err := ext.Extract(context.Background(), batch); err != nil {
161+
t.Fatalf("Extract failed: %v", err)
162+
}
163+
164+
rc1 := getInflightRequests(t, ds, "m1")
165+
if rc1.Requests != 1 || rc1.Tokens != 10 {
166+
t.Errorf("m1: expected {Requests:1, Tokens:10}, got %+v", rc1)
167+
}
168+
169+
rc2 := getInflightRequests(t, ds, "m2")
170+
if rc2.Requests != 1 || rc2.Tokens != 20 {
171+
t.Errorf("m2: expected {Requests:1, Tokens:20}, got %+v", rc2)
172+
}
173+
}
174+
175+
func TestInflightRequestsUnknownEventTypeIgnored(t *testing.T) {
176+
ext, ds := newInflightRequestsTest(t)
177+
178+
batch := []datalayer.Event{{Type: "unknown"}}
179+
if err := ext.Extract(context.Background(), batch); err != nil {
180+
t.Fatalf("Extract failed: %v", err)
181+
}
182+
183+
ds.mu.Lock()
184+
modelCount := len(ds.models)
185+
ds.mu.Unlock()
186+
if modelCount != 0 {
187+
t.Errorf("expected no models in datastore, got %d", modelCount)
188+
}
189+
}
190+
191+
func TestInflightRequestsMissingModelFieldIgnored(t *testing.T) {
192+
ext, ds := newInflightRequestsTest(t)
193+
194+
// Payload without a "model" key — no counter should be updated.
195+
req := framework.NewInferenceRequest()
196+
req.Body["max_tokens"] = float64(50)
197+
batch := []datalayer.Event{
198+
{Type: datalayer.RequestEventType, Payload: datalayer.RequestPayload{Request: req}},
199+
}
200+
if err := ext.Extract(context.Background(), batch); err != nil {
201+
t.Fatalf("Extract failed: %v", err)
202+
}
203+
204+
ds.mu.Lock()
205+
modelCount := len(ds.models)
206+
ds.mu.Unlock()
207+
if modelCount != 0 {
208+
t.Errorf("expected no models in datastore, got %d", modelCount)
209+
}
210+
}

0 commit comments

Comments
 (0)