Skip to content

Commit 0bb3bf2

Browse files
committed
adds a decompressing roundtripper
This roundtripper makes sure to hand over to the application code a decompressed http.Request.Body. This centralizes the decompression at the reverse proxy layer.
1 parent 79e62d8 commit 0bb3bf2

4 files changed

Lines changed: 123 additions & 33 deletions

File tree

pkg/authz/responsefilterer.go

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package authz
22

33
import (
44
"bytes"
5-
"compress/gzip"
65
"context"
76
"encoding/json"
87
"fmt"
@@ -234,30 +233,7 @@ func (rf *StandardResponseFilterer) FilterResp(resp *http.Response) error {
234233
return nil
235234
}
236235

237-
bodyStream := resp.Body
238-
239-
// NOTE: we need to manually check for gzipped encoding here
240-
// because we're proxying - the request/response context would
241-
// know about the encoding if we were the ones originating the
242-
// request, but since we aren't, we need to manually check this.
243-
// This is needed because the k8s API will automatically gzip responses
244-
// above 128kb by default.
245-
if resp.Header.Get("Content-Encoding") == "gzip" {
246-
bodyStream, err = gzip.NewReader(bodyStream)
247-
if err != nil {
248-
return err
249-
}
250-
defer func() {
251-
_ = bodyStream.Close()
252-
}()
253-
// We're decompressing the body here, so remove the header.
254-
// If we leave it, the downstream HTTP transport will try to
255-
// gzip-decompress the already-plain bytes and fail with
256-
// "gzip: invalid header".
257-
resp.Header.Del("Content-Encoding")
258-
}
259-
260-
body, err := io.ReadAll(bodyStream)
236+
body, err := io.ReadAll(resp.Body)
261237
if err != nil {
262238
return err
263239
}

pkg/proxy/embedded_test.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -558,14 +558,9 @@ func TestGzippedUpstreamResponse(t *testing.T) {
558558

559559
body, err := io.ReadAll(resp.Body)
560560
require.NoError(t, err)
561-
require.Equal(t, "gzip", resp.Header.Get("Content-Encoding"), "expecting the response to be gzipped")
561+
require.Empty(t, resp.Header.Get("Content-Encoding"),
562+
"decompressingTransport must strip Content-Encoding: gzip after decompression")
562563
require.Equal(t, responseText+"\n", string(body), "unexpected body value")
563-
564-
// The proxy should relay the response successfully.
565-
// A 502 Bad Gateway here means FilterResp attempted to decode the gzip
566-
// bytes as JSON/protobuf without first decompressing them. The fix is to
567-
// detect Content-Encoding: gzip in FilterResp and decompress the body
568-
// before passing it to the codec decoder.
569564
require.Equal(t, http.StatusOK, resp.StatusCode,
570565
"proxy must handle gzip-encoded upstream responses; a 502 indicates the body was not decompressed before decoding")
571566
}

pkg/proxy/server.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package proxy
22

33
import (
4+
"bytes"
5+
"compress/gzip"
46
"context"
57
"fmt"
8+
"io"
69
"net/http"
710
"net/http/httputil"
811
"os"
@@ -110,7 +113,7 @@ func NewServer(ctx context.Context, c *CompletedConfig) (*Server, error) {
110113
}
111114
return responseFilterer.FilterResp(response)
112115
},
113-
Transport: transport,
116+
Transport: &decompressingTransport{base: transport},
114117
ErrorHandler: func(writer http.ResponseWriter, h *http.Request, err error) {
115118
klog.V(3).InfoSDepth(1, "upstream Kubernetes API error response", "error", err)
116119
writer.WriteHeader(http.StatusBadGateway)
@@ -387,3 +390,39 @@ func (t *authHeaderTransport) RoundTrip(req *http.Request) (*http.Response, erro
387390

388391
return t.base.RoundTrip(newReq)
389392
}
393+
394+
// decompressingTransport wraps an http.RoundTripper and transparently decompresses
395+
// Content-Encoding: gzip responses from the upstream.
396+
type decompressingTransport struct {
397+
base http.RoundTripper
398+
}
399+
400+
func (t *decompressingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
401+
base := t.base
402+
if base == nil {
403+
base = http.DefaultTransport
404+
}
405+
406+
resp, err := base.RoundTrip(req)
407+
if err != nil || resp == nil || resp.Header.Get("Content-Encoding") != "gzip" {
408+
return resp, err
409+
}
410+
411+
gr, err := gzip.NewReader(resp.Body)
412+
if err != nil {
413+
_ = resp.Body.Close()
414+
return nil, fmt.Errorf("decompressingTransport: gzip.NewReader: %w", err)
415+
}
416+
417+
body, err := io.ReadAll(gr)
418+
_ = gr.Close()
419+
_ = resp.Body.Close()
420+
if err != nil {
421+
return nil, fmt.Errorf("decompressingTransport: reading gzip body: %w", err)
422+
}
423+
424+
resp.Body = io.NopCloser(bytes.NewReader(body))
425+
resp.Header.Del("Content-Encoding")
426+
resp.ContentLength = int64(len(body))
427+
return resp, nil
428+
}

pkg/proxy/transport_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package proxy
2+
3+
import (
4+
"bytes"
5+
"compress/gzip"
6+
"io"
7+
"net/http"
8+
"testing"
9+
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
// TestDecompressingTransport verifies that decompressingTransport transparently
14+
// decompresses Content-Encoding: gzip responses and strips the header.
15+
//
16+
// This prevents the "gzip: invalid header" error that occurs when a real
17+
// http.Transport (used by kubernetes.Clientset) receives a response with
18+
// Content-Encoding: gzip but a body that has already been decompressed by
19+
// FilterResp. The transport would wrap the body in a gzip.Reader and fail.
20+
func TestDecompressingTransport(t *testing.T) {
21+
t.Parallel()
22+
23+
plain := []byte(`{"apiVersion":"v1","kind":"Secret","metadata":{"name":"s","namespace":"default"}}`)
24+
25+
var compressed bytes.Buffer
26+
gzw := gzip.NewWriter(&compressed)
27+
_, err := gzw.Write(plain)
28+
require.NoError(t, err)
29+
require.NoError(t, gzw.Close())
30+
31+
t.Run("decompresses gzip and strips header", func(t *testing.T) {
32+
t.Parallel()
33+
34+
base := roundTripFunc(func(_ *http.Request) (*http.Response, error) {
35+
return &http.Response{
36+
StatusCode: http.StatusOK,
37+
Header: http.Header{"Content-Encoding": []string{"gzip"}, "Content-Type": []string{"application/json"}},
38+
Body: io.NopCloser(bytes.NewReader(compressed.Bytes())),
39+
ContentLength: int64(compressed.Len()),
40+
}, nil
41+
})
42+
43+
resp, err := (&decompressingTransport{base: base}).RoundTrip(&http.Request{}) //nolint: bodyclose
44+
require.NoError(t, err)
45+
require.Empty(t, resp.Header.Get("Content-Encoding"),
46+
"Content-Encoding must be stripped after decompression")
47+
require.Equal(t, int64(len(plain)), resp.ContentLength)
48+
49+
got, err := io.ReadAll(resp.Body)
50+
require.NoError(t, err)
51+
require.Equal(t, plain, got)
52+
})
53+
54+
t.Run("non-gzip response passes through unchanged", func(t *testing.T) {
55+
t.Parallel()
56+
57+
base := roundTripFunc(func(_ *http.Request) (*http.Response, error) {
58+
return &http.Response{
59+
StatusCode: http.StatusOK,
60+
Header: http.Header{"Content-Type": []string{"application/json"}},
61+
Body: io.NopCloser(bytes.NewReader(plain)),
62+
}, nil
63+
})
64+
65+
resp, err := (&decompressingTransport{base: base}).RoundTrip(&http.Request{}) //nolint: bodyclose
66+
require.NoError(t, err)
67+
require.Empty(t, resp.Header.Get("Content-Encoding"))
68+
69+
got, err := io.ReadAll(resp.Body)
70+
require.NoError(t, err)
71+
require.Equal(t, plain, got)
72+
})
73+
}
74+
75+
// roundTripFunc is a helper that implements http.RoundTripper via a function.
76+
type roundTripFunc func(*http.Request) (*http.Response, error)
77+
78+
func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
79+
return f(req)
80+
}

0 commit comments

Comments
 (0)