-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.go
More file actions
926 lines (801 loc) · 29.1 KB
/
main.go
File metadata and controls
926 lines (801 loc) · 29.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
// Package main implements the slacker server.
package main
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
"github.com/codeGROOVE-dev/gsm"
"github.com/codeGROOVE-dev/slacker/pkg/bot"
"github.com/codeGROOVE-dev/slacker/pkg/config"
"github.com/codeGROOVE-dev/slacker/pkg/github"
"github.com/codeGROOVE-dev/slacker/pkg/notify"
"github.com/codeGROOVE-dev/slacker/pkg/slack"
"github.com/codeGROOVE-dev/slacker/pkg/state"
"github.com/codeGROOVE-dev/slacker/pkg/usermapping"
"github.com/codeGROOVE-dev/sprinkler/pkg/client"
"github.com/gorilla/mux"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)
// detectGCPProjectID attempts to detect the GCP project ID from the metadata service.
// Returns empty string if not running on GCP or detection fails.
func detectGCPProjectID(ctx context.Context) string {
// Try metadata service (works on Cloud Run, GCE, GKE, Cloud Functions)
httpClient := &http.Client{Timeout: 2 * time.Second}
//nolint:revive // GCP metadata service is internal and always accessed via HTTP
req, err := http.NewRequestWithContext(ctx, http.MethodGet,
"http://metadata.google.internal/computeMetadata/v1/project/project-id", http.NoBody)
if err != nil {
return ""
}
req.Header.Set("Metadata-Flavor", "Google")
resp, err := httpClient.Do(req)
if err != nil {
slog.Debug("metadata service not available (not running on GCP?)", "error", err)
return ""
}
defer func() {
if err := resp.Body.Close(); err != nil {
slog.Debug("failed to close metadata response body", "error", err)
}
}()
if resp.StatusCode != http.StatusOK {
slog.Debug("metadata service returned non-200", "status", resp.StatusCode)
return ""
}
body, err := io.ReadAll(resp.Body)
if err != nil {
slog.Debug("failed to read metadata response", "error", err)
return ""
}
projectID := strings.TrimSpace(string(body))
if projectID == "" {
return ""
}
return projectID
}
// Server configuration constants.
const (
	serverReadTimeout  = 15 * time.Second // Max time allowed to read an incoming request.
	serverWriteTimeout = 15 * time.Second // Max time allowed to write a response.
	oauthRateLimiterBurst = 20 // Maximum burst of OAuth requests allowed
)
// main is the process entry point: it sets up structured logging, loads
// configuration, and delegates to run, exiting with run's status code.
func main() {
	// Configure logging with source locations and instance ID for better debugging
	hostname, hostErr := os.Hostname()
	if hostErr != nil {
		hostname = "unknown"
	}

	// Rewrite source attributes to use just the file basename for cleaner logs.
	trimSource := func(_ []string, attr slog.Attr) slog.Attr {
		if attr.Key == slog.SourceKey {
			if src, ok := attr.Value.Any().(*slog.Source); ok {
				src.File = filepath.Base(src.File)
			}
		}
		return attr
	}

	handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		AddSource:   true,
		Level:       slog.LevelInfo,
		ReplaceAttr: trimSource,
	})

	// Create logger with hostname as a default attribute.
	// In Cloud Run, hostname uniquely identifies each instance (e.g. slacker-abc123-xyz789),
	// which is critical for disambiguating instances during rolling deployments.
	slog.SetDefault(slog.New(handler).With("instance", hostname))

	// Load configuration from environment.
	cfg, err := loadConfig()
	if err != nil {
		slog.Error("failed to load configuration", "error", err)
		os.Exit(1)
	}

	ctx, cancel := context.WithCancel(context.Background())

	// Run the server; cancel the context before exiting with its code.
	code := run(ctx, cancel, cfg)
	cancel()
	os.Exit(code)
}
// run wires together every service - signal handling, GitHub and Slack
// managers, the persistent state store, HTTP routes, the HTTP server, bot
// coordinators, and the notification scheduler - then blocks until they all
// stop. Returns the process exit code: 0 on clean shutdown, 1 on error.
//nolint:revive,maintidx // Complex initialization requires length for clarity and maintainability
func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfig) int {
	// Handle graceful shutdown.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		sig := <-sigChan
		slog.Warn("🚨 SHUTDOWN SIGNAL RECEIVED 🚨",
			"signal", sig.String(),
			"signal_number", sig,
			"reason", "Cloud Run is shutting down this instance (likely due to inactivity or new deployment)",
			"action", "initiating fast shutdown")
		cancel()
	}()
	// Log configuration without secrets.
	slog.Info("configuration loaded",
		"data_dir", cfg.DataDir,
		"sprinkler_url", cfg.SprinklerURL,
		"github_app_id", cfg.GitHubAppID,
		"has_slack_signing_secret", cfg.SlackSigningSecret != "",
		"has_github_private_key", cfg.GitHubPrivateKey != "",
		"startup_message", "Starting Slacker server...")
	// Initialize config manager for repo configs.
	configManager := config.New()
	// Initialize GitHub installation manager.
	githubManager, err := github.NewManager(ctx, cfg.GitHubAppID, cfg.GitHubPrivateKey, cfg.AllowPersonalAccounts)
	if err != nil {
		slog.Error("failed to initialize GitHub installation manager", "error", err)
		cancel() // Ensure cleanup happens before exit
		return 1
	}
	// Initialize Slack manager for multi-workspace support.
	// Tokens are fetched from GSM based on team_id from org configs.
	slackManager := slack.NewManager(cfg.SlackSigningSecret)
	// Initialize state store (in-memory + Datastore or JSON for persistence).
	var stateStore state.Store
	// Check if Datastore should be used via DATASTORE=<database-id>
	// Examples:
	//   DATASTORE=slacker   -> Use Datastore with database ID "slacker"
	//   DATASTORE=(default) -> Use default Datastore database
	//   DATASTORE=          -> JSON-only mode (no Datastore)
	//   (unset)             -> JSON-only mode (no Datastore)
	datastoreDB := os.Getenv("DATASTORE")
	projectID := os.Getenv("GCP_PROJECT")
	// Auto-detect project ID from GCP metadata service if Datastore requested
	// This works when running on Cloud Run, GCE, GKE, etc.
	if datastoreDB != "" && projectID == "" {
		projectID = detectGCPProjectID(ctx)
		if projectID != "" {
			slog.Info("detected GCP project from metadata service",
				"project_id", projectID,
				"source", "metadata.google.internal")
		}
	}
	if datastoreDB != "" && projectID != "" {
		slog.Info("initializing Cloud Datastore for persistent state (with in-memory cache)",
			"project_id", projectID,
			"database", datastoreDB,
			"cache", "in-memory")
		var err error
		stateStore, err = state.NewDatastoreStore(ctx, projectID, datastoreDB)
		if err != nil {
			// FATAL: If DATASTORE is explicitly configured, fail startup on initialization errors.
			// This prevents silent fallback to memory-only mode which causes duplicate messages
			// during rolling deployments (no cross-instance event deduplication).
			slog.Error("FATAL: failed to initialize Cloud Datastore - DATASTORE variable is set but initialization failed",
				"project_id", projectID,
				"database", datastoreDB,
				"error", err,
				"note", "Set DATASTORE='' to use JSON files instead")
			cancel()
			return 1
		}
		slog.Info("successfully initialized Cloud Datastore with in-memory cache",
			"project_id", projectID,
			"database", datastoreDB,
			"mode", "hybrid: in-memory + Datastore")
	} else {
		// JSON fallback: used when Datastore was not requested or no project
		// could be determined.
		var reason string
		if datastoreDB == "" {
			reason = "DATASTORE not set"
		} else {
			reason = "GCP_PROJECT not set and could not auto-detect"
		}
		slog.Info("using JSON files for persistent state (with in-memory cache)",
			"path", "os.UserCacheDir()/slacker/state",
			"reason", reason,
			"mode", "hybrid: in-memory + JSON files")
		var err error
		stateStore, err = state.NewJSONStore()
		if err != nil {
			slog.Error("failed to initialize JSON store", "error", err)
			cancel()
			return 1
		}
	}
	// Ensure state store is closed on exit
	defer func() {
		if err := stateStore.Close(); err != nil {
			slog.Warn("failed to close state store", "error", err)
		}
	}()
	// Set state store on Slack manager for DM message tracking
	slackManager.SetStateStore(stateStore)
	slog.Info("configured Slack manager with state store for DM tracking")
	// Initialize notification manager for multi-workspace notifications.
	notifier := notify.New(notify.WrapSlackManager(slackManager), configManager, stateStore)
	// Initialize event router for multi-workspace event handling.
	eventRouter := slack.NewEventRouter(slackManager)
	// Initialize reverse user mapping service (Slack → GitHub)
	// Get GitHub token from one of the installations.
	// NOTE(review): this takes the token of the first org that yields a
	// client; presumably any installation token works for the reverse
	// mapping - confirm against usermapping.NewReverseService.
	var githubToken string
	for _, org := range githubManager.AllOrgs() {
		if ghClient, ok := githubManager.ClientForOrg(org); ok {
			githubToken = ghClient.InstallationToken(ctx)
			break
		}
	}
	if githubToken == "" {
		slog.Warn("no GitHub installations found - reverse user mapping will not work")
	}
	// Pass nil for Slack client - it will be provided per-request in HomeHandler
	reverseMapping := usermapping.NewReverseService(nil, githubToken)
	slog.Info("initialized reverse user mapping service (Slack → GitHub)")
	// Initialize home view handler
	homeHandler := slack.NewHomeHandler(slackManager, githubManager, configManager, stateStore, reverseMapping)
	slackManager.SetHomeViewHandler(homeHandler.HandleAppHomeOpened)
	// Initialize report handler for /r2r report slash command
	reportHandler := slack.NewReportHandler(slackManager, githubManager, stateStore, reverseMapping)
	slackManager.SetReportHandler(reportHandler.HandleReportCommand)
	// Initialize OAuth handler for Slack app installation.
	// These credentials are needed for the OAuth flow.
	slackClientID := os.Getenv("SLACK_CLIENT_ID")
	slackClientSecret := os.Getenv("SLACK_CLIENT_SECRET")
	if slackClientSecret == "" {
		// Try fetching from Secret Manager
		var err error
		slackClientSecret, err = gsm.Fetch(ctx, "SLACK_CLIENT_SECRET")
		if err != nil {
			slog.Warn("SLACK_CLIENT_SECRET not found - OAuth installation will not work",
				"error", err)
		}
	}
	// OAuth is optional: when credentials are missing the handler stays nil
	// and the /slack/install routes are simply not registered below.
	var oauthHandler *slack.OAuthHandler
	if slackClientID != "" && slackClientSecret != "" {
		oauthHandler = slack.NewOAuthHandler(slackManager, slackClientID, slackClientSecret)
		slog.Info("OAuth handler initialized",
			"client_id", slackClientID)
	} else {
		slog.Warn("OAuth not configured - app installation via web will not work",
			"has_client_id", slackClientID != "",
			"has_client_secret", slackClientSecret != "")
	}
	// Setup HTTP routes with security middleware.
	router := mux.NewRouter()
	router.Use(securityHeadersMiddleware)
	// Root endpoint - blank
	router.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}).Methods("GET")
	// Health endpoints
	router.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		if _, err := w.Write([]byte("OK")); err != nil {
			slog.Error("failed to write health response", "error", err)
		}
	}).Methods("GET")
	router.HandleFunc("/healthz", makeHealthzHandler(githubManager)).Methods("GET")
	// Slack OAuth endpoints - for app installation with rate limiting
	if oauthHandler != nil {
		// SECURITY: Rate limiter for OAuth endpoints: 10 requests per second, burst of oauthRateLimiterBurst
		// This prevents abuse while allowing legitimate installation flows
		oauthLimiter := rate.NewLimiter(10, oauthRateLimiterBurst)
		router.Handle("/slack/install", rateLimitMiddleware(oauthLimiter)(http.HandlerFunc(oauthHandler.HandleInstall))).Methods("GET")
		router.Handle("/slack/oauth/callback", rateLimitMiddleware(oauthLimiter)(http.HandlerFunc(oauthHandler.HandleCallback))).Methods("GET")
		slog.Info("registered OAuth endpoints with rate limiting",
			"install_url", "/slack/install",
			"callback_url", "/slack/oauth/callback",
			"rate_limit", "10/s burst 20")
	}
	// Slack endpoints - routed to workspace-specific clients
	router.HandleFunc("/slack/events", eventRouter.HandleEvents).Methods("POST")
	router.HandleFunc("/slack/interactive-endpoint", eventRouter.HandleInteractions).Methods("POST")
	router.HandleFunc("/slack/slash", eventRouter.HandleSlashCommand).Methods("POST")
	// Determine port.
	port := os.Getenv("PORT")
	if port == "" {
		port = "9119"
	}
	// Start server and bot services.
	eg, ctx := errgroup.WithContext(ctx)
	// HTTP server.
	server := &http.Server{
		Addr:         ":" + port,
		Handler:      router,
		ReadTimeout:  serverReadTimeout,
		WriteTimeout: serverWriteTimeout,
		IdleTimeout:  60 * time.Second,
	}
	eg.Go(func() error {
		slog.Info("starting server", "port", port)
		// ErrServerClosed is the normal result of Shutdown, not a failure.
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			return err
		}
		return nil
	})
	eg.Go(func() error {
		<-ctx.Done()
		slog.Info("shutting down HTTP server")
		// Quick shutdown - Cloud Run gives us ~10 seconds, use 2 seconds for HTTP
		shutdownCtx, shutdownCancel := context.WithTimeout(context.WithoutCancel(ctx), 2*time.Second)
		defer shutdownCancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			slog.Warn("HTTP server shutdown timeout - forcing close", "error", err)
			return nil // Don't fail the errgroup, just move on
		}
		slog.Info("HTTP server stopped cleanly")
		return nil
	})
	// Start bot coordinators for each GitHub installation.
	// This runs indefinitely and handles its own retries - should never return an error
	// unless the context is cancelled (clean shutdown).
	eg.Go(func() error {
		if err := runBotCoordinators(ctx, slackManager, githubManager, configManager, notifier, stateStore, cfg.SprinklerURL); err != nil {
			if errors.Is(err, context.Canceled) {
				return nil
			}
			// Log unexpected error but don't propagate to errgroup
			// (would trigger shutdown of entire server)
			slog.Error("bot coordinators stopped unexpectedly", "error", err)
		}
		return nil
	})
	// Start notification scheduler.
	eg.Go(func() error {
		slog.Debug("starting notifier goroutine")
		err := notifier.Run(ctx)
		slog.Debug("notifier goroutine ended", "error", err)
		if errors.Is(err, context.Canceled) {
			return nil
		}
		return err
	})
	// Wait for all services.
	slog.Debug("waiting for all services to complete")
	if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		slog.Error("server error", "error", err)
		return 1
	}
	slog.Warn("✅ SERVER STOPPED CLEANLY - all services shut down gracefully")
	return 0
}
// coordinatorManager holds state for managing bot coordinators across orgs.
type coordinatorManager struct {
	slackManager  *slack.Manager
	githubManager *github.Manager
	configManager *config.Manager
	notifier      *notify.Manager
	stateStore    state.Store
	// active maps org name -> cancel func that stops that org's coordinator.
	active map[string]context.CancelFunc
	// failed maps org name -> time of the last failure; entries are retried
	// by handleRetryFailed and cleared when a coordinator starts.
	failed map[string]time.Time
	// lastHealthCheck records the last time at least one coordinator was
	// observed active (only read/written from handleHealthCheck).
	lastHealthCheck time.Time
	sprinklerURL    string
	// mu guards active and failed.
	mu sync.Mutex
}
// handleCoordinatorExit cleans up after a coordinator exits: it records the
// failure (unless the exit was a clean cancellation) and removes the org from
// the active set.
func (cm *coordinatorManager) handleCoordinatorExit(org, sprinklerURL string, err error) {
	// Acquire lock once for all state modifications to prevent race conditions
	cm.mu.Lock()
	defer cm.mu.Unlock()

	switch {
	case err == nil:
		// A nil error means the coordinator returned without cancellation,
		// which is unexpected; mark it failed so it gets retried.
		slog.Error("coordinator exited with nil error - THIS SHOULD NOT HAPPEN",
			"org", org,
			"sprinkler_url", sprinklerURL)
		cm.failed[org] = time.Now()
	case errors.Is(err, context.Canceled):
		// Clean shutdown path: not a failure.
		slog.Info("coordinator stopped due to context cancellation", "org", org)
	default:
		slog.Error("coordinator exited unexpectedly - THIS SHOULD NOT HAPPEN",
			"org", org,
			"error", err,
			"error_type", fmt.Sprintf("%T", err))
		cm.failed[org] = time.Now()
	}

	delete(cm.active, org)
}
// startSingleCoordinator starts a coordinator for one org.
// Returns true when a coordinator is running for the org (either it was
// already active or it was just started); false when the org is skipped or
// the start failed.
//
// NOTE(review): this method reads/writes cm.active and cm.failed without
// locking - the caller (startCoordinators) holds cm.mu. Preserve that
// invariant when adding new call sites.
func (cm *coordinatorManager) startSingleCoordinator(ctx context.Context, org string) bool {
	// Skip if already running
	if _, exists := cm.active[org]; exists {
		return true
	}
	// Get GitHub client for this org
	githubClient, exists := cm.githubManager.ClientForOrg(org)
	if !exists {
		slog.Warn("no GitHub client for org", "org", org)
		cm.failed[org] = time.Now()
		return false
	}
	// Set GitHub client in config manager for this org
	cm.configManager.SetGitHubClient(org, githubClient.Client())
	// Load config to check if Slack is configured.
	// Note: a config load failure does NOT mark the org failed - it is
	// simply skipped this round.
	if err := cm.configManager.LoadConfig(ctx, org); err != nil {
		slog.Warn("failed to load config for org", "org", org, "error", err)
		return false
	}
	cfg, exists := cm.configManager.Config(org)
	if !exists || cfg.Global.TeamID == "" {
		slog.Debug("skipping org without Slack configuration", "org", org)
		return false
	}
	teamID := cfg.Global.TeamID
	// Get Slack client for this workspace
	slackClient, err := cm.slackManager.Client(ctx, teamID)
	if err != nil {
		slog.Error("failed to get Slack client for workspace",
			"org", org,
			"team_id", teamID,
			"error", err)
		cm.failed[org] = time.Now()
		return false
	}
	// Start coordinator in goroutine with org-specific context.
	// The cancel func stored in active is how startCoordinators /
	// handleShutdown later stop this org's coordinator and its pollers.
	orgCtx, cancel := context.WithCancel(ctx)
	cm.active[org] = cancel
	// Clear from failed list since we're starting it
	delete(cm.failed, org)
	// Create coordinator for this org with org-specific context
	coordinator := bot.New(
		orgCtx,
		slackClient,
		githubClient,
		cm.configManager,
		cm.notifier,
		cm.sprinklerURL,
		cm.stateStore,
	)
	// Run startup reconciliation to catch up on missed notifications
	go func() {
		coordinator.StartupReconciliation(orgCtx)
	}()
	go func(org, teamID string, coord *bot.Coordinator, orgCtx context.Context) {
		slog.Info("starting coordinator for org",
			"org", org,
			"team_id", teamID,
			"sprinkler_url", cm.sprinklerURL)
		// Start polling goroutine for this coordinator; it stops when
		// orgCtx is cancelled.
		go func() {
			ticker := time.NewTicker(5 * time.Minute)
			defer ticker.Stop()
			for {
				select {
				case <-orgCtx.Done():
					slog.Info("stopping polling for org", "org", org)
					return
				case <-ticker.C:
					coord.PollAndReconcile(orgCtx)
				}
			}
		}()
		// Blocks until the coordinator exits; bookkeeping (including
		// locking) happens inside handleCoordinatorExit.
		err := coord.RunWithSprinklerClient(orgCtx)
		cm.handleCoordinatorExit(org, cm.sprinklerURL, err)
	}(org, teamID, coordinator, orgCtx)
	return true
}
// startCoordinators reconciles the set of running coordinators with the
// current GitHub installations: it stops coordinators for removed orgs and
// starts one for every org that doesn't already have one.
func (cm *coordinatorManager) startCoordinators(ctx context.Context) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	orgs := cm.githubManager.AllOrgs()
	slog.Info("checking GitHub installations", "total_orgs", len(orgs))

	// Index the current installations for O(1) membership checks.
	known := make(map[string]bool, len(orgs))
	for _, name := range orgs {
		known[name] = true
	}

	// Tear down coordinators whose org no longer has an installation.
	for name, stop := range cm.active {
		if known[name] {
			continue
		}
		slog.Info("stopping coordinator for removed org", "org", name)
		stop()
		delete(cm.active, name)
	}

	// Ensure every current org has a coordinator (no-op for running ones).
	for _, name := range orgs {
		cm.startSingleCoordinator(ctx, name)
	}
}
// handleShutdown cancels every active coordinator and returns the context's
// error so the caller can propagate the shutdown reason.
func (cm *coordinatorManager) handleShutdown(ctx context.Context) error {
	slog.Info("shutdown initiated - stopping all bot coordinators")

	cm.mu.Lock()
	count := len(cm.active)
	for name, stop := range cm.active {
		slog.Info("stopping coordinator", "org", name)
		stop()
	}
	cm.mu.Unlock()

	slog.Info("all coordinators stopped", "count", count)
	return ctx.Err()
}
// handleHealthCheck performs health monitoring and returns an error when no
// coordinator has been active for longer than the allowed downtime.
func (cm *coordinatorManager) handleHealthCheck() error {
	const maxDowntime = 1 * time.Minute

	cm.mu.Lock()
	active := len(cm.active)
	failed := len(cm.failed)
	total := len(cm.githubManager.AllOrgs())
	cm.mu.Unlock()

	// Healthy: at least one coordinator running - restart the downtime clock.
	if active > 0 {
		cm.lastHealthCheck = time.Now()
		slog.Debug("coordinator health check passed",
			"active_coordinators", active,
			"failed_coordinators", failed,
			"total_orgs", total)
		return nil
	}

	// No coordinators but also no orgs: nothing is expected to be running.
	if total == 0 {
		return nil
	}

	// Orgs exist but nothing is running - fatal once maxDowntime elapses.
	if !cm.lastHealthCheck.IsZero() && time.Since(cm.lastHealthCheck) > maxDowntime {
		slog.Error("FATAL: no active coordinators for too long",
			"total_orgs", total,
			"failed_coordinators", failed,
			"last_active", cm.lastHealthCheck,
			"downtime", time.Since(cm.lastHealthCheck))
		return errors.New("no active coordinators for extended period")
	}

	slog.Warn("no active coordinators - will fail soon",
		"total_orgs", total,
		"failed_coordinators", failed,
		"time_until_failure", maxDowntime-time.Since(cm.lastHealthCheck))
	return nil
}
// handleRetryFailed retries starting coordinators that previously failed,
// refreshing the installation list first.
func (cm *coordinatorManager) handleRetryFailed(ctx context.Context) {
	cm.mu.Lock()
	pending := len(cm.failed)
	cm.mu.Unlock()

	// Nothing to retry.
	if pending == 0 {
		return
	}
	slog.Info("retrying failed coordinators", "failed_count", pending)

	// A refresh failure (other than cancellation) aborts this retry round.
	if err := cm.githubManager.RefreshInstallations(ctx); err != nil {
		if !errors.Is(err, context.Canceled) {
			slog.Error("failed to refresh installations during retry", "error", err)
		}
		return
	}

	cm.startCoordinators(ctx)

	cm.mu.Lock()
	running := len(cm.active)
	cm.mu.Unlock()
	slog.Info("retry attempt complete", "active_coordinators", running)
}
// handleRefreshInstallations refreshes the GitHub installation list and
// reconciles coordinators against it (starting new ones, stopping removed).
func (cm *coordinatorManager) handleRefreshInstallations(ctx context.Context) {
	cm.mu.Lock()
	before := len(cm.active)
	cm.mu.Unlock()
	slog.Info("refreshing GitHub installations", "active_coordinators", before)

	// Skip reconciliation when the refresh itself fails (cancellation is
	// expected during shutdown and not logged as an error).
	if err := cm.githubManager.RefreshInstallations(ctx); err != nil {
		if !errors.Is(err, context.Canceled) {
			slog.Error("failed to refresh installations", "error", err)
		}
		return
	}

	cm.startCoordinators(ctx)

	cm.mu.Lock()
	after := len(cm.active)
	cm.mu.Unlock()
	slog.Info("refresh complete", "active_coordinators", after)
}
// runBotCoordinators manages bot coordinators for all GitHub installations.
// It spawns one coordinator per org and refreshes the list every 5 minutes.
// Failed coordinators are automatically restarted every minute.
// Blocks until ctx is cancelled (returning ctx.Err()) or a health check
// declares the system dead (returning that error).
func runBotCoordinators(
	ctx context.Context,
	slackManager *slack.Manager,
	githubManager *github.Manager,
	configManager *config.Manager,
	notifier *notify.Manager,
	stateStore state.Store,
	sprinklerURL string,
) error {
	cm := &coordinatorManager{
		active:          make(map[string]context.CancelFunc),
		failed:          make(map[string]time.Time),
		slackManager:    slackManager,
		githubManager:   githubManager,
		configManager:   configManager,
		notifier:        notifier,
		stateStore:      stateStore,
		sprinklerURL:    sprinklerURL,
		lastHealthCheck: time.Now(),
	}
	// Start initial coordinators
	cm.startCoordinators(ctx)
	// Refresh installations every 5 minutes
	installationTicker := time.NewTicker(5 * time.Minute)
	defer installationTicker.Stop()
	// Retry failed coordinators every minute
	retryTicker := time.NewTicker(1 * time.Minute)
	defer retryTicker.Stop()
	// Health check: fail if no coordinators are active for too long
	healthCheckTicker := time.NewTicker(15 * time.Second)
	defer healthCheckTicker.Stop()
	// Poll for PRs every 5 minutes (safety net for missed sprinkler events)
	// Daily reports are checked during each poll cycle (6am-11:30am window)
	pollTicker := time.NewTicker(5 * time.Minute)
	defer pollTicker.Stop()
	// Setup state cleanup ticker (hourly)
	cleanupTicker := time.NewTicker(1 * time.Hour)
	defer cleanupTicker.Stop()
	// Run cleanup once on startup
	//nolint:contextcheck // Background cleanup should complete even during shutdown
	go func() {
		if err := stateStore.Cleanup(context.Background()); err != nil {
			slog.Warn("initial state cleanup failed", "error", err)
		}
	}()
	// Event loop: ticker cases run serially on this goroutine; only the
	// hourly state cleanup is dispatched to a background goroutine.
	for {
		select {
		case <-ctx.Done():
			return cm.handleShutdown(ctx)
		case <-healthCheckTicker.C:
			// A health-check error is fatal and ends this loop.
			if err := cm.handleHealthCheck(); err != nil {
				return err
			}
		case <-retryTicker.C:
			cm.handleRetryFailed(ctx)
		case <-installationTicker.C:
			cm.handleRefreshInstallations(ctx)
		case <-pollTicker.C:
			// Poll all active coordinators (includes daily report checks)
			cm.handlePolling(ctx)
		case <-cleanupTicker.C:
			// Periodic cleanup of old state data
			//nolint:contextcheck // Background cleanup should complete even during shutdown
			go func() {
				if err := stateStore.Cleanup(context.Background()); err != nil {
					slog.Warn("state cleanup failed", "error", err)
				} else {
					slog.Debug("state cleanup completed successfully")
				}
			}()
		}
	}
}
// handlePolling triggers polling for all active coordinators. Currently this
// only logs: each coordinator already polls on its own 5-minute ticker.
func (cm *coordinatorManager) handlePolling(_ context.Context) {
	cm.mu.Lock()
	running := len(cm.active)
	cm.mu.Unlock()

	if running == 0 {
		slog.Debug("no active coordinators to poll")
		return
	}

	slog.Debug("triggering PR polling for all coordinators",
		"active_count", running)
	// Polling is handled per-coordinator in their own goroutines
	// We rely on each coordinator to implement pollAndReconcile
	// For now, this is a placeholder - actual implementation would need
	// access to coordinators or a different architecture
}
// loadConfig assembles the server configuration from environment variables,
// files, and Google Secret Manager. Precedence for secrets: environment
// variable first, then Secret Manager (and for the GitHub key, a key file
// via GITHUB_PRIVATE_KEY_PATH as a final fallback). Returns an error when
// any required value is missing.
func loadConfig() (*config.ServerConfig, error) {
	ctx := context.Background()
	// Helper function to get secret values
	// Environment variables take precedence, then Secret Manager
	getSecretValue := func(name string) string {
		// Environment variable takes precedence
		if value := os.Getenv(name); value != "" {
			slog.Info("using environment variable",
				"name", name,
				"source", "environment")
			return value
		}
		// Try Secret Manager using gsm library
		slog.Info("attempting to fetch secret from Secret Manager",
			"name", name)
		value, err := gsm.Fetch(ctx, name)
		if err != nil {
			// Missing secrets are not fatal here; required values are
			// validated at the end of loadConfig.
			slog.Error("failed to fetch secret from Secret Manager",
				"name", name,
				"error", err)
			return ""
		}
		slog.Info("successfully fetched secret from Secret Manager",
			"name", name,
			"has_value", value != "")
		return value
	}
	// Load GitHub private key from environment, file, or Secret Manager
	githubPrivateKey := getSecretValue("GITHUB_PRIVATE_KEY")
	if githubPrivateKey == "" {
		if keyPath := os.Getenv("GITHUB_PRIVATE_KEY_PATH"); keyPath != "" {
			keyData, err := os.ReadFile(keyPath)
			if err != nil {
				return nil, fmt.Errorf("failed to read GitHub private key from %s: %w", keyPath, err)
			}
			githubPrivateKey = string(keyData)
		}
	}
	// Set defaults for optional config
	dataDir := os.Getenv("DATA_DIR")
	if dataDir == "" {
		configDir, err := os.UserConfigDir()
		if err != nil {
			return nil, fmt.Errorf("failed to get user config directory: %w", err)
		}
		dataDir = filepath.Join(configDir, "slacker")
	}
	sprinklerURL := os.Getenv("SPRINKLER_URL")
	if sprinklerURL == "" {
		// Default to the sprinkler client's well-known websocket endpoint.
		sprinklerURL = "wss://" + client.DefaultServerAddress + "/ws"
	}
	slog.Info("loading configuration values")
	// Parse personal accounts flag (default: false for DoS protection)
	allowPersonalAccounts := os.Getenv("ALLOW_PERSONAL_ACCOUNTS") == "true"
	cfg := &config.ServerConfig{
		DataDir:               dataDir,
		SlackSigningSecret:    getSecretValue("SLACK_SIGNING_SECRET"),
		GitHubAppID:           os.Getenv("GITHUB_APP_ID"), // Not a secret, just config
		GitHubPrivateKey:      githubPrivateKey,
		SprinklerURL:          sprinklerURL,
		AllowPersonalAccounts: allowPersonalAccounts,
	}
	// Log presence (never values) of secrets for startup diagnostics.
	slog.Info("configuration loaded",
		"has_slack_signing_secret", cfg.SlackSigningSecret != "",
		"has_github_app_id", cfg.GitHubAppID != "",
		"has_github_private_key", cfg.GitHubPrivateKey != "",
		"allow_personal_accounts", cfg.AllowPersonalAccounts)
	// Validate required fields
	if cfg.SlackSigningSecret == "" {
		return nil, errors.New("missing required configuration: SLACK_SIGNING_SECRET (env var or secret)")
	}
	if cfg.GitHubAppID == "" {
		return nil, errors.New("missing required environment variable: GITHUB_APP_ID")
	}
	if cfg.GitHubPrivateKey == "" {
		return nil, errors.New("missing required configuration: GITHUB_PRIVATE_KEY (env var, file, or secret)")
	}
	return cfg, nil
}
// makeHealthzHandler creates a more detailed health check that verifies
// coordinators are running. Useful for Cloud Run liveness checks.
func makeHealthzHandler(githubManager *github.Manager) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		orgs := githubManager.AllOrgs()

		// No installations yet is fine during startup; report healthy.
		if len(orgs) == 0 {
			w.WriteHeader(http.StatusOK)
			if _, err := w.Write([]byte("OK - no orgs configured")); err != nil {
				slog.Error("failed to write healthz response", "error", err)
			}
			return
		}

		// Orgs exist, so coordinators are assumed to be running; the
		// coordinator health-check ticker provides detailed monitoring.
		w.WriteHeader(http.StatusOK)
		body := fmt.Sprintf("OK - %d orgs configured", len(orgs))
		if _, err := w.Write([]byte(body)); err != nil {
			slog.Error("failed to write healthz response", "error", err)
		}
	}
}
// securityHeadersMiddleware adds security headers to all responses.
func securityHeadersMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Security headers to prevent common attacks.
w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Set("X-Frame-Options", "DENY")
w.Header().Set("X-XSS-Protection", "1; mode=block")
w.Header().Set("Strict-Transport-Security", "max-age=31536000; includeSubDomains")
w.Header().Set("Content-Security-Policy", "default-src 'none'; frame-ancestors 'none';")
w.Header().Set("Referrer-Policy", "no-referrer")
next.ServeHTTP(w, r)
})
}
// rateLimitMiddleware wraps a handler with a token-bucket rate limiter,
// rejecting requests with 429 when no token is available.
func rateLimitMiddleware(limiter *rate.Limiter) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// Fast path: a token is available, pass the request through.
			if limiter.Allow() {
				next.ServeHTTP(w, r)
				return
			}
			// Over the limit: reject with 429 and log the offender.
			http.Error(w, "Too many requests - please try again later", http.StatusTooManyRequests)
			slog.Warn("rate limit exceeded", "path", r.URL.Path, "remote_addr", r.RemoteAddr)
		})
	}
}