Skip to content

Commit 6397542

Browse files
committed
dialers/retrier: support conns other than tcp
1 parent 9561858 commit 6397542

1 file changed

Lines changed: 31 additions & 10 deletions

File tree

intra/dialers/retrier.go

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ type retrier struct {
7777
// thread, so the reader functions may access it without acquiring a lock.
7878
// nb: if embedding TCPConn; override its WriteTo instead of just ReadFrom
7979
// as io.Copy prefers WriteTo over ReadFrom; or use core.Pipe
80-
conn core.DuplexConn
80+
conn protect.Conn
8181

8282
// External read and write deadlines. These need to be stored here so that
8383
// they can be re-applied in the event of a retry.
@@ -289,7 +289,7 @@ func (r *retrier) dialStratLocked() (strat int32, err error) {
289289

290290
// dialLocked establishes a new connection to r.raddr and closes existing, if any.
291291
// Sets r.conn on non-errors and timeout as calculated from round-trip time.
292-
func (r *retrier) dialLocked() (c core.DuplexConn, err error) {
292+
func (r *retrier) dialLocked() (c protect.Conn, err error) {
293293
clos(r.conn) // close existing connection, if any
294294

295295
strat, err := r.dialStratLocked()
@@ -312,16 +312,21 @@ func (r *retrier) dialLocked() (c core.DuplexConn, err error) {
312312

313313
// dialStrat returns a core.DuplexConn to r.raddr using a specified strategy, strat,
314314
// which is one of the settings.Split* constants.
315-
func (r *retrier) doDialLocked(dialStrat int32) (_ core.DuplexConn, err error) {
315+
func (r *retrier) doDialLocked(dialStrat int32) (_ protect.Conn, err error) {
316316
var conn *net.TCPConn
317317

318318
di := 0 // di is always 0 if not multidial
319+
319320
if r.multidial {
320321
di = r.nextDialerIdx
321322
if di >= len(r.dialers) {
322323
return nil, errNoDialer
323324
}
324325
r.nextDialerIdx = di + 1
326+
if r.laddr != nil {
327+
return r.dialers[di].DialBind(r.raddr.Network(), r.laddr.String(), r.raddr.String())
328+
}
329+
return r.dialers[di].Dial(r.raddr.Network(), r.raddr.String())
325330
}
326331

327332
// r.raddr may be nil or laddr.IP may be nil.
@@ -338,7 +343,7 @@ func (r *retrier) doDialLocked(dialStrat int32) (_ core.DuplexConn, err error) {
338343
if err != nil || conn == nil {
339344
return nil, err
340345
}
341-
// todo: strat must be tcp or tls
346+
// todo: assert strat must be tcp or tls?
342347
return &splitter{conn: conn, strat: dialStrat}, nil
343348
}
344349

@@ -367,13 +372,13 @@ func (r *retrier) retryWriteReadLocked(buf []byte) (int, error) {
367372
readdone := r.readDone.Load()
368373
writedone := r.writeDone.Load()
369374
if readdone {
370-
core.CloseTCPRead(newConn)
375+
core.CloseOp(newConn, core.CopR)
371376
} else {
372377
_ = newConn.SetReadDeadline(r.readDeadline)
373378
}
374379
// caller might have set read or write deadlines before the retry.
375380
if writedone {
376-
core.CloseTCPWrite(newConn)
381+
core.CloseOp(newConn, core.CopW)
377382
} else {
378383
_ = newConn.SetWriteDeadline(r.writeDeadline)
379384
}
@@ -561,12 +566,28 @@ func (r *retrier) ReadFrom(reader io.Reader) (bytes int64, err error) {
561566
}
562567
}
563568

569+
optimizedReadFrom := true
564570
var b int64
565-
b, err = c.ReadFrom(reader)
566-
bytes += b
571+
switch x := c.(type) {
572+
case *net.TCPConn:
573+
b, err = x.ReadFrom(reader)
574+
bytes += b
575+
case *splitter:
576+
b, err = x.ReadFrom(reader)
577+
bytes += b
578+
case io.ReaderFrom:
579+
b, err = x.ReadFrom(reader)
580+
bytes += b
581+
default:
582+
// *net.UDPConn, net.PacketConn etc?
583+
optimizedReadFrom = false
584+
// read from reader until EOF
585+
b, err = core.Stream(c, reader)
586+
bytes += b
587+
}
567588

568-
logeif(err)("retrier: readfrom: done (id: %s, pinned? %t); sz: %d; err: %v",
569-
pinnedID, pinned, bytes, err)
589+
logeif(err)("retrier: readfrom: (optimized? %t) done (id: %s, pinned? %t); sz: %d; err: %v",
590+
optimizedReadFrom, pinnedID, pinned, bytes, err)
570591
return
571592
}
572593

0 commit comments

Comments
 (0)