diff --git a/cmd/mpcium/main.go b/cmd/mpcium/main.go index 95ae4224..85383f99 100644 --- a/cmd/mpcium/main.go +++ b/cmd/mpcium/main.go @@ -15,6 +15,7 @@ import ( "github.com/fystack/mpcium/pkg/constant" "github.com/fystack/mpcium/pkg/event" "github.com/fystack/mpcium/pkg/eventconsumer" + "github.com/fystack/mpcium/pkg/healthcheck" "github.com/fystack/mpcium/pkg/identity" "github.com/fystack/mpcium/pkg/infra" "github.com/fystack/mpcium/pkg/keyinfo" @@ -241,6 +242,21 @@ func runNode(ctx context.Context, c *cli.Command) error { } logger.Info("[READY] Node is ready", "nodeID", nodeID) + // Start health check server (disabled by default) + var healthServer *healthcheck.Server + if viper.GetBool("healthcheck.enabled") { + healthAddr := viper.GetString("healthcheck.address") + if healthAddr == "" { + healthAddr = ":8080" + } + healthServer = healthcheck.NewServer(healthAddr, peerRegistry, natsConn, consulClient) + go func() { + if err := healthServer.Start(); err != nil { + logger.Error("Health check server error", err) + } + }() + } + logger.Info("Starting consumers", "nodeID", nodeID) appContext, cancel := context.WithCancel(context.Background()) //Setup signal handling to cancel context on termination signals. @@ -251,6 +267,15 @@ func runNode(ctx context.Context, c *cli.Command) error { logger.Warn("Shutdown signal received, canceling context...") cancel() + // Shutdown health check server if it was started + if healthServer != nil { + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCancel() + if err := healthServer.Shutdown(shutdownCtx); err != nil { + logger.Error("Failed to shutdown health check server", err) + } + } + // Resign from peer registry first (before closing NATS) if err := peerRegistry.Resign(); err != nil { logger.Error("Failed to resign from peer registry", err) diff --git a/config.prod.yaml.template b/config.prod.yaml.template index 1c701201..e91f9e9c 100644 --- a/config.prod.yaml.template +++ b/config.prod.yaml.template @@ -23,3 +23,7 @@ backup_dir: backups max_concurrent_keygen: 2 max_concurrent_signing: 10 session_warm_up_delay_ms: 100 + +healthcheck: + enabled: false # disabled by default, set to true for cloud deployment + address: "127.0.0.1:8080" diff --git a/config.yaml.template b/config.yaml.template index d868f563..d9f6e603 100644 --- a/config.yaml.template +++ b/config.yaml.template @@ -15,6 +15,10 @@ backup_dir: backups max_concurrent_keygen: 2 max_concurrent_signing: 10 session_warm_up_delay_ms: 100 +healthcheck: + enabled: true # disabled by default, set to true for cloud deployment + address: "0.0.0.0:8080" + # Authorization (optional) # authorization: diff --git a/pkg/healthcheck/server.go b/pkg/healthcheck/server.go new file mode 100644 index 00000000..dcfbf5cc --- /dev/null +++ b/pkg/healthcheck/server.go @@ -0,0 +1,150 @@ +package healthcheck + +import ( + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "time" + + "github.com/fystack/mpcium/pkg/logger" + "github.com/fystack/mpcium/pkg/mpc" + "github.com/hashicorp/consul/api" + "github.com/nats-io/nats.go" +) + +// Server provides HTTP health check endpoints for Kubernetes probes +type Server struct { + httpServer *http.Server + peerRegistry mpc.PeerRegistry + natsConn *nats.Conn + consulClient *api.Client +} + +// HealthResponse represents the JSON response for health check endpoints +type HealthResponse struct { + Status string `json:"status"` + Live bool `json:"live"` + Ready bool `json:"ready"` + Details map[string]any `json:"details,omitempty"` +} + +// NewServer creates a new health check HTTP server +func NewServer(addr string, peerRegistry mpc.PeerRegistry, natsConn *nats.Conn, consulClient *api.Client) *Server { + s := &Server{ + peerRegistry: peerRegistry, + natsConn: natsConn, + consulClient: consulClient, + } + + mux := http.NewServeMux() + mux.HandleFunc("/health", s.healthHandler) + + s.httpServer = &http.Server{ + Addr: addr, + Handler: mux, + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + IdleTimeout: 15 * time.Second, + } + + return s +} + +// Start begins serving health check endpoints +func (s *Server) Start() error { + addr := s.httpServer.Addr + + // Parse host and port from address + host, port, err := net.SplitHostPort(addr) + if err != nil { + // If parsing fails, just use the address as-is + logger.Info("Starting health check server", "address", addr) + } else { + // Replace empty host or 0.0.0.0 with localhost for display + if host == "" || host == "0.0.0.0" { + host = "localhost" + } + endpoint := fmt.Sprintf("http://%s:%s/health", host, port) + logger.Info("Starting health check server", "endpoint", endpoint) + } + + if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + return fmt.Errorf("health check server failed: %w", err) + } + return nil +} + +// Shutdown gracefully stops the health check server +func (s *Server) Shutdown(ctx context.Context) error { + logger.Info("Shutting down health check server") + return s.httpServer.Shutdown(ctx) +} + +// healthHandler responds to health check requests +// This endpoint checks both liveness and readiness in a single response +func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) { + details := make(map[string]any) + ready := true + live := true // Service is always live if it can respond + + // Check NATS connection + natsConnected := s.natsConn != nil && s.natsConn.IsConnected() + details["nats_connected"] = natsConnected + if !natsConnected { + ready = false + } + + // Check Consul connection + consulConnected := false + if s.consulClient != nil { + if leader, err := s.consulClient.Status().Leader(); err == nil && leader != "" { + consulConnected = true + } + } + details["consul_connected"] = consulConnected + if !consulConnected { + ready = false + } + + // Check peer registry readiness (includes ECDH completion) + if s.peerRegistry != nil { + peersReady := s.peerRegistry.ArePeersReady() + majorityReady := s.peerRegistry.AreMajorityReady() + readyCount := s.peerRegistry.GetReadyPeersCount() + totalCount := s.peerRegistry.GetTotalPeersCount() + + details["peers_ready_count"] = fmt.Sprintf("%d/%d", readyCount, totalCount) + details["all_peers_ready"] = peersReady + details["majority_ready"] = majorityReady + + // Node is ready if majority of peers are ready (allows for some fault tolerance) + if !majorityReady { + ready = false + } + } else { + details["peers_available"] = false + ready = false + } + + response := HealthResponse{ + Live: live, + Ready: ready, + Details: details, + } + + w.Header().Set("Content-Type", "application/json") + + if ready { + response.Status = "ready" + w.WriteHeader(http.StatusOK) + } else { + response.Status = "not_ready" + w.WriteHeader(http.StatusServiceUnavailable) + } + + if err := json.NewEncoder(w).Encode(response); err != nil { + logger.Error("Failed to encode health check response", err) + } +}