-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdist_keys.go
More file actions
281 lines (233 loc) · 7.99 KB
/
Copy pathdist_keys.go
File metadata and controls
281 lines (233 loc) · 7.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
package backend
import (
"context"
"log/slog"
"path"
"slices"
"strings"
"sync"
"github.com/hyp3rd/ewrap"
"golang.org/x/sync/errgroup"
"github.com/hyp3rd/hypercache/internal/cluster"
)
// Tunables for the list-keys path.
const (
	// keyGlobMetacharacters lists the characters whose presence makes
	// buildKeyMatcher treat a pattern as a glob instead of a literal
	// prefix. They are the path.Match metacharacters: '*' (any
	// sequence), '?' (single character) and '[' (character class).
	keyGlobMetacharacters = "*?["

	// localKeysInitialCap is the initial capacity of the slice that
	// localMatchingKeys fills for this node. It is sized for a filter
	// that hits a few dozen keys, so a matcher that rejects most of
	// the shard does not pay for a large up-front allocation.
	localKeysInitialCap = 64

	// defaultListKeysMax is the result cap ListKeys falls back to when
	// the caller does not supply a positive limit. It is meant to cover
	// the usual operator-debug "list everything under this prefix"
	// request while still bounding worst-case memory.
	defaultListKeysMax = 10_000
)
// buildKeyMatcher compiles `pattern` into a predicate over keys.
//
// The pattern is interpreted in one of three ways:
//
//   - "" (empty): every key matches.
//   - no glob metacharacter (see keyGlobMetacharacters): the pattern
//     is a literal prefix, tested with strings.HasPrefix.
//   - otherwise: the pattern is a glob evaluated by path.Match.
//
// A malformed glob (for example an unterminated `[`) is rejected here,
// once, with a wrapped error, rather than silently matching nothing on
// every key later on.
//
// path.Match is used instead of filepath.Match because keys are plain
// strings with no OS-specific separator semantics. Note that
// path.Match's wildcards never match a '/' character.
func buildKeyMatcher(pattern string) (func(string) bool, error) {
	switch {
	case pattern == "":
		return func(string) bool { return true }, nil
	case !strings.ContainsAny(pattern, keyGlobMetacharacters):
		return func(key string) bool { return strings.HasPrefix(key, pattern) }, nil
	}

	// Probe the glob against an empty name purely to surface a
	// structural error in the pattern; the match result is irrelevant.
	if _, probeErr := path.Match(pattern, ""); probeErr != nil {
		return nil, ewrap.Wrap(probeErr, "list-keys: invalid glob pattern")
	}

	globMatch := func(key string) bool {
		matched, matchErr := path.Match(pattern, key)
		if matchErr != nil {
			// The pattern passed the probe above, so this branch is
			// not expected to run; report "no match" to stay
			// best-effort.
			return false
		}

		return matched
	}

	return globMatch, nil
}
// ListKeysResult is the outcome of a cluster-wide key enumeration
// together with the best-effort accounting a caller needs in order to
// tell the operator that the listing may be incomplete.
type ListKeysResult struct {
// Keys holds the merged keys, deduplicated across peers and sorted
// in ascending string order.
Keys []string
// Truncated reports that the merged set reached the requested
// maximum and further keys were offered after that point, so the
// listing stopped growing.
Truncated bool
// PartialNodes names the peers that could not be queried (their
// fan-out call failed or no transport was available); keys held
// only by those peers may be missing from Keys.
PartialNodes []string
}
// listKeysAccumulator is the shared merge state for one ListKeys
// fan-out. It is passed by pointer so every peer goroutine can take
// the mutex and contribute its keys to the same set.
type listKeysAccumulator struct {
	mu        sync.Mutex          // guards every field below
	dedup     map[string]struct{} // merged key set; must be non-nil before tryAdd
	maxKeys   int                 // upper bound on len(dedup)
	partial   []string            // peers that could not be queried
	truncated bool                // set once a new key had to be dropped
}

// tryAdd merges `keys` into the dedup set while holding the lock.
//
// Keys already present are skipped before the capacity check, so a
// duplicate never changes the accounting — in particular it cannot
// flip `truncated` when the set is exactly full. This matters because
// peers commonly return overlapping key sets. Only when a genuinely new
// key arrives and the set already holds maxKeys entries is `truncated`
// set; the remainder of `keys` is then discarded.
func (a *listKeysAccumulator) tryAdd(keys []string) {
	a.mu.Lock()
	defer a.mu.Unlock()

	for _, k := range keys {
		if _, seen := a.dedup[k]; seen {
			continue
		}

		if len(a.dedup) >= a.maxKeys {
			a.truncated = true

			return
		}

		a.dedup[k] = struct{}{}
	}
}
// markPartial notes, under the lock, that `peer` could not be queried,
// meaning keys held only by that peer may be absent from the merged
// result.
func (a *listKeysAccumulator) markPartial(peer cluster.NodeID) {
	name := string(peer)

	a.mu.Lock()
	a.partial = append(a.partial, name)
	a.mu.Unlock()
}
// result projects the accumulator into the public ListKeysResult.
//
// Both Keys and PartialNodes are returned sorted. Keys come out of a
// map and peers are recorded in whatever order their goroutines
// finished, so without sorting two identical requests could produce
// differently ordered output. PartialNodes is a copy, so the caller
// never aliases the accumulator's internal slice; a nil list stays nil.
//
// The lock is not taken here: ListKeys calls result only after every
// contributing goroutine has finished.
func (a *listKeysAccumulator) result() ListKeysResult {
	keys := make([]string, 0, len(a.dedup))
	for k := range a.dedup {
		keys = append(keys, k)
	}

	slices.Sort(keys)

	partial := slices.Clone(a.partial)
	slices.Sort(partial)

	return ListKeysResult{
		Keys:         keys,
		Truncated:    a.truncated,
		PartialNodes: partial,
	}
}
// ListKeys gathers the keys matching `pattern` from every alive peer
// (as reported by alivePeerIDs, which includes this node), merges them
// into one deduplicated, sorted list and returns at most `maxResults`
// of them.
//
// `pattern` follows the prefix/glob rules of buildKeyMatcher; an
// invalid glob is the only condition that makes ListKeys return an
// error. A `maxResults` of zero or less selects defaultListKeysMax.
//
// Peers are queried concurrently. A peer that cannot be queried does
// not fail the call: it is reported in the result's PartialNodes so the
// caller can tell the operator the listing may be incomplete.
func (dm *DistMemory) ListKeys(ctx context.Context, pattern string, maxResults int) (ListKeysResult, error) {
	match, err := buildKeyMatcher(pattern)
	if err != nil {
		return ListKeysResult{}, err
	}

	limit := maxResults
	if limit <= 0 {
		limit = defaultListKeysMax
	}

	merged := &listKeysAccumulator{
		maxKeys: limit,
		dedup:   make(map[string]struct{}, limit),
	}

	targets := dm.alivePeerIDs()
	tr := dm.loadTransport()

	group, groupCtx := errgroup.WithContext(ctx)

	// collect binds one peer ID to a task for the group; the task
	// always reports success because per-peer failures are recorded in
	// the accumulator instead.
	collect := func(id cluster.NodeID) func() error {
		return func() error {
			dm.collectPeerKeys(groupCtx, id, pattern, match, tr, merged)

			return nil
		}
	}

	for _, id := range targets {
		group.Go(collect(id))
	}

	_ = group.Wait() // tasks never return an error, so there is nothing to inspect.

	return merged.result(), nil
}
// collectPeerKeys obtains the matching keys of a single peer and merges
// them into `acc`. It never returns an error; failures are recorded on
// the accumulator instead. Three cases:
//
//   - peer is this node: the local shards are walked directly with
//     `matcher`, avoiding a transport hop.
//   - transport is nil: the peer is marked partial.
//   - remote peer: `pattern` is forwarded through the transport; on
//     failure the peer is marked partial and, when a logger is set,
//     the error is logged at debug level.
func (dm *DistMemory) collectPeerKeys(
	ctx context.Context,
	peer cluster.NodeID,
	pattern string,
	matcher func(string) bool,
	transport DistTransport,
	acc *listKeysAccumulator,
) {
	// dm.localNode can be nil (alivePeerIDs checks for that as well).
	// Guard the dereference so an unset local node makes the peer take
	// the remote path instead of panicking inside a fan-out goroutine.
	if dm.localNode != nil && peer == dm.localNode.ID {
		acc.tryAdd(dm.localMatchingKeys(matcher))

		return
	}

	if transport == nil { // cluster torn down mid-call
		acc.markPartial(peer)

		return
	}

	keys, err := transport.ListKeys(ctx, string(peer), pattern)
	if err != nil {
		if dm.logger != nil {
			dm.logger.Debug(
				"list-keys: peer fan-out failed",
				slog.String("peer", string(peer)),
				slog.Any("err", err),
			)
		}

		acc.markPartial(peer)

		return
	}

	acc.tryAdd(keys)
}
// alivePeerIDs lists the IDs of the nodes a fan-out should contact.
//
// With membership wired up, that is every non-nil entry of
// dm.membership.List() whose state is cluster.NodeAlive; nodes in any
// other state are left out. Without membership (single-process or test
// setups) the list contains only the local node, or is nil when the
// local node is unset as well.
func (dm *DistMemory) alivePeerIDs() []cluster.NodeID {
	if dm.membership == nil {
		if dm.localNode == nil {
			return nil
		}

		return []cluster.NodeID{dm.localNode.ID}
	}

	members := dm.membership.List()
	alive := make([]cluster.NodeID, 0, len(members))

	for _, member := range members {
		if member != nil && member.State == cluster.NodeAlive {
			alive = append(alive, member.ID)
		}
	}

	return alive
}
// localMatchingKeys makes one pass over this node's shards and returns
// every key accepted by `matcher`. Nil shards are skipped. ListKeys
// uses it for the local node so that it does not go through the
// transport to query itself.
func (dm *DistMemory) localMatchingKeys(matcher func(string) bool) []string {
	matched := make([]string, 0, localKeysInitialCap)

	for _, shard := range dm.shards {
		if shard == nil {
			continue
		}

		for key := range shard.items.All() {
			if !matcher(key) {
				continue
			}

			matched = append(matched, key)
		}
	}

	return matched
}