Skip to content

Commit 0fba903

Browse files
authored
Merge pull request #38 from shubham-stepsecurity/sm/update
feat(mdm-npm): add gzip compression for stdout/stderr output
2 parents 22f15ad + aaae474 commit 0fba903

2 files changed

Lines changed: 185 additions & 8 deletions

File tree

internal/telemetry/telemetry.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package telemetry
22

33
import (
44
"bytes"
5+
"compress/gzip"
56
"context"
67
"encoding/json"
78
"fmt"
@@ -525,9 +526,18 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload) err
525526
return fmt.Errorf("marshaling payload: %w", err)
526527
}
527528

529+
// Gzip-compress the payload before upload. The backend signals support by
530+
// honoring is_compressed=true on the upload-URL request and appending .gz
531+
// to the S3 key, which tells GetTelemetryFromS3 to decompress on read.
532+
compressedPayload, err := gzipBytes(payloadJSON)
533+
if err != nil {
534+
return fmt.Errorf("compressing payload: %w", err)
535+
}
536+
528537
// Request upload URL
529-
reqBody, _ := json.Marshal(map[string]string{
530-
"device_id": payload.DeviceID,
538+
reqBody, _ := json.Marshal(map[string]any{
539+
"device_id": payload.DeviceID,
540+
"is_compressed": true,
531541
})
532542

533543
uploadURLEndpoint := fmt.Sprintf("%s/v1/%s/developer-mdm-agent/telemetry/upload-url",
@@ -562,15 +572,15 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload) err
562572
return fmt.Errorf("empty upload URL in response")
563573
}
564574

565-
// Upload payload to S3 with retry — use a longer timeout since payloads
566-
// with npm scan data and execution logs can be several MB.
567-
log.Progress("Uploading telemetry to S3 (%d bytes)...", len(payloadJSON))
575+
// Upload payload to S3 with retry. Content-Type stays application/json to
576+
// match the presigned URL's signed headers — the body is gzipped JSON bytes.
577+
log.Progress("Uploading telemetry to S3 (%d bytes)...", len(compressedPayload))
568578
s3Client := &http.Client{Timeout: 10 * time.Minute}
569579
const maxRetries = 3
570580
var putResp *http.Response
571581
for attempt := 1; attempt <= maxRetries; attempt++ {
572582
uploadStart := time.Now()
573-
putReq, reqErr := http.NewRequestWithContext(ctx, http.MethodPut, urlResp.UploadURL, bytes.NewReader(payloadJSON))
583+
putReq, reqErr := http.NewRequestWithContext(ctx, http.MethodPut, urlResp.UploadURL, bytes.NewReader(compressedPayload))
574584
if reqErr != nil {
575585
return fmt.Errorf("creating S3 PUT request: %w", reqErr)
576586
}
@@ -598,10 +608,10 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload) err
598608
if attempt == maxRetries {
599609
if err != nil {
600610
return fmt.Errorf("uploading to S3 (payload: %d bytes, elapsed: %s, attempts: %d): %w",
601-
len(payloadJSON), elapsed, maxRetries, err)
611+
len(compressedPayload), elapsed, maxRetries, err)
602612
}
603613
return fmt.Errorf("S3 upload failed with status %d (payload: %d bytes, attempts: %d)",
604-
putResp.StatusCode, len(payloadJSON), maxRetries)
614+
putResp.StatusCode, len(compressedPayload), maxRetries)
605615
}
606616

607617
// Log retry and backoff
@@ -654,6 +664,19 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload) err
654664
return nil
655665
}
656666

667+
// gzipBytes returns a gzip-compressed copy of the input bytes.
668+
func gzipBytes(data []byte) ([]byte, error) {
669+
var buf bytes.Buffer
670+
gz := gzip.NewWriter(&buf)
671+
if _, err := gz.Write(data); err != nil {
672+
return nil, err
673+
}
674+
if err := gz.Close(); err != nil {
675+
return nil, err
676+
}
677+
return buf.Bytes(), nil
678+
}
679+
657680
func resolveSearchDirs(exec executor.Executor, dirs []string) []string {
658681
resolved := make([]string, 0, len(dirs))
659682
for _, d := range dirs {
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package telemetry
2+
3+
import (
4+
"bytes"
5+
"compress/gzip"
6+
"context"
7+
"encoding/json"
8+
"io"
9+
"net/http"
10+
"net/http/httptest"
11+
"strings"
12+
"sync"
13+
"testing"
14+
15+
"github.com/step-security/dev-machine-guard/internal/config"
16+
"github.com/step-security/dev-machine-guard/internal/progress"
17+
)
18+
19+
func TestGzipBytes_RoundTrip(t *testing.T) {
20+
original := []byte(`{"customer_id":"acme","node_projects":[{"project_path":"/x"}]}`)
21+
compressed, err := gzipBytes(original)
22+
if err != nil {
23+
t.Fatalf("gzipBytes failed: %v", err)
24+
}
25+
if len(compressed) < 2 || compressed[0] != 0x1f || compressed[1] != 0x8b {
26+
t.Fatal("expected gzip magic bytes")
27+
}
28+
29+
gz, err := gzip.NewReader(bytes.NewReader(compressed))
30+
if err != nil {
31+
t.Fatalf("gzip.NewReader failed: %v", err)
32+
}
33+
defer gz.Close()
34+
got, err := io.ReadAll(gz)
35+
if err != nil {
36+
t.Fatalf("decompression failed: %v", err)
37+
}
38+
if !bytes.Equal(got, original) {
39+
t.Errorf("round-trip mismatch: got %q, want %q", got, original)
40+
}
41+
}
42+
43+
func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) {
44+
var (
45+
mu sync.Mutex
46+
uploadURLBody []byte
47+
putBody []byte
48+
putContentType string
49+
notifyBody []byte
50+
)
51+
52+
// Mock S3 PUT endpoint — captures the body the agent uploads.
53+
s3Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
54+
body, _ := io.ReadAll(r.Body)
55+
mu.Lock()
56+
putBody = body
57+
putContentType = r.Header.Get("Content-Type")
58+
mu.Unlock()
59+
w.WriteHeader(http.StatusOK)
60+
}))
61+
defer s3Server.Close()
62+
63+
// Mock backend — handles upload-URL and process-uploaded calls.
64+
backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
65+
switch {
66+
case strings.HasSuffix(r.URL.Path, "/telemetry/upload-url"):
67+
body, _ := io.ReadAll(r.Body)
68+
mu.Lock()
69+
uploadURLBody = body
70+
mu.Unlock()
71+
_ = json.NewEncoder(w).Encode(map[string]string{
72+
"upload_url": s3Server.URL + "/put",
73+
"s3_key": "developer-mdm/test-customer/dev-1/123.json.gz",
74+
})
75+
case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"):
76+
body, _ := io.ReadAll(r.Body)
77+
mu.Lock()
78+
notifyBody = body
79+
mu.Unlock()
80+
w.WriteHeader(http.StatusOK)
81+
default:
82+
http.NotFound(w, r)
83+
}
84+
}))
85+
defer backendServer.Close()
86+
87+
// Override config globals for the duration of the test.
88+
origEndpoint, origCustomer, origKey := config.APIEndpoint, config.CustomerID, config.APIKey
89+
config.APIEndpoint = backendServer.URL
90+
config.CustomerID = "test-customer"
91+
config.APIKey = "test-key"
92+
defer func() {
93+
config.APIEndpoint, config.CustomerID, config.APIKey = origEndpoint, origCustomer, origKey
94+
}()
95+
96+
payload := &Payload{
97+
CustomerID: "test-customer",
98+
DeviceID: "dev-1",
99+
}
100+
101+
if err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), payload); err != nil {
102+
t.Fatalf("uploadToS3 failed: %v", err)
103+
}
104+
105+
mu.Lock()
106+
defer mu.Unlock()
107+
108+
// Upload-URL request body must include is_compressed: true.
109+
var uploadReq map[string]any
110+
if err := json.Unmarshal(uploadURLBody, &uploadReq); err != nil {
111+
t.Fatalf("failed to parse upload-URL request body: %v", err)
112+
}
113+
if uploadReq["device_id"] != "dev-1" {
114+
t.Errorf("expected device_id=dev-1, got %v", uploadReq["device_id"])
115+
}
116+
if uploadReq["is_compressed"] != true {
117+
t.Errorf("expected is_compressed=true, got %v", uploadReq["is_compressed"])
118+
}
119+
120+
// PUT body must be gzip-compressed.
121+
if len(putBody) < 2 || putBody[0] != 0x1f || putBody[1] != 0x8b {
122+
t.Fatalf("expected gzip-compressed PUT body (got %d bytes)", len(putBody))
123+
}
124+
if putContentType != "application/json" {
125+
t.Errorf("expected Content-Type application/json (matches presigned URL), got %q", putContentType)
126+
}
127+
128+
// Decompressing the PUT body should yield the original JSON payload.
129+
gz, err := gzip.NewReader(bytes.NewReader(putBody))
130+
if err != nil {
131+
t.Fatalf("PUT body is not valid gzip: %v", err)
132+
}
133+
defer gz.Close()
134+
decompressed, err := io.ReadAll(gz)
135+
if err != nil {
136+
t.Fatalf("failed to decompress PUT body: %v", err)
137+
}
138+
var roundTrip Payload
139+
if err := json.Unmarshal(decompressed, &roundTrip); err != nil {
140+
t.Fatalf("decompressed body is not valid JSON: %v", err)
141+
}
142+
if roundTrip.DeviceID != "dev-1" {
143+
t.Errorf("decompressed payload device_id mismatch: got %q", roundTrip.DeviceID)
144+
}
145+
146+
// Notify-backend was called with the s3_key returned from the upload-URL endpoint.
147+
var notify map[string]string
148+
if err := json.Unmarshal(notifyBody, &notify); err != nil {
149+
t.Fatalf("failed to parse notify body: %v", err)
150+
}
151+
if !strings.HasSuffix(notify["s3_key"], ".json.gz") {
152+
t.Errorf("expected s3_key with .json.gz suffix, got %q", notify["s3_key"])
153+
}
154+
}

0 commit comments

Comments
 (0)