Skip to content

Commit c234c6a

Browse files
authored
Merge pull request #438 from fredbi/refact/runtime-submit
Refact/runtime submit
2 parents 3412b4e + 4049a8d commit c234c6a

5 files changed

Lines changed: 426 additions & 146 deletions

File tree

client/content_negotiation_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ func runBuildHTTPCases(t *testing.T, cases iter.Seq[buildHTTPCase]) {
9595

9696
func runBuildHTTPCase(tc buildHTTPCase) func(*testing.T) {
9797
return func(t *testing.T) {
98+
ctx := t.Context()
9899
method := tc.method
99100
if method == "" {
100101
method = http.MethodPost
@@ -110,7 +111,9 @@ func runBuildHTTPCase(tc buildHTTPCase) func(*testing.T) {
110111

111112
r := request.New(method, "/", writer)
112113
r.SetConsumes(tc.consumes)
113-
req, err := r.BuildHTTP(tc.mediaType, "/", producers, strfmt.Default, nil)
114+
req, cancel, err := r.BuildHTTPContext(ctx, tc.mediaType, "/", producers, strfmt.Default, nil)
115+
defer cancel()
116+
114117
if tc.wantErr != "" {
115118
require.Error(t, err)
116119
assert.Contains(t, err.Error(), tc.wantErr)
@@ -395,7 +398,7 @@ func payloadStructCases() iter.Seq[buildHTTPCase] {
395398
//
396399
// Cases with empty consumes exercise the buildHTTP-direct entry point
397400
// (i.e. external callers of BuildHTTP that have already picked a mime
398-
// without going through createHttpRequest).
401+
// without going through createHTTPRequest).
399402
func payloadReaderCases() iter.Seq[buildHTTPCase] {
400403
return slices.Values([]buildHTTPCase{
401404
{

client/internal/request/consts_test.go

Lines changed: 0 additions & 14 deletions
This file was deleted.

client/internal/request/request.go

Lines changed: 92 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ var _ runtime.ClientRequest = new(Request) // ensure compliance to the interface
3737
//
3838
// # Request binding
3939
//
40-
// The binding of parameters is carried out by method [request.BuildHTTP].
40+
// The binding of parameters is carried out by method [Request.BuildHTTPContext].
4141
//
4242
// It analyzes parameters, which may come in different flavors:
4343
//
@@ -52,8 +52,8 @@ var _ runtime.ClientRequest = new(Request) // ensure compliance to the interface
5252
// - file, multipart form or io.Reader body: a streaming request with an attached go routine that consumes the [io.Reader].
5353
// - buffered body: a simple request
5454
//
55-
// In all cases, it is left to the caller to set the request's [context.Context]: [request.BuildHTTP] only builds
56-
// requests with [context.Background].
55+
// The caller passes the parent [context.Context] to [Request.BuildHTTPContext] and receives back a cancel
56+
// function to release the resources held by the derived request context once the response is consumed.
5757
//
5858
// # Authentication
5959
//
@@ -289,7 +289,9 @@ func (r *Request) SetConsumes(consumers []string) {
289289
r.consumes = consumers
290290
}
291291

292-
// BuildHTTP dispatches to one of two end-to-end builders based on whether:
292+
// BuildHTTPContext binds the request parameters and returns a ready-to-send [http.Request].
293+
//
294+
// Dispatch picks one of two end-to-end builders based on whether:
293295
//
294296
// - the body source is a stream (multipart pipe or stream payload)
295297
// - or a buffer (urlencoded form, producer output, or no body)
@@ -299,17 +301,56 @@ func (r *Request) SetConsumes(consumers []string) {
299301
//
300302
// The split mirrors the auth question: streaming bodies require a lazy body-copy closure during AuthenticateRequest,
301303
// whereas buffered bodies do not.
302-
func (r *Request) BuildHTTP(mediaType, basePath string, producers map[string]runtime.Producer, registry strfmt.Registry, auth runtime.ClientAuthInfoWriter) (*http.Request, error) {
304+
//
305+
// The returned [http.Request] carries a context derived from parentCtx that:
306+
//
307+
// - inherits any deadline or cancellation already set on parentCtx;
308+
// - additionally honors the per-request timeout set via [Request.SetTimeout]
309+
// (the [runtime.ClientRequestWriter] may override the runtime default during
310+
// WriteToRequest, which is why the derivation happens here rather than
311+
// at the call site).
312+
//
313+
// The returned cancel must be invoked by the caller (typically deferred)
314+
// once the response has been fully read; otherwise resources held by the
315+
// derived context — including any timeout timer — are leaked.
316+
//
317+
// On error the cancel is invoked internally and a no-op cancel is returned,
318+
// so callers can defer cancel unconditionally.
319+
func (r *Request) BuildHTTPContext(parentCtx context.Context, mediaType, basePath string, producers map[string]runtime.Producer, registry strfmt.Registry, auth runtime.ClientAuthInfoWriter) (*http.Request, context.CancelFunc, error) {
303320
if err := r.writer.WriteToRequest(r, registry); err != nil {
304-
return nil, err
321+
return nil, noop, err
305322
}
306323

324+
ctx, cancel := deriveRequestContext(parentCtx, r.timeout)
307325
r.buf = bytes.NewBuffer(nil)
308326

327+
var (
328+
httpReq *http.Request
329+
err error
330+
)
309331
if r.usesStreamingBody(mediaType) {
310-
return r.buildStreamingRequest(mediaType, basePath, producers, registry, auth)
332+
httpReq, err = r.buildStreamingRequest(ctx, mediaType, basePath, producers, registry, auth)
333+
} else {
334+
httpReq, err = r.buildBufferedRequest(ctx, mediaType, basePath, producers, registry, auth)
335+
}
336+
if err != nil {
337+
cancel()
338+
return nil, noop, err
339+
}
340+
return httpReq, cancel, nil
341+
}
342+
343+
func noop() {}
344+
345+
// deriveRequestContext returns a child of parent bounded by timeout.
346+
// If timeout == 0 the child is only canceled when the caller invokes
347+
// cancel; any deadline already on parent is preserved. If timeout > 0
348+
// the child uses the shortest of timeout and parent's existing deadline.
349+
func deriveRequestContext(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
350+
if timeout == 0 {
351+
return context.WithCancel(parent)
311352
}
312-
return r.buildBufferedRequest(mediaType, basePath, producers, registry, auth)
353+
return context.WithTimeout(parent, timeout)
313354
}
314355

315356
// usesStreamingBody reports whether the request body must be assembled
@@ -368,7 +409,7 @@ func (r *Request) isMultipart(mediaType string) bool {
368409
//
369410
// Auth is trivial in this flow because the buffer is already populated when the auth helper
370411
// asks for the body via r.GetBody().
371-
func (r *Request) buildBufferedRequest(mediaType, basePath string, producers map[string]runtime.Producer, registry strfmt.Registry, auth runtime.ClientAuthInfoWriter) (*http.Request, error) {
412+
func (r *Request) buildBufferedRequest(ctx context.Context, mediaType, basePath string, producers map[string]runtime.Producer, registry strfmt.Registry, auth runtime.ClientAuthInfoWriter) (*http.Request, error) {
372413
var body io.Reader
373414
var err error
374415

@@ -392,7 +433,7 @@ func (r *Request) buildBufferedRequest(mediaType, basePath string, producers map
392433
}
393434
}
394435

395-
return r.assembleRequest(basePath, body)
436+
return r.assembleRequest(ctx, basePath, body)
396437
}
397438

398439
// buildStreamingRequest assembles a request whose body is a stream —
@@ -409,10 +450,10 @@ func (r *Request) buildBufferedRequest(mediaType, basePath string, producers map
409450
// (it would otherwise park forever on pw.Write with no reader).
410451
//
411452
// For stream payloads it closes the user-provided io.ReadCloser.
412-
func (r *Request) buildStreamingRequest(mediaType, basePath string, producers map[string]runtime.Producer, registry strfmt.Registry, auth runtime.ClientAuthInfoWriter) (req *http.Request, retErr error) {
453+
func (r *Request) buildStreamingRequest(ctx context.Context, mediaType, basePath string, producers map[string]runtime.Producer, registry strfmt.Registry, auth runtime.ClientAuthInfoWriter) (req *http.Request, retErr error) {
413454
var body io.Reader
414455
if len(r.formFields) > 0 || len(r.fileFields) > 0 {
415-
body = r.writeMultipartBody(mediaType)
456+
body = r.writeMultipartBody(ctx, mediaType)
416457
} else {
417458
body = r.writeStreamPayload(mediaType, producers)
418459
}
@@ -435,19 +476,19 @@ func (r *Request) buildStreamingRequest(mediaType, basePath string, producers ma
435476
return nil, err
436477
}
437478

438-
return r.assembleRequest(basePath, body)
479+
return r.assembleRequest(ctx, basePath, body)
439480
}
440481

441482
// assembleRequest is the shared tail of both flows: build the URL
442483
// path, create the http.Request, merge static query parameters, and
443484
// finalize headers/query.
444-
func (r *Request) assembleRequest(basePath string, body io.Reader) (*http.Request, error) {
485+
func (r *Request) assembleRequest(ctx context.Context, basePath string, body io.Reader) (*http.Request, error) {
445486
urlPath, staticQueryParams, err := r.resolveURLPath(basePath)
446487
if err != nil {
447488
return nil, err
448489
}
449490

450-
req, err := http.NewRequestWithContext(context.Background(), r.method, urlPath, body)
491+
req, err := http.NewRequestWithContext(ctx, r.method, urlPath, body)
451492
if err != nil {
452493
return nil, err
453494
}
@@ -625,12 +666,12 @@ func (r *Request) writeURLEncodedBody(mediaType string) (io.Reader, error) {
625666
// The goroutine owns the pipe writer's lifecycle: it closes the
626667
// multipart writer (flushing the closing boundary) and the pipe writer
627668
// when it finishes or hits an error.
628-
func (r *Request) writeMultipartBody(mediaType string) io.Reader {
669+
func (r *Request) writeMultipartBody(ctx context.Context, mediaType string) io.Reader {
629670
pr, pw := io.Pipe()
630671
mp := multipart.NewWriter(pw)
631672
r.header.Set(runtime.HeaderContentType, mangleContentType(mediaType, mp.Boundary()))
632673

633-
go r.streamMultipartParts(mp, pw)
674+
go r.streamMultipartParts(ctx, mp, pw)
634675

635676
return pr
636677
}
@@ -639,14 +680,23 @@ func (r *Request) writeMultipartBody(mediaType string) io.Reader {
639680
// closing mp and pw when done.
640681
//
641682
// Errors are reported by closing pw with the error so the consumer of pr observes them on its next Read.
642-
func (r *Request) streamMultipartParts(mp *multipart.Writer, pw *io.PipeWriter) {
683+
//
684+
// Context cancellation is observed at iteration boundaries (between
685+
// fields and between files) and during file copy via a context-aware
686+
// reader. When ctx is canceled the pipe writer is closed with ctx.Err()
687+
// so the body consumer surfaces the cancellation as the read error.
688+
func (r *Request) streamMultipartParts(ctx context.Context, mp *multipart.Writer, pw *io.PipeWriter) {
643689
defer func() {
644690
mp.Close()
645691
pw.Close()
646692
}()
647693

648694
for fn, v := range r.formFields {
649695
for _, vi := range v {
696+
if err := ctx.Err(); err != nil {
697+
_ = pw.CloseWithError(err)
698+
return
699+
}
650700
if err := mp.WriteField(fn, vi); err != nil {
651701
logClose(err, pw)
652702
return
@@ -664,6 +714,11 @@ func (r *Request) streamMultipartParts(mp *multipart.Writer, pw *io.PipeWriter)
664714

665715
for fn, f := range r.fileFields {
666716
for _, fi := range f {
717+
if err := ctx.Err(); err != nil {
718+
_ = pw.CloseWithError(err)
719+
return
720+
}
721+
667722
var fileContentType string
668723
if p, ok := fi.(runtime.ContentTyper); ok {
669724
fileContentType = p.ContentType()
@@ -692,13 +747,31 @@ func (r *Request) streamMultipartParts(mp *multipart.Writer, pw *io.PipeWriter)
692747
logClose(err, pw)
693748
return
694749
}
695-
if _, err := io.Copy(wrtr, fi); err != nil {
750+
if _, err := io.Copy(wrtr, &ctxReader{ctx: ctx, r: fi}); err != nil {
696751
logClose(err, pw)
752+
return
697753
}
698754
}
699755
}
700756
}
701757

758+
// ctxReader wraps an [io.Reader] with a context check on each Read. Once
759+
// ctx is done, subsequent Reads return ctx.Err() instead of delegating
760+
// to the underlying reader. It does not preempt a Read already in flight
761+
// — that is the source's responsibility (e.g. *os.File honors Close from
762+
// another goroutine, network sources honor SetDeadline).
763+
type ctxReader struct {
764+
ctx context.Context //nolint:containedctx // io.Reader's Read method has no ctx parameter, so the wrapper must carry it on the struct
765+
r io.Reader
766+
}
767+
768+
func (cr *ctxReader) Read(p []byte) (int, error) {
769+
if err := cr.ctx.Err(); err != nil {
770+
return 0, err
771+
}
772+
return cr.r.Read(p)
773+
}
774+
702775
// writeStreamPayload handles a stream payload (io.Reader /
703776
// io.ReadCloser). The bytes flow through verbatim — no producer is
704777
// invoked. The wire Content-Type is resolved via setStreamContentType

0 commit comments

Comments
 (0)