package main
import (
"context"
"crypto/tls"
"log/slog"
"net"
"net/http"
"os"
"strings"
"time"
"github.com/bootjp/elastickv/adapter"
"github.com/bootjp/elastickv/internal/admin"
"github.com/bootjp/elastickv/internal/raftengine"
"github.com/bootjp/elastickv/keyviz"
"github.com/bootjp/elastickv/kv"
"github.com/cockroachdb/errors"
"golang.org/x/sync/errgroup"
)
// Environment variables that the admin listener consults before
// falling back to the command-line flag values. Exposing secrets via
// env vars / file paths keeps them out of /proc/<pid>/cmdline.
const (
envAdminSessionSigningKey = "ELASTICKV_ADMIN_SESSION_SIGNING_KEY"
envAdminSessionSigningKeyPrevious = "ELASTICKV_ADMIN_SESSION_SIGNING_KEY_PREVIOUS"
)
const (
adminReadHeaderTimeout = 5 * time.Second
adminWriteTimeout = 10 * time.Second
adminIdleTimeout = 30 * time.Second
adminShutdownTimeout = 5 * time.Second
// adminBuildVersion is surfaced in GET /admin/api/v1/cluster. Until
// we wire real ldflags-injected build info, a placeholder is fine.
adminBuildVersion = "dev"
)
// buildVersion returns the elastickv binary version for admin purposes.
// It is intentionally a function, not a constant, so build tooling can
// link-replace it via -ldflags in the future.
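// A future wiring (illustrative, not in place today) would turn the
// constant into a package-level var so the linker can override it, e.g.
// go build -ldflags "-X main.adminBuildVersion=v1.2.3".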
func buildVersion() string { return adminBuildVersion }
// adminListenerConfig is the subset of startup inputs that goes into the
// admin listener. Collecting them in a struct keeps the main.go call site
// compact and makes unit testing the builder easier.
type adminListenerConfig struct {
enabled bool
listen string
tlsCertFile string
tlsKeyFile string
allowPlaintextNonLoopback bool
allowInsecureDevCookie bool
sessionSigningKey string
sessionSigningKeyPrevious string
readOnlyAccessKeys []string
fullAccessKeys []string
}
// keyVizFanoutConfig bundles the operator-supplied fan-out flags.
// Empty Nodes leaves the keyviz handler in single-node mode.
type keyVizFanoutConfig struct {
Nodes []string
Timeout time.Duration
}
// startAdminFromFlags is the single entrypoint main.run() uses to stand
// up the admin listener. It owns the flag → config translation and the
// credentials loading so run() does not inherit that complexity.
//
// When admin is disabled (the default), the function returns immediately
// without touching --s3CredentialsFile: pulling the admin feature into
// a hard dependency on that file would break deployments that never
// intended to use it.
func startAdminFromFlags(
ctx context.Context,
lc *net.ListenConfig,
eg *errgroup.Group,
runtimes []*raftGroupRuntime,
dynamoServer *adapter.DynamoDBServer,
s3Server *adapter.S3Server,
sqsServer *adapter.SQSServer,
coordinate kv.Coordinator,
connCache *kv.GRPCConnCache,
keyvizSampler *keyviz.MemSampler,
keyvizFanoutCfg keyVizFanoutConfig,
) error {
if !*adminEnabled {
return nil
}
staticCreds, err := loadS3StaticCredentials(*s3CredsFile)
if err != nil {
return errors.Wrapf(err, "load static credentials for admin listener")
}
// An admin listener with zero credentials would accept logins
// only to reject every one of them with invalid_credentials, so a
// missing or empty credentials file is a wiring bug rather than a
// valid "locked down" state. Failing fast here also guards against
// the typed-nil MapCredentialStore case inside NewServer (a plain
// `== nil` check on the interface cannot detect a nil map stored
// inside it).
if len(staticCreds) == 0 {
return errors.New("admin listener is enabled but no static credentials are configured; " +
"set -s3CredentialsFile to a file with at least one entry")
}
primaryKey, err := resolveSigningKey(*adminSessionSigningKey, *adminSessionSigningKeyFile, envAdminSessionSigningKey)
if err != nil {
return errors.Wrap(err, "resolve -adminSessionSigningKey")
}
previousKey, err := resolveSigningKey(*adminSessionSigningKeyPrevious, *adminSessionSigningKeyPreviousFile, envAdminSessionSigningKeyPrevious)
if err != nil {
return errors.Wrap(err, "resolve -adminSessionSigningKeyPrevious")
}
cfg := adminListenerConfig{
enabled: *adminEnabled,
listen: *adminListen,
tlsCertFile: *adminTLSCertFile,
tlsKeyFile: *adminTLSKeyFile,
allowPlaintextNonLoopback: *adminAllowPlaintextNonLoopback,
allowInsecureDevCookie: *adminAllowInsecureDevCookie,
sessionSigningKey: primaryKey,
sessionSigningKeyPrevious: previousKey,
readOnlyAccessKeys: parseCSV(*adminReadOnlyAccessKeys),
fullAccessKeys: parseCSV(*adminFullAccessKeys),
}
clusterSrc := newClusterInfoSource(*raftId, buildVersion(), runtimes)
tablesSrc := newDynamoTablesSource(dynamoServer)
bucketsSrc := newBucketsSource(s3Server)
queuesSrc := newSqsQueuesSource(sqsServer)
forwarder, err := buildAdminLeaderForwarder(coordinate, connCache, *raftId)
if err != nil {
return errors.Wrap(err, "build admin leader forwarder")
}
leaderProbe := newAdminLeaderProbe(coordinate)
_, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, bucketsSrc, queuesSrc, forwarder, leaderProbe, keyvizSampler, keyvizFanoutCfg, buildVersion())
return err
}
// newAdminLeaderProbe builds the LeaderProbe consumed by
// /admin/healthz/leader. It mirrors the verified-leader pattern S3 and
// DynamoDB use on their own /healthz/leader endpoints
// (adapter/s3.go:isVerifiedS3Leader,
// adapter/dynamodb.go:isVerifiedDynamoLeader): a cheap IsLeader check
// short-circuits non-leaders, and only nodes claiming leadership pay
// the ReadIndex round-trip that confirms the claim is still valid.
//
// Crucially the probe is scoped to the **default Raft group** (via
// coordinate.IsLeader / coordinate.VerifyLeader) — the same group the
// admin write paths key off (kv/sharded_coordinator.go), which the
// AdminForward proxy and the SQS admin write path
// (adapter/sqs_admin.go) both treat as authoritative. An earlier
// design returned true on any local-group leadership; in a multi-group
// deployment that could surface 200 on a node leading only a non-
// default group while admin writes there still 503'd or forwarded.
// Codex P1 on PR #689 caught this; using the coordinator keeps the
// healthz contract aligned with the actual admin-write leader.
//
// Returns nil when no coordinator is wired so the router answers
// /admin/healthz/leader with the standard JSON 404 (matches the
// "feature off" pattern Tables / Buckets / Queues already use).
func newAdminLeaderProbe(coordinate kv.Coordinator) admin.LeaderProbe {
if coordinate == nil {
return nil
}
return admin.LeaderProbeFunc(func() bool {
if !coordinate.IsLeader() {
return false
}
// VerifyLeader is the same ReadIndex round-trip lease reads
// use; under the hood it carries an engine-bounded deadline,
// so a stalled cluster surfaces 503 here on its own without
// the probe needing an outer timeout.
return coordinate.VerifyLeader() == nil
})
}
// newSqsQueuesSource adapts *adapter.SQSServer to admin.QueuesSource.
// Same architectural reasoning as newDynamoTablesSource and
// newBucketsSource: the bridge stays in this file (rather than
// internal/admin) so the admin package stays free of the heavy
// adapter-package dependency tree. Returns nil when sqsServer is nil
// so admin.NewServer leaves /admin/api/v1/sqs/* off the wire.
func newSqsQueuesSource(sqsServer *adapter.SQSServer) admin.QueuesSource {
if sqsServer == nil {
return nil
}
return &sqsQueuesBridge{server: sqsServer}
}
// sqsQueuesBridge re-shapes adapter.AdminQueueSummary into
// admin.QueueSummary. The two structs are deliberately isomorphic so
// the translation allocates no more than necessary; if a
// future field is added on one side, the build breaks here, which
// is the drift signal we want.
type sqsQueuesBridge struct {
server *adapter.SQSServer
}
func (b *sqsQueuesBridge) AdminListQueues(ctx context.Context) ([]string, error) {
return b.server.AdminListQueues(ctx) //nolint:wrapcheck // pure pass-through; adapter owns the error context.
}
func (b *sqsQueuesBridge) AdminDescribeQueue(ctx context.Context, name string) (*admin.QueueSummary, bool, error) {
summary, exists, err := b.server.AdminDescribeQueue(ctx, name)
if err != nil {
return nil, false, translateAdminQueuesError(err)
}
if !exists {
return nil, false, nil
}
return convertAdminQueueSummary(summary), true, nil
}
func (b *sqsQueuesBridge) AdminDeleteQueue(ctx context.Context, principal admin.AuthPrincipal, name string) error {
if err := b.server.AdminDeleteQueue(ctx, convertAdminPrincipal(principal), name); err != nil {
return translateAdminQueuesError(err)
}
return nil
}
// convertAdminQueueSummary mirrors adapter.AdminQueueSummary into
// admin.QueueSummary. Counter fields are int64 on both sides; if
// either side grows a field, this function should be extended in the
// same commit so a compile error catches the drift.
//
// CreatedAt collapses a zero time.Time on the adapter side (queues
// stored before CreatedAtMillis was populated, or never populated)
// to a nil pointer on the admin side so omitempty actually drops the
// field at the wire layer. Without this collapse the SPA would see
// "0001-01-01T00:00:00Z" and render an ancient date.
func convertAdminQueueSummary(in *adapter.AdminQueueSummary) *admin.QueueSummary {
if in == nil {
return nil
}
var createdAt *time.Time
if !in.CreatedAt.IsZero() {
t := in.CreatedAt
createdAt = &t
}
return &admin.QueueSummary{
Name: in.Name,
IsFIFO: in.IsFIFO,
Generation: in.Generation,
CreatedAt: createdAt,
Attributes: in.Attributes,
Counters: admin.QueueCounters{
Visible: in.Counters.Visible,
NotVisible: in.Counters.NotVisible,
Delayed: in.Counters.Delayed,
},
}
}
// translateAdminQueuesError maps the adapter's SQS error vocabulary
// onto the admin-package sentinels the SQS handler matches against.
// Anything not recognised is forwarded as-is and answered with 500
// + a sanitised body, matching the dynamo / s3 bridges' behaviour.
func translateAdminQueuesError(err error) error {
switch {
case err == nil:
return nil
case errors.Is(err, adapter.ErrAdminForbidden):
return admin.ErrQueuesForbidden
case errors.Is(err, adapter.ErrAdminNotLeader):
return admin.ErrQueuesNotLeader
case errors.Is(err, adapter.ErrAdminSQSNotFound):
return admin.ErrQueuesNotFound
case errors.Is(err, adapter.ErrAdminSQSValidation):
return admin.ErrQueuesValidation
case isLeaderChurnError(err):
// Leadership can be lost between AdminDeleteQueue's upfront
// isVerifiedSQSLeader check and the coordinator dispatch
// inside deleteQueueWithRetry. The kv coordinator surfaces
// that as ErrLeaderNotFound / ErrNotLeader (or a wrapped
// "not leader" / "leader not found" suffix), and the
// retry loop's isRetryableTransactWriteError does not catch
// them. Without this arm the error falls to default and the
// admin handler renders a generic 500. Mirrors the same arm
// in translateAdminTablesError on the Dynamo side.
return admin.ErrQueuesNotLeader
default:
return err //nolint:wrapcheck // forwarded so the handler logs but does not surface it.
}
}
// buildAdminLeaderForwarder constructs the production LeaderForwarder
// for the dynamo HTTP handler when the wiring is complete enough to
// reach a remote leader. The bridge tolerates a nil connCache (and a
// nil coordinate) so single-node / leader-only builds — where the
// dashboard always hits a leader — can ship without paying the
// forwarder's wiring cost. tablesSrc itself can be nil for cluster-
// only builds; that's handled higher up by ServerDeps.Tables == nil.
func buildAdminLeaderForwarder(coordinate kv.Coordinator, connCache *kv.GRPCConnCache, nodeID string) (admin.LeaderForwarder, error) {
if coordinate == nil || connCache == nil {
// Returning (nil, nil) is the explicit "no forwarder" signal
// — the handler falls back to 503 + Retry-After:1 on
// ErrTablesNotLeader. The function-level doc comment above
// describes this contract; the nilnil linter is not enabled
// in .golangci.yaml so no suppression directive is needed
// (Claude review on #648).
return nil, nil
}
if nodeID == "" {
// admin.NewGRPCForwardClient enforces this too; surfacing
// it here keeps the misconfiguration message in the wiring
// layer rather than buried under a Wrap chain.
return nil, errors.New("admin forward bridge: --raftId is required")
}
return buildLeaderForwarder(coordinate, connCache, nodeID)
}
// newDynamoTablesSource adapts *adapter.DynamoDBServer to the
// admin.TablesSource interface. The bridge stays in this file (rather
// than internal/admin) so the admin package stays free of the heavy
// adapter-package dependency tree (gRPC, Raft, store).
//
// Returns nil when dynamoServer is nil; admin.NewServer handles a nil
// Tables field by leaving the dynamo paths off the wire entirely,
// which is the right behaviour for builds that ship without the
// Dynamo adapter.
func newDynamoTablesSource(dynamoServer *adapter.DynamoDBServer) admin.TablesSource {
if dynamoServer == nil {
return nil
}
return &dynamoTablesBridge{server: dynamoServer}
}
// newBucketsSource is the S3 counterpart of newDynamoTablesSource.
// Returns nil when s3Server is nil so a build that ships without the
// S3 adapter (--s3Address empty) silently disables the
// /admin/api/v1/s3/buckets routes.
func newBucketsSource(s3Server *adapter.S3Server) admin.BucketsSource {
if s3Server == nil {
return nil
}
return &bucketsBridge{server: s3Server}
}
// bucketsBridge re-shapes the adapter's AdminBucketSummary DTO into
// the admin package's BucketSummary, threading through the
// (result, present, error) tuple semantics for the describe path.
// CreatedAtHLC is formatted into the SPA's expected ISO-8601 string
// here rather than in the adapter — formatting is a UI concern, not
// a storage one.
type bucketsBridge struct {
server *adapter.S3Server
}
func (b *bucketsBridge) AdminListBuckets(ctx context.Context) ([]admin.BucketSummary, error) {
rows, err := b.server.AdminListBuckets(ctx)
if err != nil {
// Wrap with the bridge frame so an operator debugging a 500
// from /admin/api/v1/s3/buckets sees the bridge in the error
// chain (Claude Issue 5 on PR #658). cockroachdb/errors
// already preserves the adapter's stack trace; this just
// adds the call-site context.
return nil, errors.Wrap(err, "admin buckets bridge: list")
}
out := make([]admin.BucketSummary, len(rows))
for i, r := range rows {
out[i] = bucketSummaryFromAdapter(r)
}
return out, nil
}
func (b *bucketsBridge) AdminDescribeBucket(ctx context.Context, name string) (*admin.BucketSummary, bool, error) {
row, exists, err := b.server.AdminDescribeBucket(ctx, name)
if err != nil {
return nil, false, errors.Wrapf(err, "admin buckets bridge: describe %q", name)
}
if !exists || row == nil {
return nil, false, nil
}
summary := bucketSummaryFromAdapter(*row)
return &summary, true, nil
}
func (b *bucketsBridge) AdminCreateBucket(ctx context.Context, principal admin.AuthPrincipal, in admin.CreateBucketRequest) (*admin.BucketSummary, error) {
row, err := b.server.AdminCreateBucket(ctx, convertAdminPrincipal(principal), in.BucketName, in.ACL)
if err != nil {
return nil, translateAdminBucketsError(err)
}
if row == nil {
// AdminCreateBucket guarantees a non-nil summary on success;
// nil here would be an adapter regression. Surface as a typed
// error so the handler logs it as 500 rather than panicking
// on the de-reference at the call site.
return nil, errors.New("admin buckets bridge: adapter returned nil summary on create success")
}
summary := bucketSummaryFromAdapter(*row)
return &summary, nil
}
func (b *bucketsBridge) AdminPutBucketAcl(ctx context.Context, principal admin.AuthPrincipal, name, acl string) error {
if err := b.server.AdminPutBucketAcl(ctx, convertAdminPrincipal(principal), name, acl); err != nil {
return translateAdminBucketsError(err)
}
return nil
}
func (b *bucketsBridge) AdminDeleteBucket(ctx context.Context, principal admin.AuthPrincipal, name string) error {
if err := b.server.AdminDeleteBucket(ctx, convertAdminPrincipal(principal), name); err != nil {
return translateAdminBucketsError(err)
}
return nil
}
func bucketSummaryFromAdapter(in adapter.AdminBucketSummary) admin.BucketSummary {
return admin.BucketSummary{
Name: in.Name,
ACL: in.ACL,
CreatedAt: admin.FormatBucketCreatedAt(in.CreatedAtHLC),
Generation: in.Generation,
Region: in.Region,
Owner: in.Owner,
}
}
// translateAdminBucketsError maps the adapter's S3 admin error
// vocabulary onto the admin-package sentinels the HTTP handler
// matches against. Mirrors translateAdminTablesError on the Dynamo
// side: structured failures (forbidden, not-leader, validation,
// already-exists, etc.) become typed sentinels; everything else
// is forwarded as-is and answered with 500 + a sanitised body.
func translateAdminBucketsError(err error) error {
switch {
case err == nil:
return nil
case errors.Is(err, adapter.ErrAdminForbidden):
return admin.ErrBucketsForbidden
case errors.Is(err, adapter.ErrAdminNotLeader):
return admin.ErrBucketsNotLeader
case errors.Is(err, adapter.ErrAdminBucketAlreadyExists):
return admin.ErrBucketsAlreadyExists
case errors.Is(err, adapter.ErrAdminBucketNotFound):
return admin.ErrBucketsNotFound
case errors.Is(err, adapter.ErrAdminBucketNotEmpty):
return admin.ErrBucketsNotEmpty
case errors.Is(err, adapter.ErrAdminInvalidBucketName),
errors.Is(err, adapter.ErrAdminInvalidACL):
// Surface the adapter's wrapped message via *ValidationError
// so the HTTP handler emits 400 invalid_request with a
// useful explanation instead of leaking raw err.Error().
return &admin.ValidationError{Message: err.Error()}
case isLeaderChurnError(err):
// Mid-dispatch leadership churn looks like an internal error
// from the kv coordinator; mapping it to the Bucket-side
// not-leader sentinel keeps the SPA's retry contract intact.
return admin.ErrBucketsNotLeader
default:
return err //nolint:wrapcheck // forwarded so the handler logs but does not surface it.
}
}
// dynamoTablesBridge is the thin adapter that re-shapes the adapter's
// AdminTableSummary DTO into the admin package's DynamoTableSummary.
// The two structs are deliberately isomorphic so the translation
// allocates no more than necessary; if a future GSI field is
// added on one side, the build breaks here, which is exactly the
// drift signal we want.
type dynamoTablesBridge struct {
server *adapter.DynamoDBServer
}
func (b *dynamoTablesBridge) AdminListTables(ctx context.Context) ([]string, error) {
return b.server.AdminListTables(ctx) //nolint:wrapcheck // pure pass-through; the adapter owns the error context.
}
func (b *dynamoTablesBridge) AdminDescribeTable(ctx context.Context, name string) (*admin.DynamoTableSummary, bool, error) {
summary, exists, err := b.server.AdminDescribeTable(ctx, name)
if err != nil {
return nil, false, err //nolint:wrapcheck // adapter wraps internally.
}
if !exists {
return nil, false, nil
}
return convertAdminTableSummary(summary), true, nil
}
func (b *dynamoTablesBridge) AdminCreateTable(ctx context.Context, principal admin.AuthPrincipal, in admin.CreateTableRequest) (*admin.DynamoTableSummary, error) {
summary, err := b.server.AdminCreateTable(ctx, convertAdminPrincipal(principal), convertCreateTableInput(in))
if err != nil {
return nil, translateAdminTablesError(err)
}
return convertAdminTableSummary(summary), nil
}
func (b *dynamoTablesBridge) AdminDeleteTable(ctx context.Context, principal admin.AuthPrincipal, name string) error {
if err := b.server.AdminDeleteTable(ctx, convertAdminPrincipal(principal), name); err != nil {
return translateAdminTablesError(err)
}
return nil
}
// convertAdminPrincipal mirrors admin.AuthPrincipal onto the
// adapter's parallel struct. Both packages keep the principal type
// independent so the adapter stays free of internal/admin
// dependencies, but the role / access-key fields are deliberately
// 1:1 — any drift is a wiring bug, not a feature.
func convertAdminPrincipal(p admin.AuthPrincipal) adapter.AdminPrincipal {
role := adapter.AdminRoleReadOnly
if p.Role.AllowsWrite() {
role = adapter.AdminRoleFull
}
return adapter.AdminPrincipal{AccessKey: p.AccessKey, Role: role}
}
// convertCreateTableInput translates the admin-handler request DTO
// into the adapter's parallel input struct. We do this here — not
// in the admin package — to keep `internal/admin` free of any
// adapter import.
func convertCreateTableInput(in admin.CreateTableRequest) adapter.AdminCreateTableInput {
out := adapter.AdminCreateTableInput{
TableName: in.TableName,
PartitionKey: adapter.AdminAttribute{Name: in.PartitionKey.Name, Type: in.PartitionKey.Type},
}
if in.SortKey != nil {
out.SortKey = &adapter.AdminAttribute{Name: in.SortKey.Name, Type: in.SortKey.Type}
}
if len(in.GSI) == 0 {
return out
}
out.GSI = make([]adapter.AdminCreateGSI, len(in.GSI))
for i, g := range in.GSI {
gsi := adapter.AdminCreateGSI{
Name: g.Name,
PartitionKey: adapter.AdminAttribute{Name: g.PartitionKey.Name, Type: g.PartitionKey.Type},
ProjectionType: g.Projection.Type,
NonKeyAttributes: append([]string(nil), g.Projection.NonKeyAttributes...),
}
if g.SortKey != nil {
gsi.SortKey = &adapter.AdminAttribute{Name: g.SortKey.Name, Type: g.SortKey.Type}
}
out.GSI[i] = gsi
}
return out
}
// translateAdminTablesError maps the adapter's error vocabulary
// onto the admin-package sentinels the HTTP handler matches against.
// Anything not recognised is forwarded as-is and answered with 500
// + a sanitised body, so a future adapter error mode does not leak
// raw text to clients while we are still adding the translation.
func translateAdminTablesError(err error) error {
switch {
case err == nil:
return nil
case errors.Is(err, adapter.ErrAdminForbidden):
return admin.ErrTablesForbidden
case errors.Is(err, adapter.ErrAdminNotLeader):
return admin.ErrTablesNotLeader
// Check structured adapter errors BEFORE the leader-churn
// matcher: a ValidationException whose message contains
// "not leader" (e.g., a user-supplied attribute name like
// `not leader` triggering the conflicting-attribute-type
// validator) must be classified as 400 invalid_request, not
// 503 leader_unavailable. The kv-internal sentinel checks
// in isLeaderChurnError still catch real leadership churn
// because they are typed-sentinel matches, not substring.
case adapter.IsAdminTableAlreadyExists(err):
return admin.ErrTablesAlreadyExists
case adapter.IsAdminTableNotFound(err):
return admin.ErrTablesNotFound
case adapter.IsAdminValidation(err):
msg := adapter.AdminErrorMessage(err)
if msg == "" {
msg = "validation failed"
}
return &admin.ValidationError{Message: msg}
case isLeaderChurnError(err):
// Cover leader-churn that surfaces between the up-front
// isVerifiedDynamoLeader check and createTableWithRetry's
// dispatch — the kv coordinator can drop leadership in
// that window and the resulting transient error should
// surface as 503 leader_unavailable + Retry-After: 1
// rather than a generic 500. Codex P2 on PR #634.
return admin.ErrTablesNotLeader
default:
return err //nolint:wrapcheck // forwarded so the handler logs but does not surface it.
}
}
// isLeaderChurnError reports whether err looks like one of the
// transient leader sentinels the kv coordinator and adapter
// internals emit during a leadership change. The set mirrors the
// closed list in kv.leaderErrorPhrases — keep them in sync if a
// new sentinel is added on the kv side.
//
// Phrase matching uses HasSuffix (not Contains) on the canonical
// leader phrases because every kv-internal sentinel emits the
// phrase at the END of its error chain (e.g.,
// "raft engine: not leader", "dispatch failed: leader not found").
// A user-supplied string that happens to contain a leader phrase
// in the MIDDLE of an unrelated error message therefore does not
// false-positive — Codex P2 on PR #634 flagged the original
// strings.Contains form for misclassifying validation messages
// like "conflicting attribute type for <user-name>" when the
// name itself was "not leader".
func isLeaderChurnError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, kv.ErrLeaderNotFound) ||
errors.Is(err, adapter.ErrNotLeader) ||
errors.Is(err, adapter.ErrLeaderNotFound) {
return true
}
msg := err.Error()
return strings.HasSuffix(msg, "not leader") ||
strings.HasSuffix(msg, "leader not found") ||
strings.HasSuffix(msg, "leadership lost") ||
strings.HasSuffix(msg, "leadership transfer in progress")
}
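// convertAdminTableSummary mirrors adapter.AdminTableSummary into
// admin.DynamoTableSummary, copying GlobalSecondaryIndexes only when
// the source has any.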
func convertAdminTableSummary(in *adapter.AdminTableSummary) *admin.DynamoTableSummary {
out := &admin.DynamoTableSummary{
Name: in.Name,
PartitionKey: in.PartitionKey,
SortKey: in.SortKey,
Generation: in.Generation,
}
if len(in.GlobalSecondaryIndexes) == 0 {
return out
}
out.GlobalSecondaryIndexes = make([]admin.DynamoGSISummary, len(in.GlobalSecondaryIndexes))
for i, g := range in.GlobalSecondaryIndexes {
out.GlobalSecondaryIndexes[i] = admin.DynamoGSISummary{
Name: g.Name,
PartitionKey: g.PartitionKey,
SortKey: g.SortKey,
ProjectionType: g.ProjectionType,
}
}
return out
}
// buildAdminConfig translates flag values into an admin.Config.
func buildAdminConfig(in adminListenerConfig) admin.Config {
return admin.Config{
Enabled: in.enabled,
Listen: in.listen,
TLSCertFile: in.tlsCertFile,
TLSKeyFile: in.tlsKeyFile,
AllowPlaintextNonLoopback: in.allowPlaintextNonLoopback,
SessionSigningKey: in.sessionSigningKey,
SessionSigningKeyPrevious: in.sessionSigningKeyPrevious,
ReadOnlyAccessKeys: in.readOnlyAccessKeys,
FullAccessKeys: in.fullAccessKeys,
AllowInsecureDevCookie: in.allowInsecureDevCookie,
}
}
// startAdminServer validates the admin configuration, constructs the admin
// server, and attaches its lifecycle to eg. It is a no-op when the admin
// listener is disabled. Errors at this point are hard startup failures:
// the design doc mandates failing startup with a hard error for every
// invalid configuration, and we honour that uniformly.
//
// The returned address is the actual host:port the listener bound to; it
// differs from adminCfg.Listen only when the caller passed a port of 0,
// and tests rely on this to avoid the bind-close-rebind race that a
// pre-allocated free-port helper would otherwise introduce. When admin
// is disabled the returned address is empty.
func startAdminServer(
ctx context.Context,
lc *net.ListenConfig,
eg *errgroup.Group,
cfg adminListenerConfig,
creds map[string]string,
cluster admin.ClusterInfoSource,
tables admin.TablesSource,
buckets admin.BucketsSource,
queues admin.QueuesSource,
forwarder admin.LeaderForwarder,
leaderProbe admin.LeaderProbe,
keyvizSampler *keyviz.MemSampler,
keyvizFanoutCfg keyVizFanoutConfig,
version string,
) (string, error) {
adminCfg := buildAdminConfig(cfg)
enabled, err := checkAdminConfig(&adminCfg, cluster)
if err != nil || !enabled {
return "", err
}
server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, buckets, queues, forwarder, leaderProbe, keyvizSampler, keyvizFanoutCfg)
if err != nil {
return "", err
}
httpSrv := newAdminHTTPServer(server)
listener, err := lc.Listen(ctx, "tcp", adminCfg.Listen)
if err != nil {
return "", errors.Wrapf(err, "failed to listen on admin address %s", adminCfg.Listen)
}
tlsEnabled := strings.TrimSpace(adminCfg.TLSCertFile) != "" && strings.TrimSpace(adminCfg.TLSKeyFile) != ""
if tlsEnabled {
httpSrv.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
}
actualAddr := listener.Addr().String()
// Use the real bound address in log lines and in the lifecycle
// task so the shutdown banner matches startup.
boundCfg := adminCfg
boundCfg.Listen = actualAddr
registerAdminLifecycle(ctx, eg, httpSrv, listener, &boundCfg, tlsEnabled, version)
return actualAddr, nil
}
// checkAdminConfig validates adminCfg; returns (enabled=false, nil) when
// admin is disabled and requires no further work.
func checkAdminConfig(adminCfg *admin.Config, cluster admin.ClusterInfoSource) (bool, error) {
if err := adminCfg.Validate(); err != nil {
if !adminCfg.Enabled {
return false, nil
}
return false, errors.Wrap(err, "admin config is invalid")
}
if !adminCfg.Enabled {
return false, nil
}
if cluster == nil {
return false, errors.New("admin: cluster info source is required")
}
return true, nil
}
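// buildAdminHTTPServer decodes the configured signing keys, builds the
// session signer/verifier pair, opens the embedded SPA filesystem, and
// wires every source and bridge into admin.NewServer.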
func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, buckets admin.BucketsSource, queues admin.QueuesSource, forwarder admin.LeaderForwarder, leaderProbe admin.LeaderProbe, keyvizSampler *keyviz.MemSampler, keyvizFanoutCfg keyVizFanoutConfig) (*admin.Server, error) {
primaryKeys, err := adminCfg.DecodedSigningKeys()
if err != nil {
return nil, errors.Wrap(err, "decode admin signing keys")
}
signer, err := admin.NewSigner(primaryKeys[0], nil)
if err != nil {
return nil, errors.Wrap(err, "build admin signer")
}
verifier, err := admin.NewVerifier(primaryKeys, nil)
if err != nil {
return nil, errors.Wrap(err, "build admin verifier")
}
staticFS, err := admin.StaticFS()
if err != nil {
return nil, errors.Wrap(err, "open embedded admin SPA")
}
server, err := admin.NewServer(admin.ServerDeps{
Signer: signer,
Verifier: verifier,
Credentials: admin.MapCredentialStore(creds),
Roles: adminCfg.RoleIndex(),
ClusterInfo: cluster,
Tables: tables,
Buckets: buckets,
Queues: queues,
Forwarder: forwarder,
KeyViz: keyvizSourceFromSampler(keyvizSampler),
KeyVizFanout: buildKeyVizFanout(adminCfg.Listen, keyvizFanoutCfg),
StaticFS: staticFS,
LeaderProbe: leaderProbe,
AuthOpts: admin.AuthServiceOpts{
InsecureCookie: adminCfg.AllowInsecureDevCookie,
},
Logger: slog.Default().With(slog.String("component", "admin")),
})
if err != nil {
return nil, errors.Wrap(err, "build admin server")
}
return server, nil
}
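// newAdminHTTPServer wraps the admin handler in an http.Server carrying
// the read-header/write/idle timeouts defined at the top of this file.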
func newAdminHTTPServer(server *admin.Server) *http.Server {
return &http.Server{
Handler: server.Handler(),
ReadHeaderTimeout: adminReadHeaderTimeout,
WriteTimeout: adminWriteTimeout,
IdleTimeout: adminIdleTimeout,
}
}
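// registerAdminLifecycle attaches two errgroup tasks: one serves the
// listener (TLS or plaintext) until it closes, the other waits for ctx
// cancellation and performs a bounded graceful shutdown.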
func registerAdminLifecycle(
ctx context.Context,
eg *errgroup.Group,
httpSrv *http.Server,
listener net.Listener,
adminCfg *admin.Config,
tlsEnabled bool,
version string,
) {
addr := adminCfg.Listen
eg.Go(func() error {
<-ctx.Done()
slog.Info("shutting down admin listener", "address", addr, "reason", ctx.Err())
shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), adminShutdownTimeout)
defer cancel()
err := httpSrv.Shutdown(shutdownCtx)
if err == nil || errors.Is(err, http.ErrServerClosed) || errors.Is(err, net.ErrClosed) {
return nil
}
return errors.WithStack(err)
})
eg.Go(func() error {
slog.Info("starting admin listener", "address", addr, "tls", tlsEnabled, "version", version)
var serveErr error
if tlsEnabled {
serveErr = httpSrv.ServeTLS(listener, adminCfg.TLSCertFile, adminCfg.TLSKeyFile)
} else {
serveErr = httpSrv.Serve(listener)
}
if serveErr == nil || errors.Is(serveErr, http.ErrServerClosed) || errors.Is(serveErr, net.ErrClosed) {
return nil
}
return errors.Wrapf(serveErr, "admin listener on %s stopped with error", addr)
})
}
// newClusterInfoSource builds a ClusterInfoSource that reads from the
// runtime raftGroupRuntime slice. It lives here (rather than
// internal/admin) so the admin package stays free of main-package types.
//
// Membership is fetched via engine.Configuration(ctx); the call is
// best-effort — if it fails (for instance because the engine is in the
// middle of a leadership transition) we leave Members empty rather
// than fail the whole cluster snapshot.
func newClusterInfoSource(nodeID, version string, runtimes []*raftGroupRuntime) admin.ClusterInfoSource {
return admin.ClusterInfoFunc(func(ctx context.Context) (admin.ClusterInfo, error) {
groups := make([]admin.GroupInfo, 0, len(runtimes))
for _, rt := range runtimes {
if rt == nil || rt.engine == nil {
continue
}
status := rt.engine.Status()
// Seed as an empty-but-non-nil slice so a
// Configuration() failure still JSON-encodes as `[]`
// rather than `null`; API consumers that treat
// members as an always-array field rely on this.
members := []string{}
if cfg, err := rt.engine.Configuration(ctx); err == nil {
members = make([]string, 0, len(cfg.Servers))
for _, srv := range cfg.Servers {
members = append(members, srv.ID)
}
}
groups = append(groups, admin.GroupInfo{
GroupID: rt.spec.id,
LeaderID: status.Leader.ID,
IsLeader: status.State == raftengine.StateLeader,
Members: members,
})
}
return admin.ClusterInfo{
NodeID: nodeID,
Version: version,
Groups: groups,
}, nil
})
}
// resolveSigningKey picks the effective admin signing key in priority
// order: the --*File flag (file contents), then the env var, and finally
// the bare --* flag value from argv. Preferring the file/env paths keeps
// the raw base64 out of /proc/<pid>/cmdline on Linux. Returns the empty
// string when every source is unset — callers that require a value
// (validated elsewhere) must handle that case themselves.
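// For example (illustrative): when --adminSessionSigningKeyFile points at
// a readable file, its trimmed contents win even if the env var and the
// plain flag are also set.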
func resolveSigningKey(flagValue, filePath, envVar string) (string, error) {
if strings.TrimSpace(filePath) != "" {
b, err := os.ReadFile(filePath)
if err != nil {
return "", errors.Wrapf(err, "read admin signing key file %q", filePath)
}
return strings.TrimSpace(string(b)), nil
}
if v := strings.TrimSpace(os.Getenv(envVar)); v != "" {
return v, nil
}
return strings.TrimSpace(flagValue), nil
}
// keyvizSourceFromSampler boxes a *keyviz.MemSampler into the
// admin.KeyVizSource interface understood by ServerDeps. Returning a
// nil interface (not a typed-nil) when the sampler is disabled is
// load-bearing: the admin handler's "keyviz disabled → 503" branch
// only fires on an interface-nil; a typed-nil *MemSampler stored as
// a non-nil interface would silently return an empty matrix instead
// of the explicit "feature off" signal the SPA expects.
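// A sketch of the gotcha: `var s *keyviz.MemSampler; var src
// admin.KeyVizSource = s` leaves src != nil even though s is nil, which
// is why the explicit check below returns a true interface-nil instead.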
func keyvizSourceFromSampler(s *keyviz.MemSampler) admin.KeyVizSource {
if s == nil {
return nil
}
return s
}
// buildKeyVizFanout assembles the Phase 2-C fan-out aggregator from
// the operator-supplied flag values. selfListen is the local admin
// listener address (used to filter the local node out of the peer
// list so symmetric `--keyvizFanoutNodes=node1,node2,node3` configs
// stamped onto every host do not loop back over HTTP). Returns nil
// when no peers remain after filtering, leaving the keyviz handler
// in single-node mode.
//
// The matching rule is conservative: a peer is treated as "self"
// when its host:port equals selfListen literally OR when it equals
// selfListen with the host normalised to 127.0.0.1 (operators
// commonly bind admin to 0.0.0.0 but list 127.0.0.1 as the
// per-host fan-out entry). Anything else is treated as a peer.
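// For example (illustrative): with selfListen "10.0.0.1:9090" and
// --keyvizFanoutNodes=10.0.0.1:9090,10.0.0.2:9090, only 10.0.0.2:9090
// survives the self-filter; if every entry names this node, the function
// returns nil and keyviz stays in single-node mode.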
func buildKeyVizFanout(selfListen string, cfg keyVizFanoutConfig) *admin.KeyVizFanout {
if len(cfg.Nodes) == 0 {
return nil
}
peers := make([]string, 0, len(cfg.Nodes))
for _, n := range cfg.Nodes {
if isSelfFanoutNode(selfListen, n) {
continue
}
peers = append(peers, n)
}
if len(peers) == 0 {
return nil
}
f := admin.NewKeyVizFanout(selfListen, peers)
if cfg.Timeout > 0 {
f = f.WithTimeout(cfg.Timeout)
}
return f.WithLogger(slog.Default().With(slog.String("component", "admin.keyviz.fanout")))
}
// isSelfFanoutNode returns true when n names this node's own admin
// listener. A relaxed match handles the common bind-vs-advertise
// asymmetry: bind on 0.0.0.0:8080 but advertise (and list) as
// 127.0.0.1:8080.
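// For example, isSelfFanoutNode("0.0.0.0:8080", "http://127.0.0.1:8080")
// reports true (wildcard bind plus loopback peer), while
// isSelfFanoutNode("0.0.0.0:8080", "10.0.0.2:8080") reports false.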
func isSelfFanoutNode(selfListen, n string) bool {
n = strings.TrimSpace(n)
if n == "" {
return true
}
stripped := stripScheme(n)
if stripped == selfListen {
return true
}
host, port, err := net.SplitHostPort(stripped)
if err != nil {
return false
}
selfHost, selfPort, err := net.SplitHostPort(selfListen)
if err != nil || port != selfPort {
return false
}
if isWildcardHost(selfHost) {
return isLoopbackHost(host)
}
return host == selfHost
}
func isWildcardHost(h string) bool { return h == "0.0.0.0" || h == "::" || h == "" }
func isLoopbackHost(h string) bool {
return h == "127.0.0.1" || h == "localhost" || h == "::1"
}
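// stripScheme drops a leading "scheme://" prefix, if present; for example,
// stripScheme("https://10.0.0.2:9090") returns "10.0.0.2:9090".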
func stripScheme(raw string) string {
if i := strings.Index(raw, "://"); i >= 0 {
return raw[i+3:]
}
return raw
}
// parseCSV splits a flag value like "a,b,c" into a slice with empty and
// whitespace-only entries dropped. It is not in shard_config.go because
// admin's comma-separated list format is simpler than the raft-group format parsed there.
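// For example, parseCSV(" a, ,b ") returns []string{"a", "b"}.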
func parseCSV(raw string) []string {
parts := strings.Split(raw, ",")
out := make([]string, 0, len(parts))
for _, p := range parts {
if trim := strings.TrimSpace(p); trim != "" {
out = append(out, trim)
}
}
return out
}