Skip to content

Commit b87bbf7

Browse files
committed
Detect JSON-RPC application errors in proxy audit middleware
The audit middleware previously determined outcome solely from the HTTP status code. Because MCP servers return errors inside HTTP 200 responses per the JSON-RPC spec, application-level failures (expired tokens, API errors, invalid parameters) were logged as outcome=success, making them invisible in audit logs. This change inspects the response body for a JSON-RPC error field when the HTTP status is 200. If found, the outcome is recorded as application_error with the error code and truncated message in metadata. A new DetectApplicationErrors config flag (default true) controls the behavior independently of IncludeResponseData, avoiding the need to enable full response capture just for error detection. Fixes #4678 Signed-off-by: Guillermo Gomez <guillermogomezmora@gmail.com>
1 parent 3238246 commit b87bbf7

File tree

6 files changed

+447
-4
lines changed

6 files changed

+447
-4
lines changed

pkg/audit/auditor.go

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,26 @@ func (a *Auditor) isSSETransport() bool {
106106
return a.transportType == types.TransportTypeSSE.String()
107107
}
108108

109+
// errorDetectionBufferSize is the maximum number of bytes buffered from the
110+
// response body for JSON-RPC error detection. JSON-RPC error responses have
111+
// the "error" field near the top of the object, so a small prefix is
112+
// sufficient. This buffer is allocated independently of IncludeResponseData.
113+
const errorDetectionBufferSize = 512
114+
115+
// maxAuditErrorMessageLength caps the JSON-RPC error message length stored
116+
// in audit event metadata to keep log entries compact.
117+
const maxAuditErrorMessageLength = 256
118+
109119
// responseWriter wraps http.ResponseWriter to capture response data and status.
110120
type responseWriter struct {
111121
http.ResponseWriter
112122
statusCode int
113123
body *bytes.Buffer
114-
auditor *Auditor
124+
// errorDetectionBody is a small prefix buffer used exclusively for
125+
// JSON-RPC error detection. It is allocated when DetectApplicationErrors
126+
// is true, independent of IncludeResponseData.
127+
errorDetectionBody *bytes.Buffer
128+
auditor *Auditor
115129
}
116130

117131
func (rw *responseWriter) WriteHeader(statusCode int) {
@@ -127,6 +141,15 @@ func (rw *responseWriter) Write(data []byte) (int, error) {
127141
rw.body.Write(data)
128142
}
129143
}
144+
// Capture a small prefix for JSON-RPC error detection
145+
if rw.errorDetectionBody != nil && rw.errorDetectionBody.Len() < errorDetectionBufferSize {
146+
remaining := errorDetectionBufferSize - rw.errorDetectionBody.Len()
147+
if len(data) <= remaining {
148+
rw.errorDetectionBody.Write(data)
149+
} else {
150+
rw.errorDetectionBody.Write(data[:remaining])
151+
}
152+
}
130153
return rw.ResponseWriter.Write(data)
131154
}
132155

@@ -201,6 +224,13 @@ func (a *Auditor) Middleware(next http.Handler) http.Handler {
201224
rw.body = &bytes.Buffer{}
202225
}
203226

227+
// Allocate a small prefix buffer for JSON-RPC error detection,
228+
// independent of IncludeResponseData. When IncludeResponseData
229+
// is already true, we reuse rw.body instead of double-buffering.
230+
if a.config.ShouldDetectApplicationErrors() && !a.config.IncludeResponseData {
231+
rw.errorDetectionBody = &bytes.Buffer{}
232+
}
233+
204234
// Process the request
205235
next.ServeHTTP(rw, r)
206236

@@ -220,6 +250,29 @@ func (a *Auditor) logAuditEvent(r *http.Request, rw *responseWriter, requestData
220250
// Determine outcome based on status code
221251
outcome := a.determineOutcome(rw.statusCode)
222252

253+
// When HTTP status indicates success, check the response body for
254+
// JSON-RPC errors (e.g., expired tokens wrapped inside HTTP 200).
255+
// Reuse rw.body when IncludeResponseData is on to avoid double-buffering.
256+
var mcpResponse *mcp.ParsedMCPResponse
257+
if outcome == OutcomeSuccess && a.config.ShouldDetectApplicationErrors() {
258+
var prefix []byte
259+
if rw.body != nil && rw.body.Len() > 0 {
260+
prefix = rw.body.Bytes()
261+
if len(prefix) > errorDetectionBufferSize {
262+
prefix = prefix[:errorDetectionBufferSize]
263+
}
264+
} else if rw.errorDetectionBody != nil && rw.errorDetectionBody.Len() > 0 {
265+
prefix = rw.errorDetectionBody.Bytes()
266+
}
267+
// Only attempt JSON parse if the prefix looks like a JSON object
268+
if len(prefix) > 0 && prefix[0] == '{' {
269+
mcpResponse = mcp.ParseMCPResponse(prefix)
270+
if mcpResponse.HasError {
271+
outcome = OutcomeApplicationError
272+
}
273+
}
274+
}
275+
223276
// Check if we should audit this event
224277
if !a.config.ShouldAuditEvent(eventType) {
225278
return
@@ -246,6 +299,20 @@ func (a *Auditor) logAuditEvent(r *http.Request, rw *responseWriter, requestData
246299
// Add metadata
247300
a.addMetadata(event, r, duration, rw)
248301

302+
// Attach JSON-RPC error details so operators can see the error code
303+
// and message without enabling full response data capture.
304+
if outcome == OutcomeApplicationError {
305+
if event.Metadata.Extra == nil {
306+
event.Metadata.Extra = make(map[string]any)
307+
}
308+
event.Metadata.Extra["jsonrpc_error_code"] = mcpResponse.ErrorCode
309+
msg := mcpResponse.ErrorMessage
310+
if len(msg) > maxAuditErrorMessageLength {
311+
msg = msg[:maxAuditErrorMessageLength]
312+
}
313+
event.Metadata.Extra["jsonrpc_error_message"] = msg
314+
}
315+
249316
// Add request/response data if configured
250317
a.addEventData(event, r, rw, requestData)
251318

pkg/audit/auditor_test.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -803,3 +803,202 @@ func TestExtractSourceWithHeaders(t *testing.T) {
803803
assert.Equal(t, "TestAgent/1.0", source.Extra[SourceExtraKeyUserAgent])
804804
assert.Equal(t, "req-12345", source.Extra[SourceExtraKeyRequestID])
805805
}
806+
807+
func TestErrorDetectionBodyCapture(t *testing.T) {
808+
t.Parallel()
809+
810+
t.Run("captures prefix when DetectApplicationErrors is enabled", func(t *testing.T) {
811+
t.Parallel()
812+
detectErrors := true
813+
config := &Config{
814+
DetectApplicationErrors: &detectErrors,
815+
}
816+
auditor, err := NewAuditorWithTransport(config, "streamable-http")
817+
require.NoError(t, err)
818+
819+
rw := &responseWriter{
820+
ResponseWriter: httptest.NewRecorder(),
821+
statusCode: http.StatusOK,
822+
auditor: auditor,
823+
errorDetectionBody: &bytes.Buffer{},
824+
}
825+
826+
responseData := `{"jsonrpc":"2.0","id":"1","error":{"code":-32603,"message":"test error"}}`
827+
_, err = rw.Write([]byte(responseData))
828+
require.NoError(t, err)
829+
830+
assert.Equal(t, responseData, rw.errorDetectionBody.String())
831+
})
832+
833+
t.Run("does not capture when DetectApplicationErrors is disabled", func(t *testing.T) {
834+
t.Parallel()
835+
detectErrors := false
836+
config := &Config{
837+
DetectApplicationErrors: &detectErrors,
838+
}
839+
auditor, err := NewAuditorWithTransport(config, "streamable-http")
840+
require.NoError(t, err)
841+
842+
rw := &responseWriter{
843+
ResponseWriter: httptest.NewRecorder(),
844+
statusCode: http.StatusOK,
845+
auditor: auditor,
846+
// errorDetectionBody is nil when detection is disabled
847+
}
848+
849+
_, err = rw.Write([]byte(`{"error":{"code":-32603}}`))
850+
require.NoError(t, err)
851+
852+
assert.Nil(t, rw.errorDetectionBody)
853+
})
854+
855+
t.Run("truncates capture at buffer size limit", func(t *testing.T) {
856+
t.Parallel()
857+
detectErrors := true
858+
config := &Config{
859+
DetectApplicationErrors: &detectErrors,
860+
}
861+
auditor, err := NewAuditorWithTransport(config, "streamable-http")
862+
require.NoError(t, err)
863+
864+
rw := &responseWriter{
865+
ResponseWriter: httptest.NewRecorder(),
866+
statusCode: http.StatusOK,
867+
auditor: auditor,
868+
errorDetectionBody: &bytes.Buffer{},
869+
}
870+
871+
// Write more than errorDetectionBufferSize bytes
872+
largeData := bytes.Repeat([]byte("x"), errorDetectionBufferSize+100)
873+
_, err = rw.Write(largeData)
874+
require.NoError(t, err)
875+
876+
assert.Equal(t, errorDetectionBufferSize, rw.errorDetectionBody.Len())
877+
})
878+
879+
t.Run("captures independently of IncludeResponseData", func(t *testing.T) {
880+
t.Parallel()
881+
detectErrors := true
882+
config := &Config{
883+
IncludeResponseData: false,
884+
DetectApplicationErrors: &detectErrors,
885+
}
886+
auditor, err := NewAuditorWithTransport(config, "streamable-http")
887+
require.NoError(t, err)
888+
889+
rw := &responseWriter{
890+
ResponseWriter: httptest.NewRecorder(),
891+
statusCode: http.StatusOK,
892+
auditor: auditor,
893+
errorDetectionBody: &bytes.Buffer{},
894+
// body is nil because IncludeResponseData is false
895+
}
896+
897+
responseData := `{"jsonrpc":"2.0","id":"1","error":{"code":-32603,"message":"unauthorized"}}`
898+
_, err = rw.Write([]byte(responseData))
899+
require.NoError(t, err)
900+
901+
// errorDetectionBody should capture even though body is nil
902+
assert.Equal(t, responseData, rw.errorDetectionBody.String())
903+
assert.Nil(t, rw.body)
904+
})
905+
}
906+
907+
func TestMiddlewareDetectsJSONRPCErrors(t *testing.T) {
908+
t.Parallel()
909+
910+
t.Run("overrides outcome to application_error for JSON-RPC error response", func(t *testing.T) {
911+
t.Parallel()
912+
var logBuf bytes.Buffer
913+
detectErrors := true
914+
config := &Config{
915+
DetectApplicationErrors: &detectErrors,
916+
}
917+
auditor, err := NewAuditorWithTransport(config, "streamable-http")
918+
require.NoError(t, err)
919+
auditor.auditLogger = NewAuditLogger(&logBuf)
920+
921+
errorResponse := `{"jsonrpc":"2.0","id":"1","error":{"code":-32603,"message":"GitLab API error: 401 Unauthorized"}}`
922+
handler := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
923+
w.WriteHeader(http.StatusOK)
924+
_, err := w.Write([]byte(errorResponse))
925+
require.NoError(t, err)
926+
})
927+
928+
middleware := auditor.Middleware(handler)
929+
req := httptest.NewRequest("POST", "/mcp", strings.NewReader(`{"jsonrpc":"2.0","id":"1","method":"tools/call","params":{"name":"test"}}`))
930+
req.Header.Set("Content-Type", "application/json")
931+
rr := httptest.NewRecorder()
932+
933+
middleware.ServeHTTP(rr, req)
934+
935+
// The response should still be passed through unchanged
936+
assert.Equal(t, http.StatusOK, rr.Code)
937+
assert.Equal(t, errorResponse, rr.Body.String())
938+
939+
// The audit log should contain application_error
940+
logOutput := logBuf.String()
941+
assert.Contains(t, logOutput, OutcomeApplicationError)
942+
assert.Contains(t, logOutput, "jsonrpc_error_code")
943+
})
944+
945+
t.Run("keeps outcome=success for valid JSON-RPC result", func(t *testing.T) {
946+
t.Parallel()
947+
var logBuf bytes.Buffer
948+
detectErrors := true
949+
config := &Config{
950+
DetectApplicationErrors: &detectErrors,
951+
}
952+
auditor, err := NewAuditorWithTransport(config, "streamable-http")
953+
require.NoError(t, err)
954+
auditor.auditLogger = NewAuditLogger(&logBuf)
955+
956+
successResponse := `{"jsonrpc":"2.0","id":"1","result":{"content":[{"type":"text","text":"hello"}]}}`
957+
handler := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
958+
w.WriteHeader(http.StatusOK)
959+
_, err := w.Write([]byte(successResponse))
960+
require.NoError(t, err)
961+
})
962+
963+
middleware := auditor.Middleware(handler)
964+
req := httptest.NewRequest("POST", "/mcp", strings.NewReader(`{"jsonrpc":"2.0","id":"1","method":"tools/call","params":{"name":"test"}}`))
965+
req.Header.Set("Content-Type", "application/json")
966+
rr := httptest.NewRecorder()
967+
968+
middleware.ServeHTTP(rr, req)
969+
970+
assert.Equal(t, http.StatusOK, rr.Code)
971+
972+
logOutput := logBuf.String()
973+
assert.NotContains(t, logOutput, OutcomeApplicationError)
974+
})
975+
976+
t.Run("does not inspect body when DetectApplicationErrors is disabled", func(t *testing.T) {
977+
t.Parallel()
978+
var logBuf bytes.Buffer
979+
detectErrors := false
980+
config := &Config{
981+
DetectApplicationErrors: &detectErrors,
982+
}
983+
auditor, err := NewAuditorWithTransport(config, "streamable-http")
984+
require.NoError(t, err)
985+
auditor.auditLogger = NewAuditLogger(&logBuf)
986+
987+
errorResponse := `{"jsonrpc":"2.0","id":"1","error":{"code":-32603,"message":"should not be detected"}}`
988+
handler := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
989+
w.WriteHeader(http.StatusOK)
990+
_, err := w.Write([]byte(errorResponse))
991+
require.NoError(t, err)
992+
})
993+
994+
middleware := auditor.Middleware(handler)
995+
req := httptest.NewRequest("POST", "/mcp", strings.NewReader(`{"jsonrpc":"2.0","id":"1","method":"tools/call","params":{"name":"test"}}`))
996+
req.Header.Set("Content-Type", "application/json")
997+
rr := httptest.NewRecorder()
998+
999+
middleware.ServeHTTP(rr, req)
1000+
1001+
logOutput := logBuf.String()
1002+
assert.NotContains(t, logOutput, OutcomeApplicationError)
1003+
})
1004+
}

pkg/audit/config.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ type Config struct {
4040
// +kubebuilder:default=false
4141
// +optional
4242
IncludeResponseData bool `json:"includeResponseData,omitempty" yaml:"includeResponseData,omitempty"`
43+
// DetectApplicationErrors controls whether the audit middleware inspects
44+
// JSON-RPC response bodies for application-level errors when the HTTP
45+
// status code indicates success (2xx). When enabled, a small prefix of
46+
// the response body is buffered to detect JSON-RPC error fields,
47+
// independent of the IncludeResponseData setting.
48+
// +kubebuilder:default=true
49+
// +optional
50+
DetectApplicationErrors *bool `json:"detectApplicationErrors,omitempty" yaml:"detectApplicationErrors,omitempty"`
4351
// MaxDataSize limits the size of request/response data included in audit logs (in bytes).
4452
// +kubebuilder:default=1024
4553
// +optional
@@ -66,13 +74,24 @@ func (c *Config) GetLogWriter() (io.Writer, error) {
6674

6775
// DefaultConfig returns a default audit configuration.
6876
func DefaultConfig() *Config {
77+
detectErrors := true
6978
return &Config{
7079
// Note, these defaults are also present on the kubebuilder annotations above.
7180
// If you change these defaults, you must also change the kubebuilder annotations.
72-
IncludeRequestData: false, // Disabled by default for privacy
73-
IncludeResponseData: false, // Disabled by default for privacy
74-
MaxDataSize: 1024, // 1KB default limit
81+
IncludeRequestData: false, // Disabled by default for privacy
82+
IncludeResponseData: false, // Disabled by default for privacy
83+
MaxDataSize: 1024, // 1KB default limit
84+
DetectApplicationErrors: &detectErrors, // Enabled by default to surface JSON-RPC errors
85+
}
86+
}
87+
88+
// ShouldDetectApplicationErrors returns whether JSON-RPC error detection is enabled.
89+
// Defaults to true when DetectApplicationErrors is nil.
90+
func (c *Config) ShouldDetectApplicationErrors() bool {
91+
if c.DetectApplicationErrors == nil {
92+
return true
7593
}
94+
return *c.DetectApplicationErrors
7695
}
7796

7897
// LoadFromFile loads audit configuration from a file.

pkg/audit/event.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,10 @@ const (
210210
OutcomeError = "error"
211211
// OutcomeDenied indicates the event was denied (e.g., by authorization)
212212
OutcomeDenied = "denied"
213+
// OutcomeApplicationError indicates the HTTP transport succeeded but the
214+
// JSON-RPC response body contained an application-level error (e.g.,
215+
// expired tokens, backend failures, invalid parameters).
216+
OutcomeApplicationError = "application_error"
213217
)
214218

215219
// Common source types

0 commit comments

Comments
 (0)