Skip to content

Commit 3b2a840

Browse files
committed
Back to using a regular channel for iter with an ugly hack to support breaking early
Added Swap and CompareAndSwap
1 parent 31a4dff commit 3b2a840

4 files changed

Lines changed: 50 additions & 205 deletions

File tree

cmap.go

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ const DefaultShardCount = 1 << 4 // 16
1919
// ForeachFunc is a function that gets passed to Foreach, returns true to break early
2020
type ForEachFunc func(key string, val interface{}) (BreakEarly bool)
2121

22+
type CompareAndSwapFunc func(a, b interface{}) bool
23+
2224
type lockedMap struct {
2325
m map[string]interface{}
2426
l sync.RWMutex
@@ -30,6 +32,25 @@ func (ms *lockedMap) Set(key string, v interface{}) {
3032
ms.l.Unlock()
3133
}
3234

35+
func (ms *lockedMap) Swap(key string, v interface{}) interface{} {
36+
ms.l.Lock()
37+
ov := ms.m[key]
38+
ms.m[key] = v
39+
ms.l.Unlock()
40+
return ov
41+
}
42+
43+
func (ms *lockedMap) CompareAndSwap(key string, v interface{}, casFn CompareAndSwapFunc) bool {
44+
ms.l.Lock()
45+
ov := ms.m[key]
46+
ok := casFn(v, ov)
47+
if ok {
48+
ms.m[key] = v
49+
}
50+
ms.l.Unlock()
51+
return ok
52+
}
53+
3354
func (ms *lockedMap) Get(key string) interface{} {
3455
ms.l.RLock()
3556
v := ms.m[key]
@@ -80,7 +101,7 @@ func (ms *lockedMap) iter(ch KeyValueChan, wg *sync.WaitGroup) {
80101
ms.l.RLock()
81102
for k, v := range ms.m {
82103
kv.Key, kv.Value = k, v
83-
if !ch.v.Send(&kv, true) {
104+
if !ch.send(&kv) {
84105
break
85106
}
86107
}
@@ -160,12 +181,18 @@ func (cm CMap) shard(key string) *lockedMap {
160181
return &cm.shards[h&cm.l]
161182
}
162183

163-
func (cm CMap) Set(key string, val interface{}) { cm.shard(key).Set(key, val) }
184+
func (cm CMap) Set(key string, val interface{}) { cm.shard(key).Set(key, val) }
185+
164186
func (cm CMap) Get(key string) interface{} { return cm.shard(key).Get(key) }
165187
func (cm CMap) Has(key string) bool { return cm.shard(key).Has(key) }
166188
func (cm CMap) Delete(key string) { cm.shard(key).Delete(key) }
167189
func (cm CMap) DeleteAndGet(key string) interface{} { return cm.shard(key).DeleteAndGet(key) }
168190

191+
func (cm CMap) Swap(key string, val interface{}) interface{} { return cm.shard(key).Swap(key, val) }
192+
func (cm CMap) CompareAndSwap(key string, val interface{}, eqFn CompareAndSwapFunc) bool {
193+
return cm.shard(key).CompareAndSwap(key, val, eqFn)
194+
}
195+
169196
func (cm CMap) Foreach(fn ForEachFunc) {
170197
for i := range cm.shards {
171198
cm.shards[i].ForEach(fn)
@@ -209,7 +236,7 @@ func (cm CMap) Iter() KeyValueChan { return cm.IterBuffered(1) }
209236
// note that calling breakLoop() will show as a race on the race detector but it's more or less a "safe" race,
210237
// and it is the only clean way to break out of a channel.
211238
func (cm CMap) IterBuffered(sz int) KeyValueChan {
212-
ch := KeyValueChan{newSizeKeyValueChan(sz)}
239+
ch := make(KeyValueChan, sz)
213240
go func() {
214241
var wg sync.WaitGroup
215242
wg.Add(len(cm.shards))
@@ -246,24 +273,3 @@ func (cm CMap) MarshalJSON() ([]byte, error) {
246273
buf.WriteByte('}')
247274
return buf.Bytes(), nil
248275
}
249-
250-
type KeyValueChan struct {
251-
v Chan
252-
}
253-
254-
func (ch KeyValueChan) Recv() *KeyValue {
255-
v, ok := ch.v.Recv(true)
256-
if !ok {
257-
return nil
258-
}
259-
return v
260-
}
261-
262-
func (ch KeyValueChan) Break() {
263-
ch.v.Close()
264-
for {
265-
if _, ok := ch.v.Recv(true); !ok {
266-
return
267-
}
268-
}
269-
}

cmap_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestIter(t *testing.T) {
3535
}
3636
ch := cm.IterBuffered(20)
3737
cnt := 0
38-
for v := ch.Recv(); v != nil; v = ch.Recv() {
38+
for range ch {
3939
cnt++
4040
ch.Break()
4141
}

keyvalue_lfchan.go

Lines changed: 0 additions & 180 deletions
This file was deleted.

utils.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,22 @@ func FNV32aString(s string) uint32 {
2626
}
2727
return h
2828
}
29+
30+
type KeyValueChan chan *KeyValue
31+
32+
func (ch KeyValueChan) send(v *KeyValue) (ok bool) {
33+
defer func() {
34+
if recover() != nil {
35+
ok = false
36+
}
37+
}()
38+
ch <- v
39+
return true
40+
}
41+
42+
func (ch KeyValueChan) Break() {
43+
defer func() { recover() }()
44+
close(ch)
45+
for range ch {
46+
}
47+
}

0 commit comments

Comments
 (0)