Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 9 additions & 55 deletions internal/gensupport/resumable.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (rx *ResumableUpload) Progress() int64 {
// size is the number of bytes in data.
// final specifies whether data is the final chunk to be uploaded.
func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader, off, size int64, final bool) (*http.Response, error) {
req, err := http.NewRequest("POST", rx.URI, data)
req, err := http.NewRequest(http.MethodPost, rx.URI, data)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -139,65 +139,19 @@ func (rx *ResumableUpload) reportProgress(old, updated int64) {
}

// transferChunk performs a single HTTP request to upload a single chunk.
// It uses a goroutine to perform the upload and a timer to enforce ChunkTransferTimeout.
// If ChunkTransferTimeout is set, the request will be cancelled if it takes longer.
func (rx *ResumableUpload) transferChunk(ctx context.Context, chunk io.Reader, off, size int64, done bool) (*http.Response, error) {
// If no timeout is specified, perform the request synchronously without a timer.
// If no timeout is specified, perform the request directly.
if rx.ChunkTransferTimeout == 0 {
res, err := rx.doUploadRequest(ctx, chunk, off, size, done)
if err != nil {
return res, err
}
return res, nil
return rx.doUploadRequest(ctx, chunk, off, size, done)
}

// Start a timer for the ChunkTransferTimeout duration.
timer := time.NewTimer(rx.ChunkTransferTimeout)

// A struct to hold the result from the goroutine.
type uploadResult struct {
res *http.Response
err error
}
// Use context.WithTimeout to enforce ChunkTransferTimeout.
// This combines the timer and cancellable context into one.
rCtx, cancel := context.WithTimeout(ctx, rx.ChunkTransferTimeout)
defer cancel()

// A buffered channel to receive the result of the upload.
resultCh := make(chan uploadResult, 1)

// Create a cancellable context for the upload request. This allows us to
// abort the request if the timer fires first.
rCtx, cancel := context.WithCancel(ctx)
// NOTE: We do NOT use `defer cancel()` here. The context must remain valid
// for the caller to read the response body of a successful request.
// Cancellation is handled manually on timeout paths.

// Starting the chunk upload in parallel.
go func() {
res, err := rx.doUploadRequest(rCtx, chunk, off, size, done)
resultCh <- uploadResult{res: res, err: err}
}()

// Wait for timer to fire or result channel to have the uploadResult or ctx to be cancelled.
select {
// Note: Calling cancel() will guarantee that the goroutine finishes,
// so these two cases will never block forever on draining the resultCh.
case <-ctx.Done():
// Context is cancelled for the overall upload.
cancel()
// Drain resultCh.
<-resultCh
return nil, ctx.Err()
case <-timer.C:
// Chunk Transfer timer fired before resultCh so we return context.DeadlineExceeded.
cancel()
// Drain resultCh.
<-resultCh
return nil, context.DeadlineExceeded
case result := <-resultCh:
// Handle the result from the upload.
if result.err != nil {
return result.res, result.err
}
return result.res, nil
}
return rx.doUploadRequest(rCtx, chunk, off, size, done)
}

// uploadChunkWithRetries attempts to upload a single chunk, with retries
Expand Down
Loading