Skip to content

Commit 7c2ff1b

Browse files
committed
tun, conn, device: allocate buffers in the I/O devices
Previously crypto device maintained a batch of preallocated to the MaxMessageSize buffers that the I/O only needs to read into. This change inverts the buffer ownership. A (wrapped) nil pointer is passed into I/O. Device expects a backing array to be allocated, and a slice of read data cut to offset+size returned from the read within the same wrapper. The wrapper is defined in buffer.Buffer, and carries the backing reference and an optional Recycler implementation to return the backing for reuse. I/O is encouraged to implement a buffer management solution that works best for its host OS. A shared sync.Pool is provided as a default option. Updates tailscale/corp#36989 Signed-off-by: Alex Valiushko <alexvaliushko@tailscale.com> Change-Id: I58908d9d3fd09441e9378a74b0ee19136a6a6964
1 parent 20486f7 commit 7c2ff1b

36 files changed

Lines changed: 413 additions & 232 deletions

conn/bind_std.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"sync"
1717
"syscall"
1818

19+
"github.com/tailscale/wireguard-go/iobuf"
1920
"golang.org/x/net/ipv4"
2021
"golang.org/x/net/ipv6"
2122
)
@@ -233,13 +234,13 @@ func (s *StdNetBind) receiveIP(
233234
br batchReader,
234235
conn *net.UDPConn,
235236
rxOffload bool,
236-
bufs [][]byte,
237-
sizes []int,
237+
bufs []iobuf.View,
238238
eps []Endpoint,
239239
) (n int, err error) {
240240
msgs := s.getMessages()
241+
iobuf.EnsureAllocated(bufs)
241242
for i := range bufs {
242-
(*msgs)[i].Buffers[0] = bufs[i]
243+
(*msgs)[i].Buffers[0] = bufs[i].Bytes
243244
(*msgs)[i].OOB = (*msgs)[i].OOB[:cap((*msgs)[i].OOB)]
244245
}
245246
defer s.putMessages(msgs)
@@ -271,8 +272,8 @@ func (s *StdNetBind) receiveIP(
271272
}
272273
for i := 0; i < numMsgs; i++ {
273274
msg := &(*msgs)[i]
274-
sizes[i] = msg.N
275-
if sizes[i] == 0 {
275+
bufs[i].Bytes = bufs[i].Bytes[:msg.N]
276+
if len(bufs[i].Bytes) == 0 {
276277
continue
277278
}
278279
addrPort := msg.Addr.(*net.UDPAddr).AddrPort()
@@ -284,14 +285,14 @@ func (s *StdNetBind) receiveIP(
284285
}
285286

286287
func (s *StdNetBind) makeReceiveIPv4(pc *ipv4.PacketConn, conn *net.UDPConn, rxOffload bool) ReceiveFunc {
287-
return func(bufs [][]byte, sizes []int, eps []Endpoint) (n int, err error) {
288-
return s.receiveIP(pc, conn, rxOffload, bufs, sizes, eps)
288+
return func(bufs []iobuf.View, eps []Endpoint) (n int, err error) {
289+
return s.receiveIP(pc, conn, rxOffload, bufs, eps)
289290
}
290291
}
291292

292293
func (s *StdNetBind) makeReceiveIPv6(pc *ipv6.PacketConn, conn *net.UDPConn, rxOffload bool) ReceiveFunc {
293-
return func(bufs [][]byte, sizes []int, eps []Endpoint) (n int, err error) {
294-
return s.receiveIP(pc, conn, rxOffload, bufs, sizes, eps)
294+
return func(bufs []iobuf.View, eps []Endpoint) (n int, err error) {
295+
return s.receiveIP(pc, conn, rxOffload, bufs, eps)
295296
}
296297
}
297298

conn/bind_std_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"net"
66
"testing"
77

8+
"github.com/tailscale/wireguard-go/iobuf"
89
"golang.org/x/net/ipv6"
910
)
1011

@@ -15,15 +16,14 @@ func TestStdNetBindReceiveFuncAfterClose(t *testing.T) {
1516
t.Fatal(err)
1617
}
1718
bind.Close()
18-
bufs := make([][]byte, 1)
19-
bufs[0] = make([]byte, 1)
20-
sizes := make([]int, 1)
19+
bufs := make([]iobuf.View, 1)
20+
bufs[0] = iobuf.View{Bytes: make([]byte, 1)}
2121
eps := make([]Endpoint, 1)
2222
for _, fn := range fns {
2323
// The ReceiveFuncs must not access conn-related fields on StdNetBind
2424
// unguarded. Close() nils the conn-related fields resulting in a panic
2525
// if they violate the mutex.
26-
fn(bufs, sizes, eps)
26+
fn(bufs, eps)
2727
}
2828
}
2929

conn/bind_windows.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"golang.org/x/sys/windows"
1919

2020
"github.com/tailscale/wireguard-go/conn/winrio"
21+
"github.com/tailscale/wireguard-go/iobuf"
2122
)
2223

2324
const (
@@ -416,20 +417,22 @@ retry:
416417
return n, &ep, nil
417418
}
418419

419-
func (bind *WinRingBind) receiveIPv4(bufs [][]byte, sizes []int, eps []Endpoint) (int, error) {
420+
func (bind *WinRingBind) receiveIPv4(bufs []iobuf.View, eps []Endpoint) (int, error) {
420421
bind.mu.RLock()
421422
defer bind.mu.RUnlock()
422-
n, ep, err := bind.v4.Receive(bufs[0], &bind.isOpen)
423-
sizes[0] = n
423+
iobuf.EnsureAllocated(bufs[:1])
424+
n, ep, err := bind.v4.Receive(bufs[0].Bytes, &bind.isOpen)
425+
bufs[0].Bytes = bufs[0].Bytes[:n]
424426
eps[0] = ep
425427
return 1, err
426428
}
427429

428-
func (bind *WinRingBind) receiveIPv6(bufs [][]byte, sizes []int, eps []Endpoint) (int, error) {
430+
func (bind *WinRingBind) receiveIPv6(bufs []iobuf.View, eps []Endpoint) (int, error) {
429431
bind.mu.RLock()
430432
defer bind.mu.RUnlock()
431-
n, ep, err := bind.v6.Receive(bufs[0], &bind.isOpen)
432-
sizes[0] = n
433+
iobuf.EnsureAllocated(bufs[:1])
434+
n, ep, err := bind.v6.Receive(bufs[0].Bytes, &bind.isOpen)
435+
bufs[0].Bytes = bufs[0].Bytes[:n]
433436
eps[0] = ep
434437
return 1, err
435438
}

conn/bindtest/bindtest.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"os"
1414

1515
"github.com/tailscale/wireguard-go/conn"
16+
"github.com/tailscale/wireguard-go/iobuf"
1617
)
1718

1819
type ChannelBind struct {
@@ -94,13 +95,14 @@ func (c *ChannelBind) BatchSize() int { return 1 }
9495
func (c *ChannelBind) SetMark(mark uint32) error { return nil }
9596

9697
func (c *ChannelBind) makeReceiveFunc(ch chan []byte) conn.ReceiveFunc {
97-
return func(bufs [][]byte, sizes []int, eps []conn.Endpoint) (n int, err error) {
98+
return func(bufs []iobuf.View, eps []conn.Endpoint) (n int, err error) {
9899
select {
99100
case <-c.closeSignal:
100101
return 0, net.ErrClosed
101102
case rx := <-ch:
102-
copied := copy(bufs[0], rx)
103-
sizes[0] = copied
103+
iobuf.EnsureAllocated(bufs[:1])
104+
n := copy(bufs[0].Bytes, rx)
105+
bufs[0].Bytes = bufs[0].Bytes[:n]
104106
eps[0] = c.target6
105107
return 1, nil
106108
}

conn/conn.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,20 @@ import (
1313
"reflect"
1414
"runtime"
1515
"strings"
16+
17+
"github.com/tailscale/wireguard-go/iobuf"
1618
)
1719

1820
const (
1921
IdealBatchSize = 128 // maximum number of packets handled per read and write
2022
)
2123

22-
// A ReceiveFunc receives at least one packet from the network and writes them
23-
// into packets. On a successful read it returns the number of elements of
24-
// sizes, packets, and endpoints that should be evaluated. Some elements of
25-
// sizes may be zero, and callers should ignore them. Callers must pass a sizes
26-
// and eps slice with a length greater than or equal to the length of packets.
27-
// These lengths must not exceed the length of the associated Bind.BatchSize().
28-
type ReceiveFunc func(packets [][]byte, sizes []int, eps []Endpoint) (n int, err error)
24+
// A ReceiveFunc receives at least one packet from the network into bufs.
25+
// On a successful read it returns the number of elements of bufs and eps
26+
// that should be evaluated. Callers must pass an eps slice with a length
27+
// greater than or equal to the length of bufs. These lengths must not
28+
// exceed the length of the associated Bind.BatchSize().
29+
type ReceiveFunc func(bufs []iobuf.View, eps []Endpoint) (n int, err error)
2930

3031
// A Bind listens on a port for both IPv6 and IPv4 UDP traffic.
3132
//

conn/conn_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ package conn
77

88
import (
99
"testing"
10+
11+
"github.com/tailscale/wireguard-go/iobuf"
1012
)
1113

1214
func TestPrettyName(t *testing.T) {
1315
var (
14-
recvFunc ReceiveFunc = func(bufs [][]byte, sizes []int, eps []Endpoint) (n int, err error) { return }
16+
recvFunc ReceiveFunc = func(bufs []iobuf.View, eps []Endpoint) (n int, err error) { return }
1517
)
1618

1719
const want = "TestPrettyName"

device/channels.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (device *Device) flushInboundQueue(q *autodrainingInboundQueue) {
9393
case elemsContainer := <-q.c:
9494
elemsContainer.Lock()
9595
for _, elem := range elemsContainer.elems {
96-
device.PutMessageBuffer(elem.buffer)
96+
elem.buffer.Release()
9797
device.PutInboundElement(elem)
9898
}
9999
device.PutInboundElementsContainer(elemsContainer)
@@ -126,7 +126,7 @@ func (device *Device) flushOutboundQueue(q *autodrainingOutboundQueue) {
126126
case elemsContainer := <-q.c:
127127
elemsContainer.Lock()
128128
for _, elem := range elemsContainer.elems {
129-
device.PutMessageBuffer(elem.buffer)
129+
elem.buffer.Release()
130130
device.PutOutboundElement(elem)
131131
}
132132
device.PutOutboundElementsContainer(elemsContainer)

device/constants.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ package device
77

88
import (
99
"time"
10+
11+
"github.com/tailscale/wireguard-go/iobuf"
1012
)
1113

1214
/* Specification constants */
@@ -27,9 +29,9 @@ const (
2729
)
2830

2931
const (
30-
MinMessageSize = MessageKeepaliveSize // minimum size of transport message (keepalive)
31-
MaxMessageSize = MaxSegmentSize // maximum size of transport message
32-
MaxContentSize = MaxSegmentSize - MessageTransportSize - MessageEncapsulatingTransportSize // maximum size of transport message content
32+
MinMessageSize = MessageKeepaliveSize // minimum size of transport message (keepalive)
33+
MaxMessageSize = iobuf.MaxReadSize // maximum size of transport message
34+
MaxContentSize = iobuf.MaxReadSize - MessageTransportSize - MessageEncapsulatingTransportSize // maximum size of transport message content
3335
)
3436

3537
/* Implementation constants */

device/device.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ type Device struct {
7474
pool struct {
7575
inboundElementsContainer *waitpool.WaitPool
7676
outboundElementsContainer *waitpool.WaitPool
77-
messageBuffers *waitpool.WaitPool
7877
inboundElements *waitpool.WaitPool
7978
outboundElements *waitpool.WaitPool
8079
}

device/device_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/tailscale/wireguard-go/conn"
2424
"github.com/tailscale/wireguard-go/conn/bindtest"
25+
"github.com/tailscale/wireguard-go/iobuf"
2526
"github.com/tailscale/wireguard-go/tun"
2627
"github.com/tailscale/wireguard-go/tun/tuntest"
2728
)
@@ -437,7 +438,7 @@ type fakeTUNDeviceSized struct {
437438
}
438439

439440
func (t *fakeTUNDeviceSized) File() *os.File { return nil }
440-
func (t *fakeTUNDeviceSized) Read(bufs [][]byte, sizes []int, offset int) (n int, err error) {
441+
func (t *fakeTUNDeviceSized) Read(bufs []iobuf.View, offset int) (n int, err error) {
441442
return 0, nil
442443
}
443444
func (t *fakeTUNDeviceSized) Write(bufs [][]byte, offset int) (int, error) { return 0, nil }

0 commit comments

Comments
 (0)