Skip to content

Commit 5bb249b

Browse files
blob: removes race condition between ctx cancellation and stream close (#167489)
blob: removes race condition between ctx cancellation and stream close
2 parents d3ac034 + add28c0 commit 5bb249b

1 file changed

Lines changed: 8 additions & 1 deletion

File tree

pkg/blobs/client.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,15 @@ func (c *remoteClient) readFileWithFlowControl(
124124
}
125125

126126
type streamWriter struct {
127+
ctx context.Context
127128
s blobspb.RPCBlob_PutStreamClient
128129
buf blobspb.StreamChunk
129130
}
130131

131132
func (w *streamWriter) Write(p []byte) (int, error) {
133+
if err := w.ctx.Err(); err != nil {
134+
return 0, err
135+
}
132136
n := 0
133137
for len(p) > 0 {
134138
l := copy(w.buf.Payload[:cap(w.buf.Payload)], p)
@@ -145,6 +149,9 @@ func (w *streamWriter) Write(p []byte) (int, error) {
145149
}
146150

147151
func (w *streamWriter) Close() error {
152+
if err := w.ctx.Err(); err != nil {
153+
return err
154+
}
148155
_, err := w.s.CloseAndRecv()
149156
return err
150157
}
@@ -156,7 +163,7 @@ func (c *remoteClient) Writer(ctx context.Context, file string) (io.WriteCloser,
156163
return nil, err
157164
}
158165
buf := make([]byte, 0, ChunkSize)
159-
return &streamWriter{s: stream, buf: blobspb.StreamChunk{Payload: buf}}, nil
166+
return &streamWriter{ctx: ctx, s: stream, buf: blobspb.StreamChunk{Payload: buf}}, nil
160167
}
161168

162169
func (c *remoteClient) List(ctx context.Context, pattern string) ([]string, error) {

0 commit comments

Comments
 (0)