Skip to content

Commit 3c4ecbe

Browse files
committed
feat: add ChunkProcessor interface for streaming chunk processing
Add ChunkProcessor interface that plugins declaring BodyChunked must implement. The framework validates this at startup and pre-computes the list of ChunkProcessors per profile. When streaming (no buffering), each response body chunk is passed through all ChunkProcessors before being acked to Envoy, enabling in-flight chunk transformation. Depends on llm-d#169 Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
1 parent 7ebc2d8 commit 3c4ecbe

8 files changed

Lines changed: 281 additions & 85 deletions

File tree

docs/proposals/044-plugin-body-mode-capabilities/README.md

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,27 @@ Plugins that don't implement `ResponseBodyRequirement` get a warning:
7373
INFO Response plugin does not declare ResponseBodyRequirement, defaulting to BodyFull profile=default plugin=legacy-plugin/legacy
7474
```
7575

76+
### ChunkProcessor Interface
77+
78+
Plugins that declare `BodyChunked` must also implement `ChunkProcessor`:
79+
80+
```go
81+
type ChunkProcessor interface {
82+
ProcessResponseChunk(ctx context.Context, cycleState *CycleState, chunk []byte, isFinal bool) ([]byte, error)
83+
}
84+
```
85+
86+
- Validated at startup: declaring `BodyChunked` without implementing `ChunkProcessor` is a fatal error
87+
- `ProcessResponseChunk` receives each chunk as it arrives and returns (possibly modified) bytes
88+
- The returned bytes are sent to Envoy as the ack — the ChunkProcessor can transform the data in-flight
89+
- `ChunkProcessors` are pre-computed per profile and stored on the `Profile` struct
90+
7691
## Implementation Phases
7792

7893
| Phase | Scope | Status |
7994
|-------|-------|--------|
80-
| 1 — Framework | `ResponseBodyMode`, `ResponseBodyRequirement` interface, pre-computation, conditional buffering, tests | **This PR** |
81-
| 2 — ChunkProcessor | `ChunkProcessor` interface for `BodyChunked` plugins, per-chunk dispatch in server loop | Next PR |
95+
| 1 — Framework | `ResponseBodyMode`, `ResponseBodyRequirement` interface, pre-computation, conditional buffering, tests | **Done** (PR #169) |
96+
| 2 — ChunkProcessor | `ChunkProcessor` interface for `BodyChunked` plugins, per-chunk dispatch in server loop | **This PR** |
8297
| 3 — Plugin declarations | Existing plugins implement `ResponseBodyRequirement` (ODH repo) | Separate PR |
8398

8499
## Migration

pkg/config/loader/configloader.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,10 @@ func LoadConfiguration(configBytes []byte, handle plugin.Handle, processor datas
9696
return nil, err
9797
}
9898

99-
computeResponseBuffering(profiles, logger)
99+
if err = computeResponseBuffering(profiles, logger); err != nil {
100+
logger.Error(err, "failed to compute response buffering requirements")
101+
return nil, err
102+
}
100103

101104
return &config.Config{
102105
ProfilePicker: profilePicker,
@@ -309,13 +312,15 @@ func buildPostProcessors(rawConfig *configapi.PluginRefList, handle plugin.Handl
309312
return postProcessors, nil
310313
}
311314

312-
// computeResponseBuffering pre-computes NeedsResponseBuffering for each profile based on the
313-
// ResponseBodyMode declared by each response plugin. If any response plugin returns BodyFull
314-
// or doesn't implement ResponseBodyRequirement, the profile needs buffering.
315-
func computeResponseBuffering(profiles map[string]*requesthandling.Profile, logger logr.Logger) {
315+
// computeResponseBuffering pre-computes NeedsResponseBuffering and ChunkProcessors for each
316+
// profile based on the ResponseBodyMode declared by each response plugin. If any response
317+
// plugin returns BodyFull or doesn't implement ResponseBodyRequirement, the profile needs
318+
// buffering. Returns an error if a plugin declares BodyChunked but doesn't implement ChunkProcessor.
319+
func computeResponseBuffering(profiles map[string]*requesthandling.Profile, logger logr.Logger) error {
316320
for name, profile := range profiles {
317321
needsBuffering := false
318322
var bufferingPlugins []string
323+
var chunkProcessors []requesthandling.ChunkProcessor
319324

320325
for _, rp := range profile.ResponsePlugins {
321326
mode := requesthandling.BodyFull
@@ -325,18 +330,30 @@ func computeResponseBuffering(profiles map[string]*requesthandling.Profile, logg
325330
logger.Info("Response plugin does not declare ResponseBodyRequirement, defaulting to BodyFull",
326331
"profile", name, "plugin", rp.TypedName())
327332
}
328-
if mode == requesthandling.BodyFull {
333+
334+
switch mode {
335+
case requesthandling.BodyFull:
329336
needsBuffering = true
330337
bufferingPlugins = append(bufferingPlugins, rp.TypedName().Name)
338+
case requesthandling.BodyChunked:
339+
cp, ok := rp.(requesthandling.ChunkProcessor)
340+
if !ok {
341+
return fmt.Errorf("plugin %q in profile %q declares BodyChunked but does not implement ChunkProcessor",
342+
rp.TypedName().Name, name)
343+
}
344+
chunkProcessors = append(chunkProcessors, cp)
331345
}
332346
}
333347

334348
profile.NeedsResponseBuffering = needsBuffering
349+
profile.ChunkProcessors = chunkProcessors
335350
logger.Info("Profile response buffering computed",
336351
"profile", name,
337352
"needsResponseBuffering", needsBuffering,
338-
"bufferingPlugins", bufferingPlugins)
353+
"bufferingPlugins", bufferingPlugins,
354+
"chunkProcessors", len(chunkProcessors))
339355
}
356+
return nil
340357
}
341358

342359
// buildModelSelector iterates all built profiles and, for each model-selector plugin found in

pkg/config/loader/response_buffering_test.go

Lines changed: 75 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package loader
1818

1919
import (
2020
"context"
21+
"strings"
2122
"testing"
2223

2324
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -52,6 +53,22 @@ var (
5253
_ requesthandling.ResponseBodyRequirement = &fakeResponsePlugin{}
5354
)
5455

56+
// fakeChunkPlugin declares BodyChunked and implements ChunkProcessor.
57+
type fakeChunkPlugin struct {
58+
fakeResponsePlugin
59+
}
60+
61+
func (p *fakeChunkPlugin) ProcessResponseChunk(_ context.Context, _ *plugin.CycleState, chunk []byte, _ bool) ([]byte, error) {
62+
return chunk, nil
63+
}
64+
65+
var _ requesthandling.ChunkProcessor = &fakeChunkPlugin{}
66+
67+
// badChunkedPlugin declares BodyChunked but does NOT implement ChunkProcessor.
68+
type badChunkedPlugin struct {
69+
fakeResponsePlugin
70+
}
71+
5572
type legacyResponsePlugin struct {
5673
name string
5774
}
@@ -71,10 +88,13 @@ func modePtr(m requesthandling.ResponseBodyMode) *requesthandling.ResponseBodyMo
7188
func TestComputeResponseBuffering(t *testing.T) {
7289
logger := log.FromContext(logutil.NewTestLoggerIntoContext(context.Background()))
7390

91+
chunkedMode := modePtr(requesthandling.BodyChunked)
92+
7493
tests := []struct {
75-
name string
76-
plugins []requesthandling.ResponseProcessor
77-
wantBuffering bool
94+
name string
95+
plugins []requesthandling.ResponseProcessor
96+
wantBuffering bool
97+
wantChunkCount int
7898
}{
7999
{
80100
name: "no response plugins",
@@ -90,11 +110,12 @@ func TestComputeResponseBuffering(t *testing.T) {
90110
wantBuffering: false,
91111
},
92112
{
93-
name: "all BodyChunked",
113+
name: "BodyChunked with ChunkProcessor",
94114
plugins: []requesthandling.ResponseProcessor{
95-
&fakeResponsePlugin{name: "a", mode: modePtr(requesthandling.BodyChunked)},
115+
&fakeChunkPlugin{fakeResponsePlugin{name: "chunker", mode: chunkedMode}},
96116
},
97-
wantBuffering: false,
117+
wantBuffering: false,
118+
wantChunkCount: 1,
98119
},
99120
{
100121
name: "one BodyFull forces buffering",
@@ -114,18 +135,20 @@ func TestComputeResponseBuffering(t *testing.T) {
114135
{
115136
name: "mixed: BodyChunked + legacy forces buffering",
116137
plugins: []requesthandling.ResponseProcessor{
117-
&fakeResponsePlugin{name: "a", mode: modePtr(requesthandling.BodyChunked)},
138+
&fakeChunkPlugin{fakeResponsePlugin{name: "a", mode: chunkedMode}},
118139
&legacyResponsePlugin{name: "legacy"},
119140
},
120-
wantBuffering: true,
141+
wantBuffering: true,
142+
wantChunkCount: 1,
121143
},
122144
{
123145
name: "mixed: BodyNotNeeded + BodyChunked — no buffering",
124146
plugins: []requesthandling.ResponseProcessor{
125147
&fakeResponsePlugin{name: "a", mode: modePtr(requesthandling.BodyNotNeeded)},
126-
&fakeResponsePlugin{name: "b", mode: modePtr(requesthandling.BodyChunked)},
148+
&fakeChunkPlugin{fakeResponsePlugin{name: "b", mode: chunkedMode}},
127149
},
128-
wantBuffering: false,
150+
wantBuffering: false,
151+
wantChunkCount: 1,
129152
},
130153
}
131154

@@ -136,35 +159,75 @@ func TestComputeResponseBuffering(t *testing.T) {
136159
ResponsePlugins: tc.plugins,
137160
},
138161
}
139-
computeResponseBuffering(profiles, logger)
162+
if err := computeResponseBuffering(profiles, logger); err != nil {
163+
t.Fatalf("computeResponseBuffering() unexpected error: %v", err)
164+
}
140165
if profiles["test"].NeedsResponseBuffering != tc.wantBuffering {
141166
t.Errorf("NeedsResponseBuffering = %v, want %v", profiles["test"].NeedsResponseBuffering, tc.wantBuffering)
142167
}
168+
if got := len(profiles["test"].ChunkProcessors); got != tc.wantChunkCount {
169+
t.Errorf("ChunkProcessors count = %d, want %d", got, tc.wantChunkCount)
170+
}
143171
})
144172
}
145173
}
146174

175+
func TestComputeResponseBuffering_BodyChunkedWithoutChunkProcessor(t *testing.T) {
176+
logger := log.FromContext(logutil.NewTestLoggerIntoContext(context.Background()))
177+
178+
profiles := map[string]*requesthandling.Profile{
179+
"test": {
180+
ResponsePlugins: []requesthandling.ResponseProcessor{
181+
&badChunkedPlugin{fakeResponsePlugin{name: "bad", mode: modePtr(requesthandling.BodyChunked)}},
182+
},
183+
},
184+
}
185+
186+
err := computeResponseBuffering(profiles, logger)
187+
if err == nil {
188+
t.Fatal("expected error for BodyChunked plugin without ChunkProcessor")
189+
}
190+
if !strings.Contains(err.Error(), "does not implement ChunkProcessor") {
191+
t.Errorf("error message should mention ChunkProcessor, got: %v", err)
192+
}
193+
}
194+
147195
func TestComputeResponseBuffering_MultipleProfiles(t *testing.T) {
148196
logger := log.FromContext(logutil.NewTestLoggerIntoContext(context.Background()))
149197

198+
chunkedMode := modePtr(requesthandling.BodyChunked)
199+
150200
profiles := map[string]*requesthandling.Profile{
151201
"streaming": {
152202
ResponsePlugins: []requesthandling.ResponseProcessor{
153203
&fakeResponsePlugin{name: "headers-only", mode: modePtr(requesthandling.BodyNotNeeded)},
154204
},
155205
},
206+
"chunked": {
207+
ResponsePlugins: []requesthandling.ResponseProcessor{
208+
&fakeChunkPlugin{fakeResponsePlugin{name: "meter", mode: chunkedMode}},
209+
},
210+
},
156211
"full-body": {
157212
ResponsePlugins: []requesthandling.ResponseProcessor{
158213
&fakeResponsePlugin{name: "translator", mode: modePtr(requesthandling.BodyFull)},
159214
},
160215
},
161216
}
162217

163-
computeResponseBuffering(profiles, logger)
218+
if err := computeResponseBuffering(profiles, logger); err != nil {
219+
t.Fatalf("unexpected error: %v", err)
220+
}
164221

165222
if profiles["streaming"].NeedsResponseBuffering {
166223
t.Error("streaming profile should not need buffering")
167224
}
225+
if profiles["chunked"].NeedsResponseBuffering {
226+
t.Error("chunked profile should not need buffering")
227+
}
228+
if len(profiles["chunked"].ChunkProcessors) != 1 {
229+
t.Errorf("chunked profile should have 1 ChunkProcessor, got %d", len(profiles["chunked"].ChunkProcessors))
230+
}
168231
if !profiles["full-body"].NeedsResponseBuffering {
169232
t.Error("full-body profile should need buffering")
170233
}

pkg/framework/interface/requesthandling/plugins.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,3 +89,11 @@ func (m ResponseBodyMode) String() string {
8989
type ResponseBodyRequirement interface {
9090
ResponseBodyMode() ResponseBodyMode
9191
}
92+
93+
// ChunkProcessor allows a response plugin to process individual response body chunks without
94+
// waiting for the full body. Plugins declaring BodyChunked MUST implement this interface
95+
// (validated at startup). The framework calls ProcessResponseChunk for each chunk as it arrives,
96+
// then acks the chunk to Envoy so it's forwarded to the client immediately.
97+
type ChunkProcessor interface {
98+
ProcessResponseChunk(ctx context.Context, cycleState *plugin.CycleState, chunk []byte, isFinal bool) ([]byte, error)
99+
}

pkg/framework/interface/requesthandling/types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,9 @@ type Profile struct {
126126
// HandleResponseBody with the full body on EndOfStream. When false, each chunk is acked
127127
// immediately and forwarded to the client without buffering.
128128
NeedsResponseBuffering bool
129+
130+
// ChunkProcessors are the response plugins that implement ChunkProcessor, pre-computed at
131+
// startup. When NeedsResponseBuffering is false, these are called for each response body
132+
// chunk before it is acked to Envoy.
133+
ChunkProcessors []ChunkProcessor
129134
}

pkg/handlers/response.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,10 @@ func (s *Server) generateEmptyResponseBodyResponse(responseBodyBytes []byte) []*
131131
return responses
132132
}
133133

134-
// ackResponseBodyChunk returns an immediate ack for a response body chunk, allowing Envoy
135-
// to forward it to the client without waiting for the full body to be accumulated.
136-
func (s *Server) ackResponseBodyChunk(body *eppb.HttpBody) []*eppb.ProcessingResponse {
134+
// ackResponseBodyChunkData returns an immediate ack for a response body chunk with the given
135+
// data and EndOfStream flag, allowing Envoy to forward it to the client without waiting for
136+
// the full body to be accumulated.
137+
func (s *Server) ackResponseBodyChunkData(data []byte, endOfStream bool) []*eppb.ProcessingResponse {
137138
return []*eppb.ProcessingResponse{
138139
{
139140
Response: &eppb.ProcessingResponse_ResponseBody{
@@ -142,8 +143,8 @@ func (s *Server) ackResponseBodyChunk(body *eppb.HttpBody) []*eppb.ProcessingRes
142143
BodyMutation: &eppb.BodyMutation{
143144
Mutation: &eppb.BodyMutation_StreamedResponse{
144145
StreamedResponse: &eppb.StreamedBodyResponse{
145-
Body: body.Body,
146-
EndOfStream: body.EndOfStream,
146+
Body: data,
147+
EndOfStream: endOfStream,
147148
},
148149
},
149150
},
@@ -154,6 +155,21 @@ func (s *Server) ackResponseBodyChunk(body *eppb.HttpBody) []*eppb.ProcessingRes
154155
}
155156
}
156157

158+
// runChunkProcessors runs all ChunkProcessors for the selected profile on a response body chunk.
159+
func (s *Server) runChunkProcessors(ctx context.Context, reqCtx *RequestContext, chunk []byte, isFinal bool) ([]byte, error) {
160+
logger := log.FromContext(ctx).V(logutil.DEFAULT)
161+
data := chunk
162+
for _, cp := range reqCtx.Profile.ChunkProcessors {
163+
var err error
164+
data, err = cp.ProcessResponseChunk(ctx, reqCtx.CycleState, data, isFinal)
165+
if err != nil {
166+
logger.Error(err, "ChunkProcessor failed")
167+
return nil, err
168+
}
169+
}
170+
return data, nil
171+
}
172+
157173
// HandleResponseTrailers handles response trailers.
158174
func (s *Server) HandleResponseTrailers(trailers *eppb.HttpTrailers) ([]*eppb.ProcessingResponse, error) {
159175
return []*eppb.ProcessingResponse{

0 commit comments

Comments
 (0)