From c23b0c64bd707e9b492eafecdecf446b25c9615f Mon Sep 17 00:00:00 2001 From: Yu Yi Date: Thu, 28 May 2026 19:56:52 +0000 Subject: [PATCH] Harden event log writes --- event-exporter/sinks/stackdriver/writer.go | 7 +- .../sinks/stackdriver/writer_test.go | 92 +++++++++++++++++++ 2 files changed, 96 insertions(+), 3 deletions(-) create mode 100644 event-exporter/sinks/stackdriver/writer_test.go diff --git a/event-exporter/sinks/stackdriver/writer.go b/event-exporter/sinks/stackdriver/writer.go index 0d11b3084..15d71055f 100644 --- a/event-exporter/sinks/stackdriver/writer.go +++ b/event-exporter/sinks/stackdriver/writer.go @@ -48,9 +48,10 @@ func newSdWriter(service *sd.Service) sdWriter { // unless the API returns BadRequest error. func (w sdWriterImpl) Write(entries []*sd.LogEntry, logName string, resource *sd.MonitoredResource) { req := &sd.WriteLogEntriesRequest{ - Entries: entries, - LogName: logName, - Resource: resource, + Entries: entries, + LogName: logName, + PartialSuccess: true, + Resource: resource, } // We retry forever, until request either succeeds or API returns diff --git a/event-exporter/sinks/stackdriver/writer_test.go b/event-exporter/sinks/stackdriver/writer_test.go new file mode 100644 index 000000000..9f589fb3e --- /dev/null +++ b/event-exporter/sinks/stackdriver/writer_test.go @@ -0,0 +1,92 @@ +/* +Copyright 2026 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stackdriver + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + sd "google.golang.org/api/logging/v2" +) + +func TestWriterEnablesPartialSuccess(t *testing.T) { + type writeRequest struct { + method string + path string + body *sd.WriteLogEntriesRequest + err error + } + + requests := make(chan writeRequest, 1) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + var req sd.WriteLogEntriesRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + requests <- writeRequest{ + method: r.Method, + path: r.URL.Path, + err: fmt.Errorf("decode request body: %w", err), + } + } else { + requests <- writeRequest{ + method: r.Method, + path: r.URL.Path, + body: &req, + } + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{}`)) + })) + defer server.Close() + + service, err := sd.New(server.Client()) + if err != nil { + t.Fatalf("New logging service: %v", err) + } + service.BasePath = server.URL + "/" + + writer := sdWriterImpl{service: service} + writer.Write( + []*sd.LogEntry{{TextPayload: "entry"}}, + "projects/test-project/logs/events", + &sd.MonitoredResource{Type: k8sCluster}, + ) + + select { + case req := <-requests: + if req.err != nil { + t.Fatal(req.err) + } + if req.path != "/v2/entries:write" { + t.Fatalf("request path = %q, want %q", req.path, "/v2/entries:write") + } + if req.method != http.MethodPost { + t.Fatalf("request method = %q, want %q", req.method, http.MethodPost) + } + if !req.body.PartialSuccess { + t.Fatal("PartialSuccess = false, want true") + } + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for write request") + } +}