Skip to content

Commit a051098

Browse files
authored
Merge pull request #1211 from erain/codex/harden-event-log-writes
Harden event log writes
2 parents 37b221a + c23b0c6 commit a051098

2 files changed

Lines changed: 96 additions & 3 deletions

File tree

event-exporter/sinks/stackdriver/writer.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@ func newSdWriter(service *sd.Service) sdWriter {
4848
// unless the API returns BadRequest error.
4949
func (w sdWriterImpl) Write(entries []*sd.LogEntry, logName string, resource *sd.MonitoredResource) {
5050
req := &sd.WriteLogEntriesRequest{
51-
Entries: entries,
52-
LogName: logName,
53-
Resource: resource,
51+
Entries: entries,
52+
LogName: logName,
53+
PartialSuccess: true,
54+
Resource: resource,
5455
}
5556

5657
// We retry forever, until request either succeeds or API returns
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
Copyright 2026 Google Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package stackdriver
18+
19+
import (
20+
"encoding/json"
21+
"fmt"
22+
"net/http"
23+
"net/http/httptest"
24+
"testing"
25+
"time"
26+
27+
sd "google.golang.org/api/logging/v2"
28+
)
29+
30+
func TestWriterEnablesPartialSuccess(t *testing.T) {
31+
type writeRequest struct {
32+
method string
33+
path string
34+
body *sd.WriteLogEntriesRequest
35+
err error
36+
}
37+
38+
requests := make(chan writeRequest, 1)
39+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
40+
defer r.Body.Close()
41+
42+
var req sd.WriteLogEntriesRequest
43+
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
44+
requests <- writeRequest{
45+
method: r.Method,
46+
path: r.URL.Path,
47+
err: fmt.Errorf("decode request body: %w", err),
48+
}
49+
} else {
50+
requests <- writeRequest{
51+
method: r.Method,
52+
path: r.URL.Path,
53+
body: &req,
54+
}
55+
}
56+
w.Header().Set("Content-Type", "application/json")
57+
w.WriteHeader(http.StatusOK)
58+
w.Write([]byte(`{}`))
59+
}))
60+
defer server.Close()
61+
62+
service, err := sd.New(server.Client())
63+
if err != nil {
64+
t.Fatalf("New logging service: %v", err)
65+
}
66+
service.BasePath = server.URL + "/"
67+
68+
writer := sdWriterImpl{service: service}
69+
writer.Write(
70+
[]*sd.LogEntry{{TextPayload: "entry"}},
71+
"projects/test-project/logs/events",
72+
&sd.MonitoredResource{Type: k8sCluster},
73+
)
74+
75+
select {
76+
case req := <-requests:
77+
if req.err != nil {
78+
t.Fatal(req.err)
79+
}
80+
if req.path != "/v2/entries:write" {
81+
t.Fatalf("request path = %q, want %q", req.path, "/v2/entries:write")
82+
}
83+
if req.method != http.MethodPost {
84+
t.Fatalf("request method = %q, want %q", req.method, http.MethodPost)
85+
}
86+
if !req.body.PartialSuccess {
87+
t.Fatal("PartialSuccess = false, want true")
88+
}
89+
case <-time.After(1 * time.Second):
90+
t.Fatal("timed out waiting for write request")
91+
}
92+
}

0 commit comments

Comments
 (0)