Skip to content

Commit d10bc12

Browse files
committed
Exponential backoff and dial retrying. Use more efficient pooling
1 parent 24af94b commit d10bc12

1 file changed

Lines changed: 59 additions & 47 deletions

File tree

memcache/memcache.go

Lines changed: 59 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@ import (
2424
"errors"
2525
"fmt"
2626
"io"
27+
"math"
2728
"net"
29+
30+
"gopkg.in/fatih/pool.v2"
31+
2832
"strconv"
2933
"strings"
3034
"sync"
@@ -70,6 +74,9 @@ const (
7074
// DefaultMaxIdleConns is the default maximum number of idle connections
7175
// kept for any single address.
7276
DefaultMaxIdleConns = 2
77+
78+
// maximum number of times to attempt to reconnect
79+
maxRetries = 10
7380
)
7481

7582
const buffered = 8 // arbitrary buffered channel size, for readability
@@ -126,7 +133,7 @@ func New(server ...string) *Client {
126133

127134
// NewFromSelector returns a new Client using the provided ServerSelector.
128135
func NewFromSelector(ss ServerSelector) *Client {
129-
return &Client{selector: ss}
136+
return &Client{selector: ss, pools: make(map[string]pool.Pool)}
130137
}
131138

132139
// Client is a memcache client.
@@ -154,8 +161,8 @@ type Client struct {
154161

155162
selector ServerSelector
156163

157-
lk sync.Mutex
158-
freeconn map[string][]*conn
164+
lk sync.Mutex
165+
pools map[string]pool.Pool
159166
}
160167

161168
// Item is an item to be got or stored in a memcached server.
@@ -184,7 +191,7 @@ type Item struct {
184191

185192
// conn is a connection to a server.
186193
type conn struct {
187-
nc net.Conn
194+
nc *pool.PoolConn
188195
rw *bufio.ReadWriter
189196
addr net.Addr
190197
c *Client
@@ -211,33 +218,25 @@ func (cn *conn) condRelease(err *error) {
211218
}
212219
}
213220

214-
func (c *Client) putFreeConn(addr net.Addr, cn *conn) {
215-
c.lk.Lock()
216-
defer c.lk.Unlock()
217-
if c.freeconn == nil {
218-
c.freeconn = make(map[string][]*conn)
219-
}
220-
freelist := c.freeconn[addr.String()]
221-
if len(freelist) >= c.maxIdleConns() {
222-
cn.nc.Close()
223-
return
221+
func (c *Client) withAddrPool(addr net.Addr, fn func(pool.Pool)) {
222+
pl := c.pools[addr.String()]
223+
if pl == nil {
224+
c.lk.Lock()
225+
// See if pool is still nil
226+
if c.pools[addr.String()] == nil {
227+
pl, _ = pool.NewChannelPool(1, 10, func() (net.Conn, error) {
228+
return c.dial(addr)
229+
})
230+
c.pools[addr.String()] = pl
231+
}
232+
c.lk.Unlock()
233+
pl = c.pools[addr.String()]
224234
}
225-
c.freeconn[addr.String()] = append(freelist, cn)
235+
fn(pl)
226236
}
227237

228-
func (c *Client) getFreeConn(addr net.Addr) (cn *conn, ok bool) {
229-
c.lk.Lock()
230-
defer c.lk.Unlock()
231-
if c.freeconn == nil {
232-
return nil, false
233-
}
234-
freelist, ok := c.freeconn[addr.String()]
235-
if !ok || len(freelist) == 0 {
236-
return nil, false
237-
}
238-
cn = freelist[len(freelist)-1]
239-
c.freeconn[addr.String()] = freelist[:len(freelist)-1]
240-
return cn, true
238+
func (c *Client) putFreeConn(addr net.Addr, cn *conn) {
239+
cn.nc.Close()
241240
}
242241

243242
func (c *Client) netTimeout() time.Duration {
@@ -289,18 +288,16 @@ func (c *Client) dial(addr net.Addr) (net.Conn, error) {
289288
return nil, err
290289
}
291290

292-
func (c *Client) getConn(addr net.Addr) (*conn, error) {
293-
cn, ok := c.getFreeConn(addr)
294-
if ok {
295-
cn.extendDeadline()
296-
return cn, nil
297-
}
298-
nc, err := c.dial(addr)
291+
func (c *Client) getConn(addr net.Addr) (cn *conn, err error) {
292+
var nc net.Conn
293+
c.withAddrPool(addr, func(pl pool.Pool) {
294+
nc, err = pl.Get()
295+
})
299296
if err != nil {
300-
return nil, err
297+
return
301298
}
302299
cn = &conn{
303-
nc: nc,
300+
nc: nc.(*pool.PoolConn),
304301
addr: addr,
305302
rw: bufio.NewReadWriter(bufio.NewReader(nc), bufio.NewWriter(nc)),
306303
c: c,
@@ -369,7 +366,25 @@ func (c *Client) withAddrRw(addr net.Addr, fn func(*bufio.ReadWriter) error) (er
369366
return err
370367
}
371368
defer cn.condRelease(&err)
372-
return fn(cn.rw)
369+
// Exponential backoff (based on Ethernet)
370+
retries := 0
371+
err = fn(cn.rw)
372+
if err == io.EOF { // Bad connection
373+
cn.nc.Close()
374+
for err != nil && retries < maxRetries {
375+
retries++
376+
backoffCoefficient := int(math.Pow(float64(2), float64(retries))) - 1
377+
sleepFor := time.Nanosecond * time.Duration(50000*backoffCoefficient)
378+
time.Sleep(sleepFor)
379+
cn, err = c.getConn(addr)
380+
if err != nil {
381+
continue
382+
}
383+
err = fn(cn.rw)
384+
}
385+
}
386+
387+
return
373388
}
374389

375390
func (c *Client) withKeyRw(key string, fn func(*bufio.ReadWriter) error) error {
@@ -756,21 +771,18 @@ func (c *Client) incrDecr(verb, key string, delta uint64) (uint64, error) {
756771

757772
// Close closes any open connections.
758773
//
759-
// It returns the first error encountered closing connections, but always
760-
// closes all connections.
761-
//
762774
// After Close, the Client may still be used.
763775
func (c *Client) Close() error {
764776
c.lk.Lock()
765777
defer c.lk.Unlock()
766778
var ret error
767-
for _, conns := range c.freeconn {
768-
for _, c := range conns {
769-
if err := c.nc.Close(); err != nil && ret == nil {
770-
ret = err
771-
}
779+
780+
for addr, connPool := range c.pools {
781+
if connPool != nil {
782+
connPool.Close()
772783
}
784+
c.pools[addr] = nil
773785
}
774-
c.freeconn = nil
786+
775787
return ret
776788
}

0 commit comments

Comments
 (0)