-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmain.go
More file actions
126 lines (110 loc) · 4.56 KB
/
main.go
File metadata and controls
126 lines (110 loc) · 4.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package main
import (
"context"
"flag"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/bootjp/elastickv/proxy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
const (
sentryFlushTimeout = 2 * time.Second
metricsShutdownTimeout = 5 * time.Second
)
func main() {
if err := run(); err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
}
func run() error {
cfg := proxy.DefaultConfig()
var modeStr string
flag.StringVar(&cfg.ListenAddr, "listen", cfg.ListenAddr, "Proxy listen address")
flag.StringVar(&cfg.PrimaryAddr, "primary", cfg.PrimaryAddr, "Primary (Redis) address")
flag.IntVar(&cfg.PrimaryDB, "primary-db", cfg.PrimaryDB, "Primary Redis DB number")
flag.StringVar(&cfg.PrimaryPassword, "primary-password", cfg.PrimaryPassword, "Primary Redis password")
flag.StringVar(&cfg.SecondaryAddr, "secondary", cfg.SecondaryAddr, "Secondary (ElasticKV) address")
flag.IntVar(&cfg.SecondaryDB, "secondary-db", cfg.SecondaryDB, "Secondary Redis DB number")
flag.StringVar(&cfg.SecondaryPassword, "secondary-password", cfg.SecondaryPassword, "Secondary Redis password")
flag.StringVar(&modeStr, "mode", "dual-write", "Proxy mode: redis-only, dual-write, dual-write-shadow, elastickv-primary, elastickv-only")
flag.DurationVar(&cfg.SecondaryTimeout, "secondary-timeout", cfg.SecondaryTimeout, "Secondary write timeout")
flag.DurationVar(&cfg.ShadowTimeout, "shadow-timeout", cfg.ShadowTimeout, "Shadow read timeout")
flag.StringVar(&cfg.SentryDSN, "sentry-dsn", cfg.SentryDSN, "Sentry DSN (empty = disabled)")
flag.StringVar(&cfg.SentryEnv, "sentry-env", cfg.SentryEnv, "Sentry environment")
flag.Float64Var(&cfg.SentrySampleRate, "sentry-sample", cfg.SentrySampleRate, "Sentry sample rate")
flag.StringVar(&cfg.MetricsAddr, "metrics", cfg.MetricsAddr, "Prometheus metrics address")
flag.Parse()
mode, ok := proxy.ParseProxyMode(modeStr)
if !ok {
return fmt.Errorf("unknown mode: %s", modeStr)
}
cfg.Mode = mode
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
// Sentry
sentryReporter := proxy.NewSentryReporter(cfg.SentryDSN, cfg.SentryEnv, cfg.SentrySampleRate, logger)
defer sentryReporter.Flush(sentryFlushTimeout)
// Prometheus
reg := prometheus.NewRegistry()
metrics := proxy.NewProxyMetrics(reg)
// Backends
primaryOpts := proxy.DefaultBackendOptions()
primaryOpts.DB = cfg.PrimaryDB
primaryOpts.Password = cfg.PrimaryPassword
secondaryOpts := proxy.DefaultBackendOptions()
secondaryOpts.DB = cfg.SecondaryDB
secondaryOpts.Password = cfg.SecondaryPassword
var primary, secondary proxy.Backend
switch cfg.Mode {
case proxy.ModeElasticKVPrimary, proxy.ModeElasticKVOnly:
primary = proxy.NewRedisBackendWithOptions(cfg.SecondaryAddr, "elastickv", secondaryOpts)
secondary = proxy.NewRedisBackendWithOptions(cfg.PrimaryAddr, "redis", primaryOpts)
case proxy.ModeRedisOnly, proxy.ModeDualWrite, proxy.ModeDualWriteShadow:
primary = proxy.NewRedisBackendWithOptions(cfg.PrimaryAddr, "redis", primaryOpts)
secondary = proxy.NewRedisBackendWithOptions(cfg.SecondaryAddr, "elastickv", secondaryOpts)
}
defer primary.Close()
defer secondary.Close()
dual := proxy.NewDualWriter(primary, secondary, cfg, metrics, sentryReporter, logger)
defer dual.Close() // wait for in-flight async goroutines
srv := proxy.NewProxyServer(cfg, dual, metrics, sentryReporter, logger)
// Context for graceful shutdown
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()
// Start metrics server
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
var lc net.ListenConfig
ln, err := lc.Listen(ctx, "tcp", cfg.MetricsAddr)
if err != nil {
logger.Error("metrics listen failed", "addr", cfg.MetricsAddr, "err", err)
return
}
metricsSrv := &http.Server{Handler: mux, ReadHeaderTimeout: time.Second}
go func() {
<-ctx.Done()
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), metricsShutdownTimeout)
defer shutdownCancel()
if err := metricsSrv.Shutdown(shutdownCtx); err != nil {
logger.Warn("metrics server shutdown error", "err", err)
}
}()
logger.Info("metrics server starting", "addr", cfg.MetricsAddr)
if err := metricsSrv.Serve(ln); err != nil && err != http.ErrServerClosed {
logger.Error("metrics server error", "err", err)
}
}()
// Start proxy
if err := srv.ListenAndServe(ctx); err != nil {
return fmt.Errorf("proxy server: %w", err)
}
return nil
}