-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
1668 lines (1426 loc) · 54.9 KB
/
main.go
File metadata and controls
1668 lines (1426 loc) · 54.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Command hypercache-server runs a single HyperCache node configured
// for the distributed in-memory backend (DistMemory). It exposes three
// HTTP listeners:
//
// - Client REST API on HYPERCACHE_API_ADDR (default :8080) — apps
// PUT/GET/DELETE keys here.
// - Management HTTP on HYPERCACHE_MGMT_ADDR (default :8081) — admin
// and observability endpoints (/health, /stats, /config,
// /dist/metrics, /cluster/*).
// - Dist HTTP on HYPERCACHE_DIST_ADDR (default :7946) — peer-to-peer
// replication, anti-entropy, and heartbeat.
//
// Wires graceful shutdown on SIGTERM/SIGINT: drain (so /health flips
// to 503 and writes return ErrDraining), then Stop. Configurable via
// environment variables in the 12-factor style for k8s / docker
// compatibility.
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/goccy/go-json"
fiber "github.com/gofiber/fiber/v3"
"github.com/hyp3rd/ewrap"
"github.com/hyp3rd/hypercache"
"github.com/hyp3rd/hypercache/internal/constants"
"github.com/hyp3rd/hypercache/internal/sentinel"
"github.com/hyp3rd/hypercache/pkg/backend"
cache "github.com/hyp3rd/hypercache/pkg/cache/v2"
"github.com/hyp3rd/hypercache/pkg/httpauth"
)
// Defaults applied when the corresponding env var is unset (or, for the
// knobs parsed via envInt / envDuration, unparseable). Centralized
// here so operators see one canonical reference and so the magic-number
// linter doesn't flag repeated literals at the env-parse sites.
const (
// Fallback for HYPERCACHE_REPLICATION.
defaultReplication = 3
// Fallback for HYPERCACHE_CAPACITY.
defaultCapacity = 100_000
// Virtual-node count handed to the dist backend in buildHyperCache;
// loadConfig exposes no env override for it.
defaultVirtualNodes = 64
// Fallback for HYPERCACHE_INDIRECT_PROBE_K.
defaultIndirectK = 2
suspectMultiplier = 3 // suspect after = N × heartbeat interval
deadMultiplier = 6 // dead after = N × heartbeat interval
// Fallbacks for HYPERCACHE_HINT_TTL, HYPERCACHE_HINT_REPLAY,
// HYPERCACHE_HEARTBEAT and HYPERCACHE_REBALANCE_INTERVAL.
defaultHintTTL = 30 * time.Second
defaultHintReplay = 200 * time.Millisecond
defaultHeartbeat = 1 * time.Second
defaultRebalance = 250 * time.Millisecond
// Membership gossip cadence. Without an enabled gossip loop the
// cluster has no path to re-introduce a previously-removed node:
// peers' heartbeats only probe nodes already in their membership
// list, and the Health endpoint is one-way. A graceful drain →
// restart (the canonical operator workflow) leaves the restarted
// node invisible to the rest of the cluster forever. Default 1s
// matches the heartbeat cadence — gossip+heartbeat together
// disseminate membership changes within a couple of ticks.
// Fallback for HYPERCACHE_GOSSIP_INTERVAL.
defaultGossip = 1 * time.Second
// Timeouts applied to the client API fiber app in runClientAPI.
clientAPIReadTimeout = 5 * time.Second
clientAPIWriteTimeout = 5 * time.Second
clientAPIIdleTimeout = 60 * time.Second
// NOTE(review): not referenced in the portion of the file reviewed
// here; presumably bounds the graceful-shutdown sequence — confirm
// at the use site.
shutdownDeadline = 30 * time.Second
)
// envConfig is the parsed runtime configuration. Defaults reflect a
// reasonable single-node demo posture; production deployments override
// every field via environment variables. Populated by loadConfig.
type envConfig struct {
// HYPERCACHE_NODE_ID; falls back to hostnameOrDefault().
NodeID string
// Listen addresses: HYPERCACHE_API_ADDR (default ":8080"),
// HYPERCACHE_MGMT_ADDR (default ":8081"), HYPERCACHE_DIST_ADDR
// (default ":7946").
APIAddr string
MgmtAddr string
DistAddr string
// HYPERCACHE_SEEDS, comma-separated; parsed by splitCSV.
Seeds []string
// HYPERCACHE_REPLICATION.
Replication int
// HYPERCACHE_CAPACITY.
// NOTE(review): buildHyperCache does not reference this field in the
// portion of the file reviewed here — confirm where it is applied.
Capacity int
// Result of httpauth.LoadFromEnv; gates the client API routes and
// the management HTTP endpoints.
AuthPolicy httpauth.Policy
// HYPERCACHE_API_TLS_CERT / HYPERCACHE_API_TLS_KEY /
// HYPERCACHE_API_TLS_CLIENT_CA file paths; see buildAPITLSConfig.
APITLSCert string
APITLSKey string
APITLSCA string
// HYPERCACHE_LOG_LEVEL, mapped by parseLogLevel (default "info").
LogLevel slog.Level
// HYPERCACHE_HINT_TTL and HYPERCACHE_HINT_REPLAY.
HintTTL time.Duration
HintReplay time.Duration
// HYPERCACHE_HEARTBEAT; the suspect/dead thresholds are derived from
// it in buildHyperCache.
Heartbeat time.Duration
// HYPERCACHE_INDIRECT_PROBE_K.
IndirectK int
// HYPERCACHE_REBALANCE_INTERVAL and HYPERCACHE_GOSSIP_INTERVAL.
RebalanceInt time.Duration
GossipInt time.Duration
// Phase C OIDC. When OIDCIssuer is non-empty the binary
// constructs a ServerVerify closure (see oidc.go) and
// attaches it to AuthPolicy.ServerVerify so JWTs from the
// configured IdP authenticate alongside the static-bearer
// path. Issuer and audience must be set together — loadConfig
// fails fast when only one of them is present. The two claim
// fields always carry a value because loadConfig applies
// defaults to them.
OIDCIssuer string
OIDCAudience string
OIDCIdentityClaim string // "sub" or "email"; default "sub"
OIDCScopeClaim string // "scope" (space-separated string) or a custom array claim; default "scope"
}
// loadConfig assembles the runtime configuration from environment
// variables, substituting defaults for anything unset.
//
// Two classes of problem produce an error (and a zero envConfig):
//
//   - httpauth.LoadFromEnv failing — e.g. HYPERCACHE_AUTH_CONFIG
//     pointing at a missing/malformed file, or both
//     HYPERCACHE_AUTH_CONFIG and HYPERCACHE_AUTH_TOKEN being set.
//     The binary exits non-zero rather than silently falling through
//     to open mode (fail-closed by design; documented in CHANGELOG as
//     a behavioral change vs pre-v2 where any token/config error
//     mapped to permissive open mode).
//   - Partial OIDC configuration, as reported by validateOIDCConfig.
//
// Every other knob is a tunable rather than a security boundary, so an
// unset or unparseable value quietly falls back to its default.
func loadConfig() (envConfig, error) {
	authPolicy, policyErr := httpauth.LoadFromEnv()
	if policyErr != nil {
		return envConfig{}, fmt.Errorf("load auth policy: %w", policyErr)
	}

	var cfg envConfig

	// Identity and listeners.
	cfg.NodeID = envOr("HYPERCACHE_NODE_ID", hostnameOrDefault())
	cfg.APIAddr = envOr("HYPERCACHE_API_ADDR", ":8080")
	cfg.MgmtAddr = envOr("HYPERCACHE_MGMT_ADDR", ":8081")
	cfg.DistAddr = envOr("HYPERCACHE_DIST_ADDR", ":7946")

	// Cluster shape.
	cfg.Seeds = splitCSV(os.Getenv("HYPERCACHE_SEEDS"))
	cfg.Replication = envInt("HYPERCACHE_REPLICATION", defaultReplication)
	cfg.Capacity = envInt("HYPERCACHE_CAPACITY", defaultCapacity)

	// Client API auth and TLS.
	cfg.AuthPolicy = authPolicy
	cfg.APITLSCert = os.Getenv("HYPERCACHE_API_TLS_CERT")
	cfg.APITLSKey = os.Getenv("HYPERCACHE_API_TLS_KEY")
	cfg.APITLSCA = os.Getenv("HYPERCACHE_API_TLS_CLIENT_CA")

	// Logging.
	cfg.LogLevel = parseLogLevel(envOr("HYPERCACHE_LOG_LEVEL", "info"))

	// Dist timing knobs.
	cfg.HintTTL = envDuration("HYPERCACHE_HINT_TTL", defaultHintTTL)
	cfg.HintReplay = envDuration("HYPERCACHE_HINT_REPLAY", defaultHintReplay)
	cfg.Heartbeat = envDuration("HYPERCACHE_HEARTBEAT", defaultHeartbeat)
	cfg.IndirectK = envInt("HYPERCACHE_INDIRECT_PROBE_K", defaultIndirectK)
	cfg.RebalanceInt = envDuration("HYPERCACHE_REBALANCE_INTERVAL", defaultRebalance)
	cfg.GossipInt = envDuration("HYPERCACHE_GOSSIP_INTERVAL", defaultGossip)

	// OIDC. Issuer and audience have no defaults; the claim names do.
	cfg.OIDCIssuer = os.Getenv("HYPERCACHE_OIDC_ISSUER")
	cfg.OIDCAudience = os.Getenv("HYPERCACHE_OIDC_AUDIENCE")
	cfg.OIDCIdentityClaim = envOr("HYPERCACHE_OIDC_IDENTITY_CLAIM", "sub")
	cfg.OIDCScopeClaim = envOr("HYPERCACHE_OIDC_SCOPE_CLAIM", "scope")

	// Phase C: partial OIDC config is fail-fast. Either nothing is
	// configured (OIDC disabled) or both issuer and audience are.
	if oidcErr := validateOIDCConfig(cfg); oidcErr != nil {
		return envConfig{}, oidcErr
	}

	return cfg, nil
}
// errOIDCMissingAudience / errOIDCMissingIssuer are the sentinel
// errors validateOIDCConfig wraps (and loadConfig returns) when the
// OIDC env vars are partially configured. Static sentinels (vs ad-hoc
// fmt.Errorf) keep err113 happy and let callers `errors.Is` against
// the specific misconfiguration.
var (
// The issuer is set but the audience is empty.
errOIDCMissingAudience = ewrap.New("HYPERCACHE_OIDC_ISSUER set without HYPERCACHE_OIDC_AUDIENCE")
// The audience is set but the issuer is empty.
errOIDCMissingIssuer = ewrap.New("HYPERCACHE_OIDC_AUDIENCE set without HYPERCACHE_OIDC_ISSUER")
)
// validateOIDCConfig accepts a configuration in which the OIDC issuer
// and audience are either both set or both empty, and rejects the
// half-configured cases with a wrapped sentinel
// (errOIDCMissingAudience or errOIDCMissingIssuer). Partial config is
// the most common operator misconfiguration, and surfacing it at boot
// beats silent 401s on JWT-bearing requests later.
func validateOIDCConfig(cfg envConfig) error {
	hasIssuer := cfg.OIDCIssuer != ""
	hasAudience := cfg.OIDCAudience != ""

	switch {
	case hasIssuer && !hasAudience:
		return fmt.Errorf("%w: both are required when OIDC is enabled", errOIDCMissingAudience)
	case hasAudience && !hasIssuer:
		return fmt.Errorf("%w: both are required when OIDC is enabled", errOIDCMissingIssuer)
	default:
		return nil
	}
}
// attachOIDCVerifier installs an OIDC verifier on
// cfg.AuthPolicy.ServerVerify when cfg.OIDCIssuer is set. It must run
// BEFORE buildHyperCache and registerClientRoutes, because both take
// the policy by value. Verifier construction (buildOIDCVerifier) is
// performed synchronously here so that a bad issuer configuration
// stops startup instead of showing up later as 401s on every
// JWT-bearing request.
//
// The return value is true when OIDC is disabled or the verifier was
// attached, and false when construction failed — the failure is
// logged and the caller is expected to exit non-zero.
//
// Kept separate from run() so that function stays under revive's
// 75-line cap.
func attachOIDCVerifier(ctx context.Context, cfg *envConfig, logger *slog.Logger) bool {
	if cfg.OIDCIssuer != "" {
		verify, buildErr := buildOIDCVerifier(ctx, *cfg)
		if buildErr != nil {
			logger.Error("oidc verifier construction failed", slog.Any("err", buildErr))

			return false
		}

		cfg.AuthPolicy.ServerVerify = verify
	}

	return true
}
// envOr looks up key in the environment and returns its value, or
// fallback when the variable is unset or set to the empty string.
func envOr(key, fallback string) string {
	value := os.Getenv(key)
	if value == "" {
		return fallback
	}

	return value
}
// envInt reads key from the environment as a base-10 int. The
// fallback is returned when the variable is unset, empty, or not
// accepted by strconv.Atoi.
func envInt(key string, fallback int) int {
	if raw := os.Getenv(key); raw != "" {
		if parsed, err := strconv.Atoi(raw); err == nil {
			return parsed
		}
	}

	return fallback
}
// envDuration reads key from the environment in time.ParseDuration
// syntax (e.g. "250ms", "1m30s"). The fallback is returned when the
// variable is unset, empty, or fails to parse.
func envDuration(key string, fallback time.Duration) time.Duration {
	if raw := os.Getenv(key); raw != "" {
		if parsed, err := time.ParseDuration(raw); err == nil {
			return parsed
		}
	}

	return fallback
}
// splitCSV splits a comma-separated string, trims surrounding
// whitespace from every element, and drops elements that are blank
// after trimming.
//
// It returns nil whenever no non-blank element remains — for the
// empty string and equally for inputs such as " " or ", ," — so the
// dist seed list consistently reports "no seeds" as a nil slice
// rather than sometimes as a non-nil empty one.
func splitCSV(s string) []string {
	// out stays nil until the first real element is appended, which is
	// what gives blank-only input the same result as empty input.
	var out []string

	for _, part := range strings.Split(s, ",") {
		if trimmed := strings.TrimSpace(part); trimmed != "" {
			out = append(out, trimmed)
		}
	}

	return out
}
// parseLogLevel converts a log-level string from the environment into
// a slog.Level, ignoring case. "debug", "warn"/"warning" and "error"
// are recognized; anything else (including "info") yields
// slog.LevelInfo. Callers may still override the level through slog
// handler options.
func parseLogLevel(s string) slog.Level {
	level := slog.LevelInfo

	switch normalized := strings.ToLower(s); normalized {
	case "debug":
		level = slog.LevelDebug
	case "warn", "warning":
		level = slog.LevelWarn
	case "error":
		level = slog.LevelError
	}

	return level
}
// hostnameOrDefault supplies the last-resort node ID: the value of
// os.Hostname(), or the literal "node" when the lookup fails or
// yields an empty string. The hostname is typically stable per
// container in Docker (container id) and per pod in k8s.
func hostnameOrDefault() string {
	if name, err := os.Hostname(); err == nil && name != "" {
		return name
	}

	return "node"
}
// buildHyperCache wires DistMemory + management HTTP into a HyperCache
// configured per the env config. The returned cache is started and
// owns the dist + management HTTP listeners; the caller adds the
// client API server separately and is responsible for graceful Stop.
//
// Errors from hypercache.NewConfig and hypercache.New are returned
// wrapped with the failing step ("build hypercache config" /
// "construct hypercache").
func buildHyperCache(ctx context.Context, cfg envConfig, logger *slog.Logger) (*hypercache.HyperCache[backend.DistMemory], error) {
hcCfg, err := hypercache.NewConfig[backend.DistMemory](constants.DistMemoryBackend)
if err != nil {
return nil, fmt.Errorf("build hypercache config: %w", err)
}
// Dist backend options. Consistency levels (read ONE, write QUORUM)
// and the virtual-node count are fixed here rather than read from
// cfg. The suspect/dead thresholds are multiples of the heartbeat
// interval, and the second argument to WithDistIndirectProbes is
// half the heartbeat interval (presumably the probe timeout —
// confirm against the backend package).
hcCfg.DistMemoryOptions = []backend.DistMemoryOption{
backend.WithDistNode(cfg.NodeID, cfg.DistAddr),
backend.WithDistSeeds(cfg.Seeds),
backend.WithDistReplication(cfg.Replication),
backend.WithDistVirtualNodes(defaultVirtualNodes),
backend.WithDistReadConsistency(backend.ConsistencyOne),
backend.WithDistWriteConsistency(backend.ConsistencyQuorum),
backend.WithDistHeartbeat(cfg.Heartbeat, suspectMultiplier*cfg.Heartbeat, deadMultiplier*cfg.Heartbeat),
backend.WithDistIndirectProbes(cfg.IndirectK, cfg.Heartbeat/2),
backend.WithDistGossipInterval(cfg.GossipInt),
backend.WithDistHintTTL(cfg.HintTTL),
backend.WithDistHintReplayInterval(cfg.HintReplay),
backend.WithDistRebalanceInterval(cfg.RebalanceInt),
backend.WithDistLogger(logger),
}
// Dist transport auth is intentionally separate from the
// client API's multi-token policy: the cluster is one trust
// domain (every node holds the same peer token), so reading
// HYPERCACHE_AUTH_TOKEN directly here keeps the dist symmetry
// invariant when operators set HYPERCACHE_AUTH_CONFIG for the
// client API but still want peer auth on the wire.
// When the env var is empty no dist auth option is appended.
if peerToken := os.Getenv(httpauth.EnvAuthToken); peerToken != "" {
hcCfg.DistMemoryOptions = append(
hcCfg.DistMemoryOptions,
backend.WithDistHTTPAuth(backend.DistHTTPAuth{Token: peerToken}),
)
}
// Phase C2: light up scope enforcement on the management port.
// /health stays public (k8s liveness probes carry no creds).
// Read-or-better is required for the observability surface
// (/stats, /config, /dist/*, /cluster/*); admin scope is
// required for the cluster-mutating control routes (/evict,
// /clear, /trigger-expiration). Closes a long-standing gap
// where the mgmt port was fully unauthenticated server-side
// while the monitor's proxy carried the only check.
//
// Closure captures cfg.AuthPolicy by value — Policy is value-
// semantic and safe for concurrent use after construction;
// see pkg/httpauth/policy.go.
policy := cfg.AuthPolicy
mgmtReadAuth := func(fiberCtx fiber.Ctx) error {
return policy.Verify(fiberCtx, httpauth.ScopeRead)
}
mgmtAdminAuth := func(fiberCtx fiber.Ctx) error {
return policy.Verify(fiberCtx, httpauth.ScopeAdmin)
}
hcCfg.HyperCacheOptions = append(
hcCfg.HyperCacheOptions,
hypercache.WithManagementHTTP[backend.DistMemory](
cfg.MgmtAddr,
hypercache.WithMgmtAuth(mgmtReadAuth),
hypercache.WithMgmtControlAuth(mgmtAdminAuth),
),
// Surfaces eviction/expiration loop start, per-tick activity,
// and the cluster-join startup summary in the binary's JSON
// log stream. Without this the HyperCache wrapper runs silent.
hypercache.WithLogger[backend.DistMemory](logger),
)
// NOTE(review): cfg.Capacity is not passed to any option above —
// confirm whether it is applied elsewhere.
hc, err := hypercache.New(ctx, hypercache.GetDefaultManager(), hcCfg)
if err != nil {
return nil, fmt.Errorf("construct hypercache: %w", err)
}
return hc, nil
}
// nodeContext bundles the per-server values handlers need so they can
// surface routing information (this node's ID, the ring's owners for
// a key) in their responses without re-deriving from the raw fiber
// context every call. One instance is created in runClientAPI and
// shared by every route closure.
type nodeContext struct {
// hc is the cache the handlers read from and write to; also the
// source of ClusterOwners lookups.
hc *hypercache.HyperCache[backend.DistMemory]
// nodeID is this node's ID (cfg.NodeID), echoed in responses.
nodeID string
}
// errorResponse is the canonical JSON error shape for the client API.
// Every 4xx / 5xx response carries this payload — operators can grep
// `code` to classify failures without parsing free-text messages.
// Emitted by jsonErr.
type errorResponse struct {
// Error is the human-readable message.
Error string `json:"error"`
// Code is one of the code* constants (e.g. "BAD_REQUEST").
Code string `json:"code"`
}
// API error codes — kept as string constants for stable identity in
// machine-readable consumers (alerting rules, client SDKs). They are
// the values placed in errorResponse.Code.
const (
// Malformed client input (missing key, unparseable ttl, ...).
codeBadRequest = "BAD_REQUEST"
codeNotFound = "NOT_FOUND"
// The node is draining; see classifyAndRespond.
codeDraining = "DRAINING"
// Catch-all for service-level failures; see classifyAndRespond.
codeInternal = "INTERNAL"
)
// TLS-config sentinel errors returned by buildAPITLSConfig. They may
// reach the caller wrapped via fmt.Errorf with extra context (for
// example the offending file path), so match them with errors.Is
// rather than by equality.
var (
// Exactly one of the server certificate / key paths was provided.
errAPITLSPartial = ewrap.New("HYPERCACHE_API_TLS_CERT and HYPERCACHE_API_TLS_KEY must both be set")
// The client CA bundle was readable but contained no PEM certificate.
errAPITLSNoPEMInCA = ewrap.New("HYPERCACHE_API_TLS_CLIENT_CA: no PEM certificates parsed from file")
)
// registerClientRoutes wires every client-API route onto the
// provided fiber app. Extracted from runClientAPI so tests
// (handlers_test.go, auth_test.go, openapi_test.go) drive the same
// wiring without spinning up a real listener — and so the drift
// test can introspect routes from the *exact* production
// registration rather than a hand-maintained mirror.
//
// Routes are scope-tagged: read endpoints (GET/HEAD/owners-lookup,
// batch-get) require ScopeRead; mutating endpoints (PUT/DELETE,
// batch-put/delete) require ScopeWrite. /healthz and
// /v1/openapi.yaml are deliberately scope-less so liveness probes
// and spec-discovery work without credentials.
//
// When the policy is unconfigured (zero Policy with AllowAnonymous
// false), every protected route 401s — fail-closed by design. The
// hypercache-server binary's loadConfig handles the legacy
// "neither env var set" path by flipping AllowAnonymous on with a
// startup warning, so the zero-config dev posture still works.
//
// nodeCtx is captured by the route closures and handed to each
// handle* function; the /v1/me handlers do not receive it.
func registerClientRoutes(app *fiber.App, policy httpauth.Policy, nodeCtx *nodeContext) {
// One middleware instance per scope, reused across routes.
read := policy.Middleware(httpauth.ScopeRead)
write := policy.Middleware(httpauth.ScopeWrite)
// Unauthenticated liveness probe: always answers "ok".
app.Get("/healthz", func(c fiber.Ctx) error { return c.SendString("ok") })
// Self-describing — clients can discover the API surface
// without out-of-band docs. The spec is embedded at build
// time from cmd/hypercache-server/openapi.yaml so it stays
// in lockstep with whatever the binary was built against.
app.Get("/v1/openapi.yaml", func(c fiber.Ctx) error {
c.Set(fiber.HeaderContentType, "application/yaml")
return c.Send(openapiSpec)
})
// /v1/cache/keys must be registered BEFORE the parameterized
// /v1/cache/:key — Fiber matches in registration order and the
// literal-path route would otherwise be shadowed by the
// param-bound handler (handleGet would be invoked with
// `key="keys"` and return 404).
app.Get("/v1/cache/keys", read, func(c fiber.Ctx) error { return handleListKeys(c, nodeCtx) })
// Single-key CRUD plus owner lookup.
app.Put("/v1/cache/:key", write, func(c fiber.Ctx) error { return handlePut(c, nodeCtx) })
app.Get("/v1/cache/:key", read, func(c fiber.Ctx) error { return handleGet(c, nodeCtx) })
app.Head("/v1/cache/:key", read, func(c fiber.Ctx) error { return handleHead(c, nodeCtx) })
app.Delete("/v1/cache/:key", write, func(c fiber.Ctx) error { return handleDelete(c, nodeCtx) })
app.Get("/v1/owners/:key", read, func(c fiber.Ctx) error { return handleOwners(c, nodeCtx) })
// Caller-introspection routes; both sit behind the read scope.
app.Get("/v1/me", read, handleMe)
app.Get("/v1/me/can", read, handleCan)
// Batch operations use POST for all three verbs; the scope follows
// the operation (get → read, put/delete → write).
app.Post("/v1/cache/batch/get", read, func(c fiber.Ctx) error { return handleBatchGet(c, nodeCtx) })
app.Post("/v1/cache/batch/put", write, func(c fiber.Ctx) error { return handleBatchPut(c, nodeCtx) })
app.Post("/v1/cache/batch/delete", write, func(c fiber.Ctx) error { return handleBatchDelete(c, nodeCtx) })
}
// runClientAPI creates the client REST API app, registers its routes
// (see registerClientRoutes for the per-route scope mapping enforced
// by cfg.AuthPolicy), and starts serving in a background goroutine.
// The app is returned so main can shut it down on signal.
//
// TLS posture, as decided by buildAPITLSConfig:
//
//   - cfg.APITLSCert + cfg.APITLSKey both set → standard TLS.
//   - Adding cfg.APITLSCA → mTLS with RequireAndVerifyClientCert.
//     The verified peer cert's Subject CN is what
//     httpauth.Policy.CertIdentities matches against.
//   - Both cert and key empty → plaintext on cfg.APIAddr (dev mode
//     and ingress-terminated TLS setups).
//   - Exactly one of cert/key set → error; nothing is served.
//
// On the TLS path the listener is bound synchronously, so TLS-config
// and bind failures are returned to the caller at startup. On the
// plaintext path binding happens inside the goroutine, and a failure
// there is only logged by runPlaintextListener.
func runClientAPI(
	cfg envConfig,
	hc *hypercache.HyperCache[backend.DistMemory],
	logger *slog.Logger,
) (*fiber.App, error) {
	fiberCfg := fiber.Config{
		AppName:      "hypercache-server",
		ReadTimeout:  clientAPIReadTimeout,
		WriteTimeout: clientAPIWriteTimeout,
		IdleTimeout:  clientAPIIdleTimeout,
	}
	app := fiber.New(fiberCfg)

	handlerCtx := &nodeContext{hc: hc, nodeID: cfg.NodeID}
	registerClientRoutes(app, cfg.AuthPolicy, handlerCtx)

	tlsCfg, tlsErr := buildAPITLSConfig(cfg)

	switch {
	case tlsErr != nil:
		return nil, fmt.Errorf("build client API TLS config: %w", tlsErr)
	case tlsCfg == nil:
		// No TLS requested: serve plaintext.
		go runPlaintextListener(app, cfg.APIAddr, logger)

		return app, nil
	}

	listener, listenErr := tls.Listen("tcp", cfg.APIAddr, tlsCfg)
	if listenErr != nil {
		return nil, fmt.Errorf("client API tls listen %s: %w", cfg.APIAddr, listenErr)
	}

	go runWrappedListener(app, listener, logger)

	return app, nil
}
// runPlaintextListener blocks serving the app on addr without TLS —
// the standard path that shipped pre-v2. Any error other than
// http.ErrServerClosed is logged when serving stops.
func runPlaintextListener(app *fiber.App, addr string, logger *slog.Logger) {
	serveErr := app.Listen(addr)
	if serveErr == nil || errors.Is(serveErr, http.ErrServerClosed) {
		return
	}

	logger.Error("client API listener exited", slog.Any("err", serveErr))
}
// runWrappedListener blocks serving the app on an already-bound
// net.Listener. The TLS path uses it so the tls.Config — including
// ClientAuth and ClientCAs — stays fully under our control. Any
// error other than http.ErrServerClosed is logged when serving stops.
func runWrappedListener(app *fiber.App, ln net.Listener, logger *slog.Logger) {
	serveErr := app.Listener(ln)
	if serveErr == nil || errors.Is(serveErr, http.ErrServerClosed) {
		return
	}

	logger.Error("client API listener exited", slog.Any("err", serveErr))
}
// buildAPITLSConfig assembles the *tls.Config the API listener
// should use, or returns (nil, nil) for the plaintext default.
//
// Outcomes by configuration:
//
//   - cert, key and client CA all empty → (nil, nil): plaintext.
//   - cert and key set → TLS with a TLS 1.2 minimum version.
//   - cert, key and client CA set → mTLS with
//     RequireAndVerifyClientCert (the only mode that gives the auth
//     middleware a verified peer cert to map to a CertIdentity).
//   - exactly one of cert/key set → errAPITLSPartial.
//   - client CA set without cert and key → errAPITLSPartial (wrapped).
//     Asking for client-certificate verification is an explicit
//     request for TLS, so it must not silently degrade to plaintext.
//
// The misconfiguration cases are operator error, not "TLS off": never
// fall through to plaintext when the operator clearly asked for TLS
// but omitted or typo'd one of the paths.
//
// Other errors: a failure to load the key pair, a failure to read the
// CA bundle, and errAPITLSNoPEMInCA (wrapped with the bundle path)
// when the bundle holds no PEM certificate.
func buildAPITLSConfig(cfg envConfig) (*tls.Config, error) {
	haveCert := cfg.APITLSCert != ""
	haveKey := cfg.APITLSKey != ""

	if !haveCert && !haveKey {
		if cfg.APITLSCA != "" {
			// mTLS was requested but there is no server certificate
			// to terminate TLS with — fail closed.
			return nil, fmt.Errorf(
				"%w: HYPERCACHE_API_TLS_CLIENT_CA is set but no server certificate is configured",
				errAPITLSPartial,
			)
		}

		return nil, nil //nolint:nilnil // documented "no TLS" sentinel
	}

	if !haveCert || !haveKey {
		return nil, errAPITLSPartial
	}

	cert, err := tls.LoadX509KeyPair(cfg.APITLSCert, cfg.APITLSKey)
	if err != nil {
		return nil, fmt.Errorf("load TLS keypair: %w", err)
	}

	tlsCfg := &tls.Config{
		Certificates: []tls.Certificate{cert},
		MinVersion:   tls.VersionTLS12,
	}

	if cfg.APITLSCA == "" {
		// Server-side TLS only; client certificates are not requested.
		return tlsCfg, nil
	}

	caPEM, err := os.ReadFile(cfg.APITLSCA)
	if err != nil {
		return nil, fmt.Errorf("read client CA bundle %s: %w", cfg.APITLSCA, err)
	}

	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("%w: %s", errAPITLSNoPEMInCA, cfg.APITLSCA)
	}

	tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert
	tlsCfg.ClientCAs = pool

	return tlsCfg, nil
}
// jsonErr sets the response status and writes the canonical
// errorResponse body built from code and msg. Every error path goes
// through here so all handlers emit the same shape.
func jsonErr(c fiber.Ctx, status int, code, msg string) error {
	payload := errorResponse{Error: msg, Code: code}

	return c.Status(status).JSON(payload)
}
// classifyAndRespond translates a service-level error into an HTTP
// status + code and writes it via jsonErr:
//
//   - sentinel.ErrDraining → 503 with codeDraining.
//   - sentinel.ErrNotOwner → 503 with codeInternal.
//   - anything else → 500 with codeInternal and err.Error() as the
//     message.
//
// Having a single mapping site means a new sentinel anywhere in the
// stack needs exactly one update here. err must be non-nil.
func classifyAndRespond(c fiber.Ctx, err error) error {
	if errors.Is(err, sentinel.ErrDraining) {
		return jsonErr(c, fiber.StatusServiceUnavailable, codeDraining, "node is draining; redirect to a peer")
	}

	if errors.Is(err, sentinel.ErrNotOwner) {
		return jsonErr(c, fiber.StatusServiceUnavailable, codeInternal, "no ring owners for key (cluster initializing?)")
	}

	return jsonErr(c, fiber.StatusInternalServerError, codeInternal, err.Error())
}
// putResponse documents the JSON shape returned on a successful PUT.
// Owners + Node let the operator immediately see where the value
// landed in the ring — invaluable when debugging cluster topology
// without having to chase /dist/owners on the management HTTP.
type putResponse struct {
// Key is the key from the request path.
Key string `json:"key"`
// Stored is set to true by handlePut on success.
Stored bool `json:"stored"`
// TTLMs is the requested TTL in milliseconds; omitted when zero
// (no expiration requested).
TTLMs int64 `json:"ttl_ms,omitempty"`
// Bytes is the length of the stored request body.
Bytes int `json:"bytes"`
// Node is the ID of the node that handled the write.
Node string `json:"node"`
// Owners is the result of ClusterOwners for the key; a nil slice
// encodes as JSON null.
Owners []string `json:"owners"`
}
// deleteResponse mirrors putResponse for DELETE — owners are useful
// because the deletion fans out to every replica in the ring.
// NOTE(review): the DELETE handler is outside the portion of the file
// reviewed here; the field notes below follow putResponse by analogy.
type deleteResponse struct {
// Key is the key the request addressed.
Key string `json:"key"`
Deleted bool `json:"deleted"`
// Node is the responding node's ID.
Node string `json:"node"`
// Owners lists the ring owners for Key.
Owners []string `json:"owners"`
}
// ownersResponse is the body of GET /v1/owners/:key — pure visibility
// endpoint that mirrors what the dist HTTP server reports to peers.
type ownersResponse struct {
// Key is the key from the request path.
Key string `json:"key"`
// Owners lists the ring owners for Key.
Owners []string `json:"owners"`
// Node is the responding node's ID.
Node string `json:"node"`
}
// listKeysResponse is the body of GET /v1/cache/keys — operator-
// facing key browser. `NextCursor` is empty on the last page;
// `TotalMatched` is the full deduplicated matched set (capped by
// `max`). `Truncated` reports that the cluster-wide cap was hit
// and the operator should refine the pattern. `PartialNodes`
// lists peers whose fan-out failed; their keys may be missing.
type listKeysResponse struct {
// Keys is the current page of matched keys.
Keys []string `json:"keys"`
// NextCursor is always present in the JSON (no omitempty); it is
// the empty string on the last page.
NextCursor string `json:"next_cursor"`
TotalMatched int `json:"total_matched"`
Truncated bool `json:"truncated"`
// Node is the responding node's ID.
Node string `json:"node"`
// PartialNodes is omitted from the JSON when empty.
PartialNodes []string `json:"partial_nodes,omitempty"`
}
// list-keys query-parameter bounds. Defaults match the operator
// "browse / refine" workflow; the hard caps bound the worst-case
// memory and response size — operators needing a larger sweep
// script against the per-node /internal/keys path with their own
// paging instead of lifting these.
// NOTE(review): the list-keys handler is outside the portion of the
// file reviewed here; the pairing of each default with its cap below
// is inferred from the names — confirm at the use site.
const (
// Page size: default and upper bound.
listKeysDefaultLimit = 100
listKeysMaxLimit = 500
// Cluster-wide matched-set cap: default and upper bound.
listKeysDefaultMax = 10000
listKeysHardMax = 50000
)
// handlePut implements PUT /v1/cache/:key.
//
// The request body is stored as the raw value (any content type). The
// optional ?ttl=<dur> query parameter, in time.ParseDuration syntax,
// applies a relative expiration; an empty or absent ttl means no
// expiration.
//
// Responses:
//
//   - 200 with a putResponse summarizing key, ttl, bytes stored, the
//     writing node's ID, and the ring owners — the owners list is the
//     operator's visibility into where the value actually landed
//     across the cluster.
//   - 400 BAD_REQUEST when the key is missing, or the ttl does not
//     parse or is negative.
//   - Whatever classifyAndRespond maps a Set failure to.
func handlePut(c fiber.Ctx, nodeCtx *nodeContext) error {
	key := c.Params("key")
	if key == "" {
		return jsonErr(c, fiber.StatusBadRequest, codeBadRequest, "missing key in path")
	}

	var ttl time.Duration

	if raw := c.Query("ttl"); raw != "" {
		parsed, err := time.ParseDuration(raw)
		if err != nil {
			return jsonErr(c, fiber.StatusBadRequest, codeBadRequest, "invalid ttl: "+err.Error())
		}

		// time.ParseDuration accepts signed values ("-5s"). A negative
		// relative expiration is a client mistake, so report it as
		// one here instead of forwarding it to the cache.
		if parsed < 0 {
			return jsonErr(c, fiber.StatusBadRequest, codeBadRequest, "invalid ttl: must not be negative")
		}

		ttl = parsed
	}

	body := c.Body()
	value := make([]byte, len(body))
	copy(value, body) // detach from fiber's pooled body buffer

	err := nodeCtx.hc.Set(c.Context(), key, value, ttl)
	if err != nil {
		return classifyAndRespond(c, err)
	}

	return c.JSON(putResponse{
		Key:    key,
		Stored: true,
		TTLMs:  ttl.Milliseconds(),
		Bytes:  len(value),
		Node:   nodeCtx.nodeID,
		Owners: nodeCtx.hc.ClusterOwners(key),
	})
}
// itemEnvelope is the JSON shape returned when the client asks for
// `Accept: application/json` on a single-key GET. Values are always
// emitted as base64 in the envelope so the response is binary-safe
// without the heuristic decode dance the raw-bytes path uses —
// callers that want the literal string can decode the base64
// themselves. Built by buildEnvelope.
type itemEnvelope struct {
Key string `json:"key"`
// Value is the standard-base64 encoding of the item's bytes.
Value string `json:"value"`
// ValueEncoding is set to "base64" by buildEnvelope.
ValueEncoding string `json:"value_encoding"`
// TTLMs is the remaining TTL in milliseconds (see itemRemainingTTL);
// omitted when zero.
TTLMs int64 `json:"ttl_ms,omitempty"`
// ExpiresAt is the expiry instant as RFC 3339 in UTC; omitted for
// items without an expiration.
ExpiresAt string `json:"expires_at,omitempty"`
Version uint64 `json:"version"`
Origin string `json:"origin,omitempty"`
// LastUpdated is RFC 3339 in UTC; omitted when the item's
// LastUpdated timestamp is zero.
LastUpdated string `json:"last_updated,omitempty"`
// Node is the responding node's ID.
Node string `json:"node"`
// Owners is the result of ClusterOwners for the key.
Owners []string `json:"owners"`
}
// wantsJSON reports whether the request's Accept header explicitly
// mentions application/json, which selects the JSON envelope. A
// missing header or a bare `*/*` keeps the raw-bytes default, so
// operators using `curl -X GET` with no Accept header continue to
// see the literal value rather than a base64 envelope.
func wantsJSON(c fiber.Ctx) bool {
	header := c.Get(fiber.HeaderAccept)

	return header != "" && strings.Contains(header, fiber.MIMEApplicationJSON)
}
// itemValueAsBytes reduces a cached value to its underlying bytes,
// whatever form it took on its way through the dist HTTP transport
// (writer-node []byte vs replica-node base64-string vs non-owner
// json.RawMessage). It applies the same heuristics as writeValue so
// single-key and batch responses stay in agreement:
//
//   - []byte is returned as-is.
//   - string is base64-decoded when decodeBase64Bytes accepts it,
//     otherwise its raw bytes are returned.
//   - json.RawMessage holding a JSON string is treated like that
//     string; any other raw message is returned verbatim.
//   - any other type is JSON-marshaled; nil is returned if marshaling
//     fails.
func itemValueAsBytes(v any) []byte {
	// fromText is the string handling shared by the string and
	// RawMessage branches.
	fromText := func(text string) []byte {
		if decoded, ok := decodeBase64Bytes(text); ok {
			return decoded
		}

		return []byte(text)
	}

	switch val := v.(type) {
	case []byte:
		return val
	case string:
		return fromText(val)
	case json.RawMessage:
		var text string
		if err := json.Unmarshal(val, &text); err != nil {
			// Not a JSON string: hand back the raw message itself.
			return []byte(val)
		}

		return fromText(text)
	default:
		encoded, err := json.Marshal(v)
		if err != nil {
			return nil
		}

		return encoded
	}
}
// itemRemainingTTL reports an Item's remaining lifetime as
// (ttl_ms, expires_at), where expires_at is RFC 3339 in UTC. The
// expiry instant is computed as LastAccess + Expiration.
//
// Items with no expiration (Expiration <= 0) yield (0, ""). An item
// whose expiry has already passed yields 0 ms — never a negative
// number — together with its expiry timestamp.
func itemRemainingTTL(it *cache.Item) (int64, string) {
	if it.Expiration <= 0 {
		return 0, ""
	}

	deadline := it.LastAccess.Add(it.Expiration)

	leftMs := time.Until(deadline).Milliseconds()
	if leftMs < 0 {
		leftMs = 0
	}

	return leftMs, deadline.UTC().Format(time.RFC3339)
}
// buildEnvelope assembles the JSON envelope describing a cached item.
// It lives in one place so the single-key GET and the batch-get
// response emit identical shapes.
//
// The value is always emitted base64-encoded (ValueEncoding is
// "base64"). TTL fields come from itemRemainingTTL, owners from the
// cluster's view of the key, and LastUpdated is left empty when the
// item carries a zero timestamp.
func buildEnvelope(key string, it *cache.Item, nodeCtx *nodeContext) itemEnvelope {
	payload := itemValueAsBytes(it.Value)
	remainingMs, expiry := itemRemainingTTL(it)

	// Only a real timestamp is formatted; the zero time stays "" so
	// the omitempty tag drops it from the output.
	var updated string
	if !it.LastUpdated.IsZero() {
		updated = it.LastUpdated.UTC().Format(time.RFC3339)
	}

	return itemEnvelope{
		Key:           key,
		Value:         base64.StdEncoding.EncodeToString(payload),
		ValueEncoding: "base64",
		TTLMs:         remainingMs,
		ExpiresAt:     expiry,
		Version:       it.Version,
		Origin:        it.Origin,
		LastUpdated:   updated,
		Node:          nodeCtx.nodeID,
		Owners:        nodeCtx.hc.ClusterOwners(key),
	}
}
// setItemHeaders writes the item's metadata onto the response as
// `X-Cache-*` headers, the header-only counterpart of buildEnvelope
// (the HEAD handler returns these without a body). Values are
// best-effort string forms.
//
// X-Cache-Version and X-Cache-Node are always set. Origin,
// Last-Updated and Owners are set only when non-empty. The TTL-Ms
// and Expires-At pair is set only when the remaining TTL is
// positive.
func setItemHeaders(c fiber.Ctx, key string, it *cache.Item, nodeCtx *nodeContext) {
	c.Set("X-Cache-Version", strconv.FormatUint(it.Version, 10))

	if origin := it.Origin; origin != "" {
		c.Set("X-Cache-Origin", origin)
	}

	if updated := it.LastUpdated; !updated.IsZero() {
		c.Set("X-Cache-Last-Updated", updated.UTC().Format(time.RFC3339))
	}

	if remainingMs, expiry := itemRemainingTTL(it); remainingMs > 0 {
		c.Set("X-Cache-TTL-Ms", strconv.FormatInt(remainingMs, 10))
		c.Set("X-Cache-Expires-At", expiry)
	}

	if owners := nodeCtx.hc.ClusterOwners(key); len(owners) > 0 {
		c.Set("X-Cache-Owners", strings.Join(owners, ","))
	}

	c.Set("X-Cache-Node", nodeCtx.nodeID)
}
// handleGet implements GET /v1/cache/:key.
//
// Responses:
//   - 400 with codeBadRequest when the path carries no key.
//   - 404 with codeNotFound when the lookup misses.
//   - With `Accept: application/json`: the itemEnvelope JSON (TTL,
//     version, owners, ...), so API clients get value and metadata
//     in one round-trip instead of GET + HEAD.
//   - Otherwise: the value as written by writeValue — the raw-bytes
//     default, documented as Content-Type application/octet-stream.
func handleGet(c fiber.Ctx, nodeCtx *nodeContext) error {
	key := c.Params("key")
	if key == "" {
		return jsonErr(c, fiber.StatusBadRequest, codeBadRequest, "missing key in path")
	}

	item, found := nodeCtx.hc.GetWithInfo(c.Context(), key)

	switch {
	case !found:
		return jsonErr(c, fiber.StatusNotFound, codeNotFound, "key not found")
	case wantsJSON(c):
		return c.JSON(buildEnvelope(key, item, nodeCtx))
	default:
		return writeValue(c, item.Value)
	}
}
// batchGetRequest is the JSON request body for
// `POST /v1/cache/batch/get`. An empty `keys` list returns an empty
// `results` array with status 200.
type batchGetRequest struct {
// Keys lists the cache keys to look up; each key is looked up in turn.
Keys []string `json:"keys"`
}
// batchGetResult is one entry in the batch-get response. `Found:
// false` results carry no metadata; `Found: true` results carry
// the same envelope shape as a single-key Accept:json GET.
//
// Every field except Key and Found is tagged omitempty, so a miss
// serializes as just `{"key": ..., "found": false}`. Note that
// omitempty also drops zero values on a hit (e.g. a Version of 0
// or a TTLMs of 0 is absent from the output, not emitted as 0).
type batchGetResult struct {
// Key echoes the requested key.
Key string `json:"key"`
// Found reports whether the key was present.
Found bool `json:"found"`
// Value is the item payload as a string; ValueEncoding names how it
// is encoded.
Value string `json:"value,omitempty"`
ValueEncoding string `json:"value_encoding,omitempty"`
// TTLMs and ExpiresAt describe the item's remaining lifetime.
TTLMs int64 `json:"ttl_ms,omitempty"`
ExpiresAt string `json:"expires_at,omitempty"`
Version uint64 `json:"version,omitempty"`
Origin string `json:"origin,omitempty"`
LastUpdated string `json:"last_updated,omitempty"`
Owners []string `json:"owners,omitempty"`
}
// batchGetResponse is the top-level wrapper for the batch-get
// response. Wrapping the results in an object (rather than returning
// a bare array) lets a future caller add cluster-wide stats
// (per-batch latency, owners-touched, etc.) without breaking the
// wire shape.
type batchGetResponse struct {
// Results holds one batchGetResult per entry.
Results []batchGetResult `json:"results"`
// Node identifies the responding node — presumably the same node ID
// the single-key envelope reports; confirm against the handler.
Node string `json:"node"`
}
// batchPutItem is one entry in the batch-put request. `value` is
// either a UTF-8 string (default) or a base64-encoded byte payload
// when `value_encoding` is `"base64"` — the same convention the
// single-key Accept:json GET emits, so a batch-put can round-trip
// the result of an earlier batch-get verbatim.
type batchPutItem struct {
// Key is the cache key to store under.
Key string `json:"key"`
// Value is the payload, interpreted according to ValueEncoding.
Value string `json:"value"`
// ValueEncoding is optional; `"base64"` marks Value as base64 bytes.
ValueEncoding string `json:"value_encoding,omitempty"`
// TTLMs is the optional time-to-live in milliseconds.
// NOTE(review): presumably 0/absent means "no expiration" — confirm
// against the batch-put handler.
TTLMs int64 `json:"ttl_ms,omitempty"`
}
// batchPutRequest is the JSON request body for a batch put: a list
// of items under the `items` key.
type batchPutRequest struct {
// Items lists the entries to store, one batchPutItem each.
Items []batchPutItem `json:"items"`
}
// batchPutResult is one entry in the batch-put response. On
// failure, `Stored` is false and `Error`/`Code` describe why —
// per-item granularity so a single failing item doesn't void
// the whole batch.
//
// Bytes, Owners, Error and Code are tagged omitempty and are absent
// from the JSON when empty.
type batchPutResult struct {
// Key echoes the item's key.
Key string `json:"key"`
// Stored reports whether this item was written.
Stored bool `json:"stored"`
// Bytes is presumably the size of the stored payload — confirm
// against the batch-put handler.
Bytes int `json:"bytes,omitempty"`
Owners []string `json:"owners,omitempty"`
// Error is the failure message and Code its error code.
Error string `json:"error,omitempty"`
Code string `json:"code,omitempty"`
}
// batchPutResponse is the top-level wrapper for the batch-put
// response; it has the same results-plus-node shape as
// batchGetResponse.
type batchPutResponse struct {
// Results holds one batchPutResult per entry.
Results []batchPutResult `json:"results"`
// Node identifies the responding node.
Node string `json:"node"`
}
// batchDeleteResult is one entry in the batch-delete response.
// Owners, Error and Code are tagged omitempty and are absent from
// the JSON when empty.
type batchDeleteResult struct {
// Key echoes the requested key.
Key string `json:"key"`
// Deleted reports the outcome for this key.
// NOTE(review): whether a missing key counts as deleted is decided
// by the handler, which is not visible here — confirm.
Deleted bool `json:"deleted"`
Owners []string `json:"owners,omitempty"`
// Error is the failure message and Code its error code.
Error string `json:"error,omitempty"`
Code string `json:"code,omitempty"`
}
// batchDeleteRequest is the JSON request body for a batch delete: a
// list of keys under the `keys` key, the same shape as
// batchGetRequest.
type batchDeleteRequest struct {
// Keys lists the cache keys to delete.
Keys []string `json:"keys"`
}
// batchDeleteResponse is the top-level wrapper for the batch-delete
// response; it has the same results-plus-node shape as
// batchGetResponse.
type batchDeleteResponse struct {
// Results holds one batchDeleteResult per entry.
Results []batchDeleteResult `json:"results"`
// Node identifies the responding node.
Node string `json:"node"`
}
// handleBatchGet implements POST /v1/cache/batch/get — fetches
// many keys in one round-trip with the same metadata envelope as
// the single-key Accept:json GET. Each key's lookup is
// independent: a missing key produces `{found: false}` rather
// than failing the whole batch.
func handleBatchGet(c fiber.Ctx, nodeCtx *nodeContext) error {
var req batchGetRequest
err := json.Unmarshal(c.Body(), &req)
if err != nil {
return jsonErr(c, fiber.StatusBadRequest, codeBadRequest, "invalid JSON: "+err.Error())
}
results := make([]batchGetResult, 0, len(req.Keys))
ctx := c.Context()
for _, key := range req.Keys {
if key == "" {
results = append(results, batchGetResult{Key: key, Found: false})
continue
}