Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
3576c51
Add redis-proxy: dual-write Redis proxy with pub/sub forwarding
bootjp Mar 17, 2026
06736e9
Potential fix for pull request finding
bootjp Mar 17, 2026
2a3bd9f
Merge branch 'feature/redis' into feature/redis-proxy
bootjp Mar 17, 2026
f35c41d
Address PR #351 review feedback
bootjp Mar 17, 2026
4bc099d
Fix binary data handling, type safety, and Sentry flood protection
bootjp Mar 17, 2026
411cf06
Support normal command mode after pub/sub unsubscribe
bootjp Mar 17, 2026
7bc0300
Replace interface{} with any in codebase
bootjp Mar 17, 2026
fca3334
Merge branch 'feature/redis' into feature/redis-proxy
bootjp Mar 17, 2026
74e0f44
Potential fix for pull request finding
bootjp Mar 17, 2026
be85bca
Merge branch 'main' into feature/redis-proxy
bootjp Mar 17, 2026
534a39b
Fix race condition, error normalization, and security concerns
bootjp Mar 17, 2026
c005cf0
Add DB/password config for backends and ignore SELECT/AUTH commands
bootjp Mar 17, 2026
4308912
Enhance pubsub with idempotent subscription handling
bootjp Mar 17, 2026
18e1aaa
Refactor error handling in pubsub session
bootjp Mar 17, 2026
1a70ea0
Refactor pubsub test writes with tagged types
bootjp Mar 17, 2026
f065da7
Fix execTxn to prioritize pipeline-level errors over results
bootjp Mar 17, 2026
9346cee
Address review: writeMu separation, graceful shutdown, log truncation…
bootjp Mar 17, 2026
721087b
Truncate migration-gap log key and optimize truncateValue for large v…
bootjp Mar 17, 2026
1f9334f
Fix truncateValue []byte marker, bounded cleanup wait, mock Close race
bootjp Mar 17, 2026
69a4439
Bound exitPubSubMode fwdDone wait and use WriteInt64 for sub counts
bootjp Mar 17, 2026
11609b0
Potential fix for pull request finding
bootjp Mar 17, 2026
da95a8c
Potential fix for pull request finding
bootjp Mar 17, 2026
50343d9
Fix CmdPubSub comment, align SUBSCRIBE-in-txn test with new queuing b…
bootjp Mar 17, 2026
f1b0350
Bound metrics shutdown, return error on upstream unsub failure, fix m…
bootjp Mar 17, 2026
3d38cd3
Potential fix for pull request finding
bootjp Mar 17, 2026
001fbd3
Initial plan
Copilot Mar 17, 2026
2f2f0ce
proxy: fix exhaustive lint error in truncateValue by using if/else in…
Copilot Mar 17, 2026
1f2f00d
proxy/sentry: fix exhaustive lint error in truncateValue
Copilot Mar 17, 2026
20c798f
Merge pull request #352 from bootjp/copilot/sub-pr-351
bootjp Mar 17, 2026
7c634c1
Close dconn on fwd timeout to unblock writeMu, fix exhaustive lint
bootjp Mar 17, 2026
77ee8a1
Use switch for reflect.Kind to satisfy staticcheck with exhaustive no…
bootjp Mar 17, 2026
f389fc6
Fix Pipeline redis.Error wrapping and separate state from writeMu
bootjp Mar 18, 2026
70b14f4
Add shadow subscribe for pub/sub divergence detection
bootjp Mar 18, 2026
493f80c
Potential fix for pull request finding
bootjp Mar 18, 2026
a998315
Potential fix for pull request finding
bootjp Mar 18, 2026
ec3d20a
Fix shadow Close deadlock, shadow data race, remove unused method
bootjp Mar 18, 2026
ca0bea1
Potential fix for pull request finding
bootjp Mar 18, 2026
d7b675c
Potential fix for pull request finding
bootjp Mar 18, 2026
75a5209
Fix shadow pubsub review issues: msgKey Pattern, lock scope, sweepAll…
bootjp Mar 18, 2026
6b8da88
Potential fix for pull request finding
bootjp Mar 18, 2026
2746640
Potential fix for pull request finding
bootjp Mar 18, 2026
cd9ffed
Potential fix for pull request finding
bootjp Mar 18, 2026
3cce409
Potential fix for pull request finding
bootjp Mar 18, 2026
1568458
Review fixes: correctness, concurrency, performance, and test coverage
bootjp Mar 19, 2026
3785deb
Fix ExtraOnSecondary test for buffered matchSecondary
bootjp Mar 19, 2026
4d23559
Address review feedback: command table, shadow pubsub, mu comment
bootjp Mar 19, 2026
6aabb03
Fix bounded wait in cleanup/exitPubSubMode and sweepAll secondary drain
bootjp Mar 19, 2026
7516597
Fix compareLoop channel close leak and secondary isPattern reporting
bootjp Mar 19, 2026
3d9305b
Merge branch 'main' into feature/redis-proxy
bootjp Mar 19, 2026
0c6f3bc
proxy: address review — deterministic clock, sweep order, flush leak
bootjp Mar 19, 2026
d509459
Potential fix for pull request finding
bootjp Mar 19, 2026
5b37df0
Potential fix for pull request finding
bootjp Mar 19, 2026
0669dcf
Handle Flush errors consistently and guard async scheduling on shutdown
bootjp Mar 19, 2026
4396432
Fix reply ordering in reenterPubSub, guard Start with sync.Once, remo…
bootjp Mar 19, 2026
7667d27
Initial plan
Copilot Mar 19, 2026
8fc070d
fix: replace atomic closing flag with mutex to prevent WaitGroup Add/…
Copilot Mar 19, 2026
f775a3c
Merge pull request #355 from bootjp/copilot/sub-pr-351
bootjp Mar 19, 2026
a603e80
Initial plan
Copilot Mar 19, 2026
8fc06c6
fix: use incrementing subscription counts in SUBSCRIBE/PSUBSCRIBE rep…
Copilot Mar 19, 2026
957e134
Merge pull request #356 from bootjp/copilot/sub-pr-351
bootjp Mar 19, 2026
857e72e
Initial plan
Copilot Mar 19, 2026
ed3e70b
fix: enforce comparison window in matchSecondary, avoid alloc in swee…
Copilot Mar 19, 2026
73cf60e
Merge pull request #357 from bootjp/copilot/sub-pr-351
bootjp Mar 20, 2026
077581a
Initial plan
Copilot Mar 20, 2026
af1bc81
Fix reconcilePrimaries window enforcement, SELECT consistency, counte…
Copilot Mar 20, 2026
5e89aa7
Fix golangci-lint: extract handleProxySpecialCommand to reduce cyclop…
Copilot Mar 20, 2026
7b18629
Merge pull request #358 from bootjp/copilot/sub-pr-351
bootjp Mar 20, 2026
b3262b7
Initial plan
Copilot Mar 20, 2026
9d7377a
Fix RecordPrimary immediate reconciliation and misleading unsubscribe…
Copilot Mar 20, 2026
e8d274a
test: add TestShadowPubSub_SecondaryBeforePrimaryImmediateReconcile
Copilot Mar 20, 2026
13c3a3d
Merge pull request #359 from bootjp/copilot/sub-pr-351
bootjp Mar 20, 2026
b753466
Initial plan
Copilot Mar 20, 2026
b61d0d0
Merge branch 'main' into feature/redis-proxy
bootjp Mar 20, 2026
a06f845
Merge branch 'feature/redis-proxy' into copilot/sub-pr-351
bootjp Mar 20, 2026
760deef
Fix divergenceEvent pattern propagation, flush close flag, and window…
Copilot Mar 20, 2026
e42e4ba
refactor: extract reconcileWithBufferedSecondary to reduce nestif com…
Copilot Mar 20, 2026
5085aaa
Merge pull request #362 from bootjp/copilot/sub-pr-351
bootjp Mar 20, 2026
db225ad
Initial plan
Copilot Mar 20, 2026
ef0c97b
refactor: gate ShouldReport on enabled, move unmatchedSecondaries to …
Copilot Mar 20, 2026
28c1d9b
fix: gci formatting in proxy_test.go (double space before comment)
Copilot Mar 20, 2026
4866d15
Merge pull request #363 from bootjp/copilot/sub-pr-351
bootjp Mar 20, 2026
2787117
Initial plan
Copilot Mar 20, 2026
3432ab3
redis: leader-route demo reads and fail invalid Jepsen runs
bootjp Mar 20, 2026
693b307
fix: use deterministic fake clock in TestShadowPubSub_DuplicateSecond…
Copilot Mar 20, 2026
0dc4c64
Merge branch 'feature/redis-proxy' into copilot/sub-pr-351
bootjp Mar 20, 2026
13681ac
Merge pull request #364 from bootjp/copilot/sub-pr-351
bootjp Mar 20, 2026
68a9aef
Update proxy/pubsub.go
bootjp Mar 20, 2026
f6c255e
Update proxy/pubsub_test.go
bootjp Mar 20, 2026
0755d1b
Initial plan
Copilot Mar 20, 2026
9a322a5
Fix matchSecondary window scan, boundary consistency, and SELECT test…
Copilot Mar 20, 2026
1a76dd1
fix: apply gci formatting to shadow_pubsub_test.go
Copilot Mar 20, 2026
d3c415d
Merge pull request #365 from bootjp/copilot/sub-pr-351
bootjp Mar 20, 2026
2bd838e
Initial plan
Copilot Mar 20, 2026
96f2c4f
fix: sort writeUnsubAll names for determinism; fix MatchSecondaryMidd…
Copilot Mar 20, 2026
39bf86d
Merge pull request #366 from bootjp/copilot/sub-pr-351
bootjp Mar 20, 2026
dc10487
Update proxy/proxy.go
bootjp Mar 20, 2026
1cff1e8
Update proxy/pubsub.go
bootjp Mar 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions cmd/redis-proxy/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package main

import (
"context"
"flag"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/bootjp/elastickv/proxy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

const sentryFlushTimeout = 2 * time.Second

func main() {
if err := run(); err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
}

func run() error {
cfg := proxy.DefaultConfig()
var modeStr string

flag.StringVar(&cfg.ListenAddr, "listen", cfg.ListenAddr, "Proxy listen address")
flag.StringVar(&cfg.PrimaryAddr, "primary", cfg.PrimaryAddr, "Primary (Redis) address")
flag.IntVar(&cfg.PrimaryDB, "primary-db", cfg.PrimaryDB, "Primary Redis DB number")
flag.StringVar(&cfg.PrimaryPassword, "primary-password", cfg.PrimaryPassword, "Primary Redis password")
flag.StringVar(&cfg.SecondaryAddr, "secondary", cfg.SecondaryAddr, "Secondary (ElasticKV) address")
flag.IntVar(&cfg.SecondaryDB, "secondary-db", cfg.SecondaryDB, "Secondary Redis DB number")
flag.StringVar(&cfg.SecondaryPassword, "secondary-password", cfg.SecondaryPassword, "Secondary Redis password")
flag.StringVar(&modeStr, "mode", "dual-write", "Proxy mode: redis-only, dual-write, dual-write-shadow, elastickv-primary, elastickv-only")
flag.DurationVar(&cfg.SecondaryTimeout, "secondary-timeout", cfg.SecondaryTimeout, "Secondary write timeout")
flag.DurationVar(&cfg.ShadowTimeout, "shadow-timeout", cfg.ShadowTimeout, "Shadow read timeout")
flag.StringVar(&cfg.SentryDSN, "sentry-dsn", cfg.SentryDSN, "Sentry DSN (empty = disabled)")
flag.StringVar(&cfg.SentryEnv, "sentry-env", cfg.SentryEnv, "Sentry environment")
flag.Float64Var(&cfg.SentrySampleRate, "sentry-sample", cfg.SentrySampleRate, "Sentry sample rate")
flag.StringVar(&cfg.MetricsAddr, "metrics", cfg.MetricsAddr, "Prometheus metrics address")
flag.Parse()

mode, ok := proxy.ParseProxyMode(modeStr)
if !ok {
return fmt.Errorf("unknown mode: %s", modeStr)
}
cfg.Mode = mode

logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))

// Sentry
sentryReporter := proxy.NewSentryReporter(cfg.SentryDSN, cfg.SentryEnv, cfg.SentrySampleRate, logger)
defer sentryReporter.Flush(sentryFlushTimeout)

// Prometheus
reg := prometheus.NewRegistry()
metrics := proxy.NewProxyMetrics(reg)

// Backends
primaryOpts := proxy.DefaultBackendOptions()
primaryOpts.DB = cfg.PrimaryDB
primaryOpts.Password = cfg.PrimaryPassword
secondaryOpts := proxy.DefaultBackendOptions()
secondaryOpts.DB = cfg.SecondaryDB
secondaryOpts.Password = cfg.SecondaryPassword

var primary, secondary proxy.Backend
switch cfg.Mode {
case proxy.ModeElasticKVPrimary, proxy.ModeElasticKVOnly:
primary = proxy.NewRedisBackendWithOptions(cfg.SecondaryAddr, "elastickv", secondaryOpts)
secondary = proxy.NewRedisBackendWithOptions(cfg.PrimaryAddr, "redis", primaryOpts)
case proxy.ModeRedisOnly, proxy.ModeDualWrite, proxy.ModeDualWriteShadow:
primary = proxy.NewRedisBackendWithOptions(cfg.PrimaryAddr, "redis", primaryOpts)
secondary = proxy.NewRedisBackendWithOptions(cfg.SecondaryAddr, "elastickv", secondaryOpts)
}
defer primary.Close()
defer secondary.Close()

dual := proxy.NewDualWriter(primary, secondary, cfg, metrics, sentryReporter, logger)
srv := proxy.NewProxyServer(cfg, dual, metrics, sentryReporter, logger)

// Context for graceful shutdown
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

// Start metrics server
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
var lc net.ListenConfig
ln, err := lc.Listen(ctx, "tcp", cfg.MetricsAddr)
if err != nil {
logger.Error("metrics listen failed", "addr", cfg.MetricsAddr, "err", err)
return
}
metricsSrv := &http.Server{Handler: mux, ReadHeaderTimeout: time.Second}
go func() {
<-ctx.Done()
if err := metricsSrv.Shutdown(context.Background()); err != nil {
Comment thread
bootjp marked this conversation as resolved.
Outdated
logger.Warn("metrics server shutdown error", "err", err)
}
}()
logger.Info("metrics server starting", "addr", cfg.MetricsAddr)
if err := metricsSrv.Serve(ln); err != nil && err != http.ErrServerClosed {
logger.Error("metrics server error", "err", err)
}
}()

// Start proxy
if err := srv.ListenAndServe(ctx); err != nil {
return fmt.Errorf("proxy server: %w", err)
}
return nil
}
114 changes: 114 additions & 0 deletions proxy/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package proxy

import (
"context"
"fmt"
"time"

"github.com/redis/go-redis/v9"
)

const (
defaultPoolSize = 128
defaultDialTimeout = 5 * time.Second
defaultReadTimeout = 3 * time.Second
defaultWriteTimeout = 3 * time.Second
)

// Backend abstracts a Redis-protocol endpoint (real Redis or ElasticKV).
type Backend interface {
// Do sends a single command and returns its result.
Do(ctx context.Context, args ...any) *redis.Cmd
// Pipeline sends multiple commands in a pipeline.
Pipeline(ctx context.Context, cmds [][]any) ([]*redis.Cmd, error)
// Close releases the underlying connection.
Close() error
// Name identifies this backend for logging and metrics.
Name() string
}

// BackendOptions configures the underlying go-redis connection pool.
type BackendOptions struct {
DB int
Password string
PoolSize int
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
}

// DefaultBackendOptions returns reasonable defaults for a proxy backend.
func DefaultBackendOptions() BackendOptions {
return BackendOptions{
PoolSize: defaultPoolSize,
DialTimeout: defaultDialTimeout,
ReadTimeout: defaultReadTimeout,
WriteTimeout: defaultWriteTimeout,
}
}

// PubSubBackend is an optional interface for backends that support
// creating dedicated PubSub connections.
type PubSubBackend interface {
NewPubSub(ctx context.Context) *redis.PubSub
}

// RedisBackend connects to an upstream Redis instance via go-redis.
type RedisBackend struct {
client *redis.Client
name string
}

// NewRedisBackend creates a Backend targeting a Redis server with default pool options.
func NewRedisBackend(addr string, name string) *RedisBackend {
return NewRedisBackendWithOptions(addr, name, DefaultBackendOptions())
}

// NewRedisBackendWithOptions creates a Backend with explicit pool configuration.
func NewRedisBackendWithOptions(addr string, name string, opts BackendOptions) *RedisBackend {
return &RedisBackend{
client: redis.NewClient(&redis.Options{
Addr: addr,
DB: opts.DB,
Password: opts.Password,
PoolSize: opts.PoolSize,
DialTimeout: opts.DialTimeout,
ReadTimeout: opts.ReadTimeout,
WriteTimeout: opts.WriteTimeout,
}),
name: name,
}
}

func (b *RedisBackend) Do(ctx context.Context, args ...any) *redis.Cmd {
return b.client.Do(ctx, args...)
}

func (b *RedisBackend) Pipeline(ctx context.Context, cmds [][]any) ([]*redis.Cmd, error) {
pipe := b.client.Pipeline()
results := make([]*redis.Cmd, len(cmds))
for i, args := range cmds {
results[i] = pipe.Do(ctx, args...)
}
_, err := pipe.Exec(ctx)
if err != nil {
return results, fmt.Errorf("pipeline exec: %w", err)
}
return results, nil
Comment thread
bootjp marked this conversation as resolved.
}

func (b *RedisBackend) Close() error {
if err := b.client.Close(); err != nil {
return fmt.Errorf("close %s backend: %w", b.name, err)
}
return nil
}

func (b *RedisBackend) Name() string {
return b.name
}

// NewPubSub creates a dedicated PubSub connection (not from the pool).
func (b *RedisBackend) NewPubSub(ctx context.Context) *redis.PubSub {
return b.client.Subscribe(ctx)
}
146 changes: 146 additions & 0 deletions proxy/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package proxy

import "strings"

// CommandCategory classifies a Redis command for routing purposes.
type CommandCategory int

const (
CmdRead CommandCategory = iota // GET, HGET, LRANGE, ZRANGE, etc.
CmdWrite // SET, DEL, HSET, LPUSH, ZADD, etc.
CmdBlocking // BZPOPMIN, XREAD (with BLOCK)
CmdPubSub // SUBSCRIBE, PUBLISH, PUBSUB
Comment thread
bootjp marked this conversation as resolved.
Outdated
CmdAdmin // PING, INFO, CLIENT, SELECT, QUIT, DBSIZE, SCAN, AUTH
CmdTxn // MULTI, EXEC, DISCARD
CmdScript // EVAL, EVALSHA
)

var commandTable = map[string]CommandCategory{
// Read commands
"GET": CmdRead,
"GETDEL": CmdWrite, // read+write → write
"HGET": CmdRead,
"HGETALL": CmdRead,
"HEXISTS": CmdRead,
"HLEN": CmdRead,
"HMGET": CmdRead,
"EXISTS": CmdRead,
"KEYS": CmdRead,
"LINDEX": CmdRead,
"LLEN": CmdRead,
"LPOS": CmdRead,
"LRANGE": CmdRead,
"PTTL": CmdRead,
"TTL": CmdRead,
"TYPE": CmdRead,
"SCARD": CmdRead,
"SISMEMBER": CmdRead,
"SMEMBERS": CmdRead,
"XLEN": CmdRead,
"XRANGE": CmdRead,
"XREVRANGE": CmdRead,
"ZCARD": CmdRead,
"ZCOUNT": CmdRead,
"ZRANGE": CmdRead,
"ZRANGEBYSCORE": CmdRead,
"ZREVRANGE": CmdRead,
"ZREVRANGEBYSCORE": CmdRead,
"ZSCORE": CmdRead,
"PFCOUNT": CmdRead,

// Write commands
"SET": CmdWrite,
"SETEX": CmdWrite,
"SETNX": CmdWrite,
"DEL": CmdWrite,
"HSET": CmdWrite,
"HMSET": CmdWrite,
"HDEL": CmdWrite,
"HINCRBY": CmdWrite,
"INCR": CmdWrite,
"LPUSH": CmdWrite,
"LPOP": CmdWrite,
"RPUSH": CmdWrite,
"RPOP": CmdWrite,
"RPOPLPUSH": CmdWrite,
"LREM": CmdWrite,
"LSET": CmdWrite,
"LTRIM": CmdWrite,
"SADD": CmdWrite,
"SREM": CmdWrite,
"EXPIRE": CmdWrite,
"PEXPIRE": CmdWrite,
"RENAME": CmdWrite,
"XADD": CmdWrite,
"XTRIM": CmdWrite,
"ZADD": CmdWrite,
"ZINCRBY": CmdWrite,
"ZREM": CmdWrite,
"ZREMRANGEBYSCORE": CmdWrite,
"ZREMRANGEBYRANK": CmdWrite,
"ZPOPMIN": CmdWrite,
"PFADD": CmdWrite,
"FLUSHALL": CmdWrite,
"FLUSHDB": CmdWrite,
"PUBLISH": CmdWrite, // write to both backends

// Blocking commands
"BZPOPMIN": CmdBlocking,
// XREAD is handled specially in ClassifyCommand (BLOCK arg check)

// PubSub commands
"SUBSCRIBE": CmdPubSub,
"UNSUBSCRIBE": CmdPubSub,
"PSUBSCRIBE": CmdPubSub,
"PUNSUBSCRIBE": CmdPubSub,
"PUBSUB": CmdPubSub,

// Admin commands — forwarded to primary only
"PING": CmdAdmin,
"INFO": CmdAdmin,
"CLIENT": CmdAdmin,
"SELECT": CmdAdmin,
"QUIT": CmdAdmin,
"DBSIZE": CmdAdmin,
"SCAN": CmdAdmin,
"AUTH": CmdAdmin,
Comment thread
bootjp marked this conversation as resolved.
"HELLO": CmdAdmin,
"WAIT": CmdAdmin,
"CONFIG": CmdAdmin,
"OBJECT": CmdAdmin,
"DEBUG": CmdAdmin,
"CLUSTER": CmdAdmin,
"COMMAND": CmdAdmin,

// Transaction commands
"MULTI": CmdTxn,
"EXEC": CmdTxn,
"DISCARD": CmdTxn,

// Script commands
"EVAL": CmdScript,
"EVALSHA": CmdScript,
}

// ClassifyCommand returns the category for a Redis command name.
// XREAD is classified as CmdBlocking if args contain BLOCK, otherwise CmdRead.
// Unknown commands default to CmdWrite (sent to both backends).
func ClassifyCommand(name string, args [][]byte) CommandCategory {
upper := strings.ToUpper(name)

// Special case: XREAD with BLOCK
if upper == "XREAD" {
for _, arg := range args {
if strings.ToUpper(string(arg)) == "BLOCK" {
return CmdBlocking
}
}
return CmdRead
}

if cat, ok := commandTable[upper]; ok {
return cat
}
// Unknown commands → treat as write (safe default, sent to both backends)
return CmdWrite
}
Loading
Loading