|
| 1 | +package admin |
| 2 | + |
| 3 | +import ( |
| 4 | + "bytes" |
| 5 | + "context" |
| 6 | + "errors" |
| 7 | + "log/slog" |
| 8 | + "net/http" |
| 9 | + "strings" |
| 10 | + |
| 11 | + pb "github.com/bootjp/elastickv/proto" |
| 12 | + "github.com/goccy/go-json" |
| 13 | +) |
| 14 | + |
| 15 | +// adminForwardPayloadLimit caps the JSON payload the leader will |
| 16 | +// decode for any Forward operation. Mirrors defaultBodyLimit on the |
| 17 | +// HTTP path (64 KiB) so a single Forward call cannot consume more |
| 18 | +// memory than the same operation would over /admin/api/v1/dynamo/. |
| 19 | +// gRPC has its own 4 MiB max-message default, but that is way too |
| 20 | +// permissive for admin: a follower-forwarded request must obey the |
| 21 | +// same 64 KiB ceiling we promise on the public API surface. |
| 22 | +const adminForwardPayloadLimit = 64 << 10 |
| 23 | + |
| 24 | +// ForwardServer is the leader-side gRPC handler for the AdminForward |
| 25 | +// RPC (design Section 3.3). The follower's admin HTTP layer calls it |
| 26 | +// when the local node is not the Raft leader; this server then |
| 27 | +// re-validates the principal, dispatches the operation against the |
| 28 | +// local TablesSource, and serialises the result back to the |
| 29 | +// follower in the same JSON shape the SPA would have received from a |
| 30 | +// leader-direct call. |
| 31 | +// |
| 32 | +// The server is deliberately kept independent of the dynamo HTTP |
| 33 | +// handler: it runs in the gRPC server's goroutine pool, not in the |
| 34 | +// HTTP server's, and shares only the TablesSource interface (which |
| 35 | +// the bridge in main_admin.go already implements for the local |
| 36 | +// adapter). |
| 37 | +type ForwardServer struct { |
| 38 | + pb.UnimplementedAdminForwardServer |
| 39 | + |
| 40 | + source TablesSource |
| 41 | + roles RoleStore |
| 42 | + logger *slog.Logger |
| 43 | +} |
| 44 | + |
| 45 | +// NewForwardServer wires a TablesSource and a RoleStore behind the |
| 46 | +// gRPC AdminForward service. logger may be nil; defaults to |
| 47 | +// slog.Default(). |
| 48 | +func NewForwardServer(source TablesSource, roles RoleStore, logger *slog.Logger) *ForwardServer { |
| 49 | + if logger == nil { |
| 50 | + logger = slog.Default() |
| 51 | + } |
| 52 | + return &ForwardServer{source: source, roles: roles, logger: logger} |
| 53 | +} |
| 54 | + |
| 55 | +// Forward is the gRPC entrypoint. It performs the principal |
| 56 | +// re-evaluation the design mandates, then dispatches by operation. |
| 57 | +// Errors that the SPA can act on are returned as a structured |
| 58 | +// AdminForwardResponse with status_code + JSON payload; only fatal |
| 59 | +// gRPC-layer errors (decode failure, unknown operation) come back as |
| 60 | +// status.Errorf to the follower. |
| 61 | +func (s *ForwardServer) Forward(ctx context.Context, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) { |
| 62 | + if req == nil || req.GetPrincipal() == nil { |
| 63 | + return rejectForward(http.StatusBadRequest, "invalid_request", "missing principal") |
| 64 | + } |
| 65 | + // Sanitise forwarded_from before it ever reaches a slog |
| 66 | + // handler. With JSON output the encoder escapes newlines on |
| 67 | + // our behalf, but with a text-format handler an attacker who |
| 68 | + // controlled the follower side could embed `\n` in the value |
| 69 | + // and split a single audit line into two — defeating |
| 70 | + // log-aggregation parsing or spoofing a synthetic entry. |
| 71 | + // Replacing CR/LF with spaces at the entry point keeps every |
| 72 | + // downstream call site on the leader trivially safe (Claude |
| 73 | + // review on PR #635). |
| 74 | + forwardedFrom := sanitiseForwardedFrom(req.GetForwardedFrom()) |
| 75 | + principal, ok := s.validatePrincipal(req.GetPrincipal()) |
| 76 | + if !ok { |
| 77 | + // Don't leak why the principal failed — the follower may |
| 78 | + // have a different view of the cluster's role config and |
| 79 | + // we want operators to investigate from the audit log on |
| 80 | + // the leader, not the follower's response body. |
| 81 | + s.logger.LogAttrs(ctx, slog.LevelWarn, "admin_forward_principal_rejected", |
| 82 | + slog.String("forwarded_from", forwardedFrom), |
| 83 | + slog.String("claimed_access_key", req.GetPrincipal().GetAccessKey()), |
| 84 | + slog.String("claimed_role", req.GetPrincipal().GetRole()), |
| 85 | + ) |
| 86 | + return rejectForward(http.StatusForbidden, "forbidden", |
| 87 | + "this endpoint requires a full-access role") |
| 88 | + } |
| 89 | + switch req.GetOperation() { |
| 90 | + case pb.AdminOperation_ADMIN_OP_CREATE_TABLE: |
| 91 | + return s.handleCreate(ctx, principal, forwardedFrom, req) |
| 92 | + case pb.AdminOperation_ADMIN_OP_DELETE_TABLE: |
| 93 | + return s.handleDelete(ctx, principal, forwardedFrom, req) |
| 94 | + case pb.AdminOperation_ADMIN_OP_UNSPECIFIED: |
| 95 | + return rejectForward(http.StatusBadRequest, "invalid_request", "unknown admin operation") |
| 96 | + default: |
| 97 | + return rejectForward(http.StatusBadRequest, "invalid_request", "unknown admin operation") |
| 98 | + } |
| 99 | +} |
| 100 | + |
| 101 | +func (s *ForwardServer) validatePrincipal(p *pb.AdminPrincipal) (AuthPrincipal, bool) { |
| 102 | + accessKey := p.GetAccessKey() |
| 103 | + if accessKey == "" { |
| 104 | + return AuthPrincipal{}, false |
| 105 | + } |
| 106 | + role, ok := s.roles.LookupRole(accessKey) |
| 107 | + if !ok { |
| 108 | + return AuthPrincipal{}, false |
| 109 | + } |
| 110 | + // Critical re-evaluation: if the leader sees this access key as |
| 111 | + // read-only, the operation is forbidden even if the follower |
| 112 | + // thought it was full. The reverse — leader sees full, follower |
| 113 | + // sees read-only — would have been short-circuited at the |
| 114 | + // follower already, so we do not need to check it here. |
| 115 | + if !role.AllowsWrite() { |
| 116 | + return AuthPrincipal{}, false |
| 117 | + } |
| 118 | + return AuthPrincipal{AccessKey: accessKey, Role: role}, true |
| 119 | +} |
| 120 | + |
| 121 | +func (s *ForwardServer) handleCreate(ctx context.Context, principal AuthPrincipal, forwardedFrom string, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) { |
| 122 | + payload := req.GetPayload() |
| 123 | + if len(payload) > adminForwardPayloadLimit { |
| 124 | + return rejectForward(http.StatusRequestEntityTooLarge, "payload_too_large", |
| 125 | + "forwarded payload exceeds the 64 KiB admin limit") |
| 126 | + } |
| 127 | + // Reuse the HTTP handler's strict decoder so the forwarded |
| 128 | + // path enforces the same shape contract — DisallowUnknownFields, |
| 129 | + // trailing-token rejection, slash-in-name rejection, and the |
| 130 | + // rest of validateCreateTableRequest. Bypassing it here would |
| 131 | + // let a hostile follower (or a misbehaving SPA on the follower |
| 132 | + // side) sneak past validations the leader-direct path enforces. |
| 133 | + body, err := decodeCreateTableRequest(bytes.NewReader(payload)) |
| 134 | + if err != nil { |
| 135 | + return rejectForward(http.StatusBadRequest, "invalid_body", err.Error()) |
| 136 | + } |
| 137 | + summary, err := s.source.AdminCreateTable(ctx, principal, body) |
| 138 | + if err != nil { |
| 139 | + s.logUnexpectedSourceError(ctx, "create_table", body.TableName, forwardedFrom, err) |
| 140 | + return forwardErrorResponse("create", err), nil |
| 141 | + } |
| 142 | + s.logger.LogAttrs(ctx, slog.LevelInfo, "admin_audit", |
| 143 | + slog.String("actor", principal.AccessKey), |
| 144 | + slog.String("role", string(principal.Role)), |
| 145 | + slog.String("forwarded_from", forwardedFrom), |
| 146 | + slog.String("operation", "create_table"), |
| 147 | + slog.String("table", body.TableName), |
| 148 | + ) |
| 149 | + return jsonForwardResponse(http.StatusCreated, summary) |
| 150 | +} |
| 151 | + |
| 152 | +func (s *ForwardServer) handleDelete(ctx context.Context, principal AuthPrincipal, forwardedFrom string, req *pb.AdminForwardRequest) (*pb.AdminForwardResponse, error) { |
| 153 | + // Delete carries the table name in the payload as JSON so the |
| 154 | + // proto stays operation-agnostic — there is no operation-specific |
| 155 | + // field in AdminForwardRequest, by design (adding one per op |
| 156 | + // would couple every new admin endpoint to the proto schema). |
| 157 | + payload := req.GetPayload() |
| 158 | + if len(payload) > adminForwardPayloadLimit { |
| 159 | + return rejectForward(http.StatusRequestEntityTooLarge, "payload_too_large", |
| 160 | + "forwarded payload exceeds the 64 KiB admin limit") |
| 161 | + } |
| 162 | + // Mirror decodeCreateTableRequest's NUL-byte guard: goccy/go-json |
| 163 | + // treats raw NUL as end-of-input so dec.More() would otherwise |
| 164 | + // miss `{"name":"users"}\x00{"extra":1}` payloads. Codex P2 on |
| 165 | + // PR #635 flagged this as the same smuggling vector that the |
| 166 | + // HTTP create path already covers. |
| 167 | + if bytes.IndexByte(payload, 0) >= 0 { |
| 168 | + return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload contains a NUL byte") |
| 169 | + } |
| 170 | + dec := json.NewDecoder(bytes.NewReader(payload)) |
| 171 | + dec.DisallowUnknownFields() |
| 172 | + var body struct { |
| 173 | + Name string `json:"name"` |
| 174 | + } |
| 175 | + if err := dec.Decode(&body); err != nil { |
| 176 | + return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload is not valid JSON") |
| 177 | + } |
| 178 | + if dec.More() { |
| 179 | + return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload has trailing data") |
| 180 | + } |
| 181 | + if body.Name == "" { |
| 182 | + return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload missing name") |
| 183 | + } |
| 184 | + // Reject slash-bearing names symmetrically with the HTTP |
| 185 | + // handleDelete and handleDescribe paths. Without this, a |
| 186 | + // forwarded call could act on `foo/bar` while a leader-direct |
| 187 | + // call would 404 — divergent behaviour Codex P2 flagged on |
| 188 | + // PR #635. |
| 189 | + if strings.ContainsRune(body.Name, '/') { |
| 190 | + return rejectForward(http.StatusBadRequest, "invalid_body", "delete payload name must not contain '/'") |
| 191 | + } |
| 192 | + if err := s.source.AdminDeleteTable(ctx, principal, body.Name); err != nil { |
| 193 | + s.logUnexpectedSourceError(ctx, "delete_table", body.Name, forwardedFrom, err) |
| 194 | + return forwardErrorResponse("delete", err), nil |
| 195 | + } |
| 196 | + s.logger.LogAttrs(ctx, slog.LevelInfo, "admin_audit", |
| 197 | + slog.String("actor", principal.AccessKey), |
| 198 | + slog.String("role", string(principal.Role)), |
| 199 | + slog.String("forwarded_from", forwardedFrom), |
| 200 | + slog.String("operation", "delete_table"), |
| 201 | + slog.String("table", body.Name), |
| 202 | + ) |
| 203 | + return &pb.AdminForwardResponse{StatusCode: http.StatusNoContent}, nil |
| 204 | +} |
| 205 | + |
| 206 | +// sanitiseForwardedFrom strips CR/LF from a follower-supplied |
| 207 | +// node id so a malicious value cannot split a single audit log |
| 208 | +// line into two when slog is using a text-format handler. JSON |
| 209 | +// handlers escape these characters automatically; this is a |
| 210 | +// defence-in-depth pass for handler-format-agnostic safety. |
| 211 | +// Other control characters are deliberately preserved — only the |
| 212 | +// line-splitting characters matter for log spoofing. |
| 213 | +func sanitiseForwardedFrom(s string) string { |
| 214 | + return strings.Map(func(r rune) rune { |
| 215 | + if r == '\n' || r == '\r' { |
| 216 | + return ' ' |
| 217 | + } |
| 218 | + return r |
| 219 | + }, s) |
| 220 | +} |
| 221 | + |
| 222 | +// forwardErrorResponse re-encodes a TablesSource error in the |
| 223 | +// structured shape the follower's handler can re-emit verbatim. This |
| 224 | +// is the leader-side counterpart of writeTablesError: every status / |
| 225 | +// JSON code the HTTP handler chooses is mirrored here so a forwarded |
| 226 | +// call is indistinguishable to the SPA from a leader-direct call. |
| 227 | +// |
| 228 | +// op is "create" or "delete" so the unmapped 500 fallthrough emits |
| 229 | +// dynamo_create_failed / dynamo_delete_failed — the same |
| 230 | +// operation-specific codes the leader-direct HTTP path produces in |
| 231 | +// writeTablesError. Without this, forwarded write failures showed |
| 232 | +// up to clients as a generic "internal" code, breaking parity with |
| 233 | +// the leader-direct path (Codex P2 on PR #635). |
| 234 | +func forwardErrorResponse(op string, err error) *pb.AdminForwardResponse { |
| 235 | + switch { |
| 236 | + case errors.Is(err, ErrTablesForbidden): |
| 237 | + return mustForwardJSON(http.StatusForbidden, errorResponse{Error: "forbidden", Message: "this endpoint requires a full-access role"}) |
| 238 | + case errors.Is(err, ErrTablesNotLeader): |
| 239 | + // Should never happen on the leader path — the leader |
| 240 | + // just verified itself — but if a leadership transfer |
| 241 | + // races with the dispatch, surface it consistently. |
| 242 | + // Carry retry_after_seconds=1 so the follower's bridge |
| 243 | + // translates it back into the same HTTP Retry-After |
| 244 | + // header the leader-direct path emits (Codex P2 on |
| 245 | + // PR #635 — without this the forwarded 503 would lose |
| 246 | + // its retry timing). |
| 247 | + resp := mustForwardJSON(http.StatusServiceUnavailable, errorResponse{Error: "leader_unavailable", Message: "leader stepped down mid-request"}) |
| 248 | + resp.RetryAfterSeconds = 1 |
| 249 | + return resp |
| 250 | + case errors.Is(err, ErrTablesNotFound): |
| 251 | + return mustForwardJSON(http.StatusNotFound, errorResponse{Error: "not_found", Message: "table does not exist"}) |
| 252 | + case errors.Is(err, ErrTablesAlreadyExists): |
| 253 | + return mustForwardJSON(http.StatusConflict, errorResponse{Error: "already_exists", Message: "table already exists"}) |
| 254 | + } |
| 255 | + var verr *ValidationError |
| 256 | + if errors.As(err, &verr) { |
| 257 | + return mustForwardJSON(http.StatusBadRequest, errorResponse{Error: "invalid_request", Message: verr.Error()}) |
| 258 | + } |
| 259 | + return mustForwardJSON(http.StatusInternalServerError, errorResponse{ |
| 260 | + Error: "dynamo_" + op + "_failed", |
| 261 | + Message: "failed to " + op + " table; see leader logs", |
| 262 | + }) |
| 263 | +} |
| 264 | + |
| 265 | +// logUnexpectedSourceError emits an error log for non-sentinel |
| 266 | +// source failures so operators have a breadcrumb when forwarded |
| 267 | +// writes 500. Sentinel errors that map to specific HTTP statuses |
| 268 | +// (forbidden, not-found, validation, ...) are deliberately |
| 269 | +// silent: those are routine client-side failures, not server |
| 270 | +// regressions, and logging them at LevelError would drown the |
| 271 | +// operational signal. The HTTP path's writeTablesError applies |
| 272 | +// the same policy (Codex P2 on PR #635 flagged the silent path). |
| 273 | +func (s *ForwardServer) logUnexpectedSourceError(ctx context.Context, op, table, forwardedFrom string, err error) { |
| 274 | + if isStructuredSourceError(err) { |
| 275 | + return |
| 276 | + } |
| 277 | + s.logger.LogAttrs(ctx, slog.LevelError, "admin_forward_"+op+"_failed", |
| 278 | + slog.String("table", table), |
| 279 | + slog.String("forwarded_from", forwardedFrom), |
| 280 | + slog.String("error", err.Error()), |
| 281 | + ) |
| 282 | +} |
| 283 | + |
| 284 | +// isStructuredSourceError reports whether err is one of the |
| 285 | +// admin-package sentinels or a ValidationError — i.e., a known |
| 286 | +// failure mode the handler maps to a non-500 status. These are |
| 287 | +// expected and not log-worthy. |
| 288 | +func isStructuredSourceError(err error) bool { |
| 289 | + switch { |
| 290 | + case errors.Is(err, ErrTablesForbidden), |
| 291 | + errors.Is(err, ErrTablesNotLeader), |
| 292 | + errors.Is(err, ErrTablesNotFound), |
| 293 | + errors.Is(err, ErrTablesAlreadyExists): |
| 294 | + return true |
| 295 | + } |
| 296 | + var verr *ValidationError |
| 297 | + return errors.As(err, &verr) |
| 298 | +} |
| 299 | + |
| 300 | +func rejectForward(status int, code, msg string) (*pb.AdminForwardResponse, error) { |
| 301 | + return mustForwardJSON(status, errorResponse{Error: code, Message: msg}), nil |
| 302 | +} |
| 303 | + |
| 304 | +func mustForwardJSON(status int, body any) *pb.AdminForwardResponse { |
| 305 | + payload, err := json.Marshal(body) |
| 306 | + if err != nil { |
| 307 | + // json.Marshal on a struct of strings cannot fail in |
| 308 | + // practice; a 500 with a bare string body is the safest |
| 309 | + // fallback if it ever does. |
| 310 | + return &pb.AdminForwardResponse{ |
| 311 | + StatusCode: http.StatusInternalServerError, |
| 312 | + Payload: []byte(`{"error":"internal","message":"failed to encode response"}`), |
| 313 | + ContentType: "application/json; charset=utf-8", |
| 314 | + } |
| 315 | + } |
| 316 | + return &pb.AdminForwardResponse{ |
| 317 | + StatusCode: int32(status), //nolint:gosec // status fits in int32; net/http codes are 100-599. |
| 318 | + Payload: payload, |
| 319 | + ContentType: "application/json; charset=utf-8", |
| 320 | + } |
| 321 | +} |
| 322 | + |
| 323 | +func jsonForwardResponse(status int, body any) (*pb.AdminForwardResponse, error) { |
| 324 | + return mustForwardJSON(status, body), nil |
| 325 | +} |
0 commit comments