Skip to content

Commit 99c241e

Browse files
committed
memcache: add pipelining, modernize some of the Go
Also: * add synctest tests, which requires newer Go, so build tag those out. We need to bump go.mod to get working synctest, so: * ... also add a CI check that we keep building with Go 1.18 for now, not using Go language/API changes newer than Go 1.18. * few more config knobs Fixes #160
1 parent 4d751bb commit 99c241e

9 files changed

Lines changed: 1897 additions & 383 deletions

File tree

.github/workflows/test.yml

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,42 @@ on:
44
pull_request:
55

66
jobs:
7-
build:
8-
7+
# Verify the library still builds with Go 1.18. GOTOOLCHAIN=local prevents
8+
# auto-upgrade; Go 1.18 itself predates the toolchain directive anyway, so
9+
# it ignores go.mod's "go 1.26" line and just tries to compile. This job
10+
# catches accidental use of post-1.18 language/stdlib features.
11+
# Go 1.18 is super old, but is new enough to have generics.
12+
# TODO(bradfitz): decide on an actual support Go version policy. It'd be nice
13+
# to depend on newer Go.
14+
build-go118:
15+
name: Test on old Go 1.18
916
runs-on: ubuntu-latest
10-
strategy:
11-
matrix:
12-
go-version: [ '1.18', '1.21' ]
17+
steps:
18+
- name: install memcached
19+
run: |
20+
sudo apt-get update
21+
sudo apt-get install -y memcached
22+
- uses: actions/checkout@v4
23+
- uses: actions/setup-go@v5
24+
with:
25+
go-version: '1.18'
26+
- name: go test
27+
env:
28+
GOTOOLCHAIN: local
29+
run: go test ./...
1330

31+
# Full test suite on modern Go, including the synctest-gated pipeline tests.
32+
test:
33+
name: Run Go tests
34+
runs-on: ubuntu-latest
1435
steps:
1536
- name: install memcached
1637
run: |
17-
sudo apt update
18-
sudo apt install memcached
19-
- uses: actions/checkout@v3
20-
- name: Setup Go ${{ matrix.go-version }}
21-
uses: actions/setup-go@v4
38+
sudo apt-get update
39+
sudo apt-get install -y memcached
40+
- uses: actions/checkout@v4
41+
- uses: actions/setup-go@v5
2242
with:
23-
go-version: ${{ matrix.go-version }}
24-
- name: Test
25-
run: go test -v ./...
43+
go-version: '1.26'
44+
- name: go test
45+
run: go test -v -race ./...

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module github.com/bradfitz/gomemcache
22

3-
go 1.18
3+
go 1.26

memcache/fakenet_test.go

Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
/*
2+
Copyright 2026 The gomemcache AUTHORS
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
11+
package memcache
12+
13+
import (
14+
"errors"
15+
"io"
16+
"net"
17+
"sync"
18+
"time"
19+
)
20+
21+
// This file provides a minimal in-memory net.Conn pair with per-direction
22+
// synthetic delay.
23+
//
24+
// Under testing/synctest, time.Sleep and time.AfterFunc use fake time, so
25+
// the "delay" is deterministic and instant in wall-clock terms.
26+
27+
type fakeAddr string
28+
29+
func (a fakeAddr) Network() string { return "fakenet" }
30+
func (a fakeAddr) String() string { return string(a) }
31+
32+
// newFakePipe returns two connected fakeConns. oneWayDelay models link
33+
// latency (the time between Write on one side and the bytes becoming
34+
// readable on the other). A Write records its target arrival time when it
35+
// queues the chunk; the per-direction worker then sleeps to each arrival
36+
// time in order. Under back-to-back Writes, chunks arrive back-to-back,
37+
// emulating an in-flight pipe rather than a queue of serialized latencies.
38+
func newFakePipe(oneWayDelay time.Duration, nameA, nameB string) (a, b *fakeConn) {
39+
a = &fakeConn{
40+
local: fakeAddr(nameA),
41+
remote: fakeAddr(nameB),
42+
delay: oneWayDelay,
43+
done: make(chan struct{}),
44+
sendCh: make(chan fakeChunk, 256),
45+
}
46+
b = &fakeConn{
47+
local: fakeAddr(nameB),
48+
remote: fakeAddr(nameA),
49+
delay: oneWayDelay,
50+
done: make(chan struct{}),
51+
sendCh: make(chan fakeChunk, 256),
52+
}
53+
aToB := make(chan []byte, 256)
54+
bToA := make(chan []byte, 256)
55+
a.outCh = aToB
56+
b.inCh = aToB
57+
b.outCh = bToA
58+
a.inCh = bToA
59+
a.peer = b
60+
b.peer = a
61+
go a.deliverLoop()
62+
go b.deliverLoop()
63+
return
64+
}
65+
66+
type fakeChunk struct {
67+
arriveAt time.Time
68+
data []byte
69+
}
70+
71+
type fakeConn struct {
72+
local fakeAddr
73+
remote fakeAddr
74+
delay time.Duration
75+
76+
peer *fakeConn
77+
78+
sendCh chan fakeChunk // Write pushes here with arrival timestamp
79+
outCh chan []byte // deliverLoop writes here when arrival time passes (peer reads)
80+
inCh chan []byte // chunks arriving from peer (already delayed)
81+
82+
closeOnce sync.Once
83+
done chan struct{}
84+
85+
mu sync.Mutex
86+
readBuf []byte
87+
readDeadline time.Time
88+
writeDeadline time.Time
89+
}
90+
91+
// deliverLoop pops chunks in FIFO order and sleeps until each chunk's
92+
// arrival time. Arrival times are monotonic because Write stamps them at
93+
// submission time on a single-writer send queue.
94+
func (c *fakeConn) deliverLoop() {
95+
for {
96+
select {
97+
case ch, ok := <-c.sendCh:
98+
if !ok {
99+
return
100+
}
101+
if d := time.Until(ch.arriveAt); d > 0 {
102+
timer := time.NewTimer(d)
103+
select {
104+
case <-timer.C:
105+
case <-c.done:
106+
timer.Stop()
107+
return
108+
case <-c.peer.done:
109+
timer.Stop()
110+
return
111+
}
112+
}
113+
select {
114+
case c.outCh <- ch.data:
115+
case <-c.done:
116+
return
117+
case <-c.peer.done:
118+
return
119+
}
120+
case <-c.done:
121+
return
122+
case <-c.peer.done:
123+
return
124+
}
125+
}
126+
}
127+
128+
func (c *fakeConn) LocalAddr() net.Addr { return c.local }
129+
func (c *fakeConn) RemoteAddr() net.Addr { return c.remote }
130+
131+
func (c *fakeConn) SetDeadline(t time.Time) error {
132+
c.SetReadDeadline(t)
133+
c.SetWriteDeadline(t)
134+
return nil
135+
}
136+
137+
func (c *fakeConn) SetReadDeadline(t time.Time) error {
138+
c.mu.Lock()
139+
c.readDeadline = t
140+
c.mu.Unlock()
141+
return nil
142+
}
143+
144+
func (c *fakeConn) SetWriteDeadline(t time.Time) error {
145+
c.mu.Lock()
146+
c.writeDeadline = t
147+
c.mu.Unlock()
148+
return nil
149+
}
150+
151+
func (c *fakeConn) Close() error {
152+
c.closeOnce.Do(func() {
153+
close(c.done)
154+
})
155+
return nil
156+
}
157+
158+
// Read returns data from the per-conn read buffer, refilling it from inCh
159+
// when empty. Respects the read deadline (via time.Timer, fake-time-friendly).
160+
func (c *fakeConn) Read(p []byte) (int, error) {
161+
c.mu.Lock()
162+
if len(c.readBuf) > 0 {
163+
n := copy(p, c.readBuf)
164+
c.readBuf = c.readBuf[n:]
165+
c.mu.Unlock()
166+
return n, nil
167+
}
168+
dl := c.readDeadline
169+
c.mu.Unlock()
170+
171+
var dlCh <-chan time.Time
172+
if !dl.IsZero() {
173+
timer := time.NewTimer(time.Until(dl))
174+
defer timer.Stop()
175+
dlCh = timer.C
176+
}
177+
178+
select {
179+
case chunk, ok := <-c.inCh:
180+
if !ok {
181+
return 0, io.EOF
182+
}
183+
c.mu.Lock()
184+
c.readBuf = append(c.readBuf, chunk...)
185+
n := copy(p, c.readBuf)
186+
c.readBuf = c.readBuf[n:]
187+
c.mu.Unlock()
188+
return n, nil
189+
case <-dlCh:
190+
return 0, errFakeTimeout
191+
case <-c.done:
192+
return 0, io.EOF
193+
case <-c.peer.done:
194+
return 0, io.EOF
195+
}
196+
}
197+
198+
// Write copies the bytes, stamps the link arrival time, and queues for
199+
// delivery. Chunks arrive at the peer at (send_time + delay); back-to-back
200+
// Writes result in back-to-back arrivals, not serialized ones.
201+
func (c *fakeConn) Write(p []byte) (int, error) {
202+
select {
203+
case <-c.done:
204+
return 0, io.ErrClosedPipe
205+
case <-c.peer.done:
206+
return 0, io.ErrClosedPipe
207+
default:
208+
}
209+
b := make([]byte, len(p))
210+
copy(b, p)
211+
ch := fakeChunk{arriveAt: time.Now().Add(c.delay), data: b}
212+
select {
213+
case c.sendCh <- ch:
214+
case <-c.done:
215+
return 0, io.ErrClosedPipe
216+
case <-c.peer.done:
217+
return 0, io.ErrClosedPipe
218+
}
219+
return len(p), nil
220+
}
221+
222+
var errFakeTimeout = &fakeTimeoutError{}
223+
224+
type fakeTimeoutError struct{}
225+
226+
func (*fakeTimeoutError) Error() string { return "fakenet: i/o timeout" }
227+
func (*fakeTimeoutError) Timeout() bool { return true }
228+
func (*fakeTimeoutError) Temporary() bool { return true }
229+
230+
// fakeListener is a net.Listener backed by fakeConn pairs. Each dial creates
231+
// one pair; one end is returned to the dialer, the other end is enqueued for
232+
// the next Accept. delay is the per-direction one-way delay.
233+
type fakeListener struct {
234+
addr fakeAddr
235+
delay time.Duration
236+
accepts chan net.Conn
237+
closed chan struct{}
238+
once sync.Once
239+
}
240+
241+
func newFakeListener(addr string, oneWayDelay time.Duration) *fakeListener {
242+
return &fakeListener{
243+
addr: fakeAddr(addr),
244+
delay: oneWayDelay,
245+
accepts: make(chan net.Conn, 16),
246+
closed: make(chan struct{}),
247+
}
248+
}
249+
250+
// dial creates a new conn pair and hands the server side to the next Accept.
251+
// It returns the client side.
252+
func (l *fakeListener) dial() (net.Conn, error) {
253+
select {
254+
case <-l.closed:
255+
return nil, errors.New("fakenet: listener closed")
256+
default:
257+
}
258+
client, server := newFakePipe(l.delay, "client-"+string(l.addr), string(l.addr))
259+
select {
260+
case l.accepts <- server:
261+
return client, nil
262+
case <-l.closed:
263+
client.Close()
264+
server.Close()
265+
return nil, errors.New("fakenet: listener closed")
266+
}
267+
}
268+
269+
func (l *fakeListener) Accept() (net.Conn, error) {
270+
select {
271+
case c := <-l.accepts:
272+
return c, nil
273+
case <-l.closed:
274+
return nil, errors.New("fakenet: listener closed")
275+
}
276+
}
277+
278+
func (l *fakeListener) Close() error {
279+
l.once.Do(func() { close(l.closed) })
280+
return nil
281+
}
282+
283+
func (l *fakeListener) Addr() net.Addr { return l.addr }
284+
285+
// singleServerSelector is a ServerSelector that always picks the same addr.
286+
type singleServerSelector struct{ addr net.Addr }
287+
288+
func (s singleServerSelector) PickServer(key string) (net.Addr, error) { return s.addr, nil }
289+
func (s singleServerSelector) Each(f func(net.Addr) error) error { return f(s.addr) }
290+
291+
// Compile-time assertions.
292+
var (
293+
_ net.Conn = (*fakeConn)(nil)
294+
_ net.Addr = fakeAddr("")
295+
_ error = (*fakeTimeoutError)(nil)
296+
_ net.Listener = (*fakeListener)(nil)
297+
_ ServerSelector = singleServerSelector{}
298+
)

0 commit comments

Comments
 (0)