Skip to content

Commit 730d451

Browse files
Merge pull request #80 from jacobecox/main
Updated cpln-task-runner
2 parents 7967a0f + ec6b53b commit 730d451

4 files changed

Lines changed: 123 additions & 42 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
.claude
22
docker-compose.yml
33
./cpln-task-runner
4-
4+
helm/
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Helm charts (generated/customized per deployment)
2+
helm/
3+
4+
# Binary
5+
cpln-task-runner
6+
7+
# IDE
8+
.idea/
9+
.vscode/
10+
*.swp
11+
*.swo
12+
13+
# OS
14+
.DS_Store
15+
16+
# Claude/Cursor
17+
.claude/
18+

examples/cpln-task-runner/README.md

Lines changed: 74 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
# Task Dispatcher
22

3-
A **self-hosted task queue and scheduler service** that mimics **Google Cloud Tasks** functionality, built with Go and Redis. This is designed as a drop-in replacement for Cloud Tasks when you want more control, flexibility, or cost savings.
3+
A **self-hosted task queue and scheduler service** that mimics **Google Cloud Tasks** functionality, built with Go and Redis Sentinel. This is designed as a drop-in replacement for Cloud Tasks when you want more control, flexibility, or cost savings.
44

55
## Overview
66

7-
Task Dispatcher accepts HTTP requests to enqueue background jobs, stores them in Redis, and processes them asynchronously by making HTTP calls to target endpoints. It supports delayed execution, priority queues, automatic retries, and concurrency control.
7+
Task Dispatcher accepts HTTP requests to enqueue background jobs, stores them in Redis (via Sentinel for high availability), and processes them asynchronously by making HTTP calls to target endpoints. It supports delayed execution, priority queues, automatic retries, and rate limiting per client.
88

99
## Features
1010

@@ -13,15 +13,19 @@ Task Dispatcher accepts HTTP requests to enqueue background jobs, stores them in
1313
-**Priority Queues** - critical, default, and low priority levels
1414
-**Automatic Retries** - up to 5 retries with exponential backoff
1515
-**Task Timeouts** - 30-minute default timeout per task
16+
-**Redis Sentinel** - high availability with automatic master failover
17+
-**Per-Client Rate Limiting** - sliding window rate limits per client
18+
-**Circuit Breaker** - per-host circuit breaker for downstream protection
1619
-**Web Dashboard** - Asynqmon UI for monitoring (port 8081)
17-
-**Redis Persistence** - AOF enabled for durability
1820
-**Docker & Docker Compose** support
19-
-**Health Check** endpoint
21+
-**Health Check** endpoints (liveness, readiness, detailed)
22+
-**Prometheus Metrics** - task and queue metrics at `/metrics`
23+
-**OpenTelemetry Tracing** - distributed tracing support
2024

2125
## Technology Stack
2226

2327
- **Go 1.22** with [Asynq](https://github.com/hibiken/asynq) library
24-
- **Redis 7** for task storage and queue management
28+
- **Redis Sentinel** for high-availability task storage and queue management
2529
- **Docker** with multi-stage builds
2630
- **Asynqmon** for task monitoring dashboard
2731

@@ -40,21 +44,27 @@ docker-compose logs -f dispatcherServices:
4044

4145
### Local Development
4246

47+
```bash
4348
# Install dependencies
4449
go mod download
4550

46-
# Run Redis locally
47-
docker run -d -p 6379:6379 redis:7-alpine
51+
# Run Redis Sentinel locally (see redis/sentinel example in this repo)
52+
# Or use docker-compose which includes Redis Sentinel
4853

4954
# Run the application
50-
go run main.go## API Usage
55+
REDIS_SENTINEL_ADDR=localhost:26379 REDIS_MASTER_NAME=mymaster go run main.go
56+
```
57+
58+
## API Usage
5159

5260
### Enqueue a Task
5361

5462
**Endpoint**: `POST /v1/enqueue`
5563

5664
**Request Body**:
65+
```json
5766
{
67+
"client_id": "my-service",
5868
"queue": "default",
5969
"delay": 0,
6070
"task": {
@@ -66,15 +76,22 @@ go run main.go## API Usage
6676
},
6777
"body": "{\"message\": \"Hello from Task Dispatcher\"}"
6878
}
69-
}**Response**:
79+
}
80+
```
81+
82+
**Response**:
83+
```json
7084
{
7185
"status": "enqueued",
7286
"task_id": "abc123-xyz789",
73-
"queue": "default"
74-
}### Parameters
87+
"queue": "default",
88+
"client_id": "my-service"
89+
}
90+
```### Parameters
7591

7692
| Field | Type | Description |
7793
|-------|------|-------------|
94+
| `client_id` | string | **Required.** Client identifier for rate limiting (3-64 alphanumeric, hyphens, underscores) |
7895
| `queue` | string | Queue name: `critical`, `default`, or `low` |
7996
| `delay` | int | Delay in seconds before execution (0 = immediate) |
8097
| `task.url` | string | Target URL to call when task executes |
@@ -104,8 +121,27 @@ Environment variables:
104121

105122
| Variable | Default | Description |
106123
|----------|---------|-------------|
107-
| `REDIS_ADDR` | `localhost:6379` | Redis server address |
124+
| `REDIS_SENTINEL_ADDR` | `localhost:26379` | Redis Sentinel address (use global internal endpoint) |
125+
| `REDIS_MASTER_NAME` | `mymaster` | Redis Sentinel master name |
126+
| `REDIS_PASSWORD` | `` | Redis authentication password |
127+
| `REDIS_SENTINEL_PASSWORD` | `` | Sentinel authentication password |
108128
| `PORT` | `8080` | API server port |
129+
| `MODE` | `both` | Run mode: `api`, `worker`, or `both` |
130+
| `WORKER_CONCURRENCY` | `10` | Number of concurrent workers |
131+
| `TASK_TIMEOUT_SEC` | `1800` | Task timeout in seconds (30 min default) |
132+
| `MAX_RETRY` | `5` | Maximum retry attempts per task |
133+
| `LOG_LEVEL` | `info` | Log level: `debug`, `info`, `warn`, `error` |
134+
| `ADMIN_API_KEY` | `` | API key for admin endpoints |
135+
| `OTEL_EXPORTER_OTLP_ENDPOINT` | `` | OpenTelemetry collector endpoint (enables tracing) |
136+
137+
### Circuit Breaker Configuration
138+
139+
| Variable | Default | Description |
140+
|----------|---------|-------------|
141+
| `CB_MAX_REQUESTS` | `3` | Max requests in half-open state |
142+
| `CB_INTERVAL_SEC` | `60` | Interval to clear failure counts |
143+
| `CB_TIMEOUT_SEC` | `30` | Time before circuit half-opens |
144+
| `CB_FAILURE_THRESHOLD` | `5` | Consecutive failures to open circuit |
109145

110146
## Task Processing Behavior
111147

@@ -127,16 +163,38 @@ Access the Asynqmon dashboard at http://localhost:8081 to:
127163

128164
## Deployment
129165

130-
### ControlPlane (cpln)
166+
### Control Plane (cpln)
131167

168+
```bash
132169
# Build and push images
133-
./cpln-build.sh### Custom Deployment
170+
./cpln-build.sh
171+
```
172+
173+
A Helm chart is provided in the `helm/` directory for deploying to Control Plane:
174+
175+
```bash
176+
# Deploy using helm template
177+
helm template task-runner ./helm -f ./helm/values.yaml | cpln apply -f -
178+
```
179+
180+
### Custom Deployment
134181

135182
The application can be deployed to any platform that supports Docker:
136183

137184
1. Build the image: `docker build -t task-dispatcher .`
138-
2. Deploy Redis with persistence enabled
139-
3. Set `REDIS_ADDR` environment variable
185+
2. Deploy Redis Sentinel (see `redis/sentinel` example in this repo)
186+
3. Set `REDIS_SENTINEL_ADDR` and `REDIS_MASTER_NAME` environment variables
140187
4. Expose port 8080
141188

189+
## Redis Sentinel
190+
191+
This application **requires Redis Sentinel** for high availability. It does not support standalone Redis.
192+
193+
Redis Sentinel provides:
194+
- **Automatic master discovery** - workers connect via Sentinel to find the current master
195+
- **Automatic failover** - if the master fails, Sentinel promotes a replica and workers reconnect automatically
196+
- **High availability** - no single point of failure for Redis
197+
198+
When deploying on Control Plane, use the global internal endpoint for Sentinel (e.g., `redis-sentinel.mygvc.cpln.local:26379`) which load balances across all Sentinel replicas.
199+
142200
## Architecture

examples/cpln-task-runner/main.go

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,10 @@ var (
163163

164164
// --- Configuration ---
165165
var (
166-
RedisAddr = getEnv("REDIS_ADDR", "localhost:6379")
167-
RedisPassword = getEnv("REDIS_PASSWORD", "")
166+
RedisPassword = getEnv("REDIS_PASSWORD", "")
167+
RedisMasterName = getEnv("REDIS_MASTER_NAME", "mymaster")
168+
RedisSentinelAddr = getEnv("REDIS_SENTINEL_ADDR", "localhost:26379")
169+
RedisSentinelPass = getEnv("REDIS_SENTINEL_PASSWORD", "")
168170
Port = getEnv("PORT", "8080")
169171
AdminAPIKey = getEnv("ADMIN_API_KEY", "")
170172
Concurrency = getEnvInt("WORKER_CONCURRENCY", 10)
@@ -665,33 +667,38 @@ func main() {
665667
"tracing", TracingEnabled,
666668
)
667669

668-
// 1. Initialize Redis client for rate limiting with connection pooling
669-
redisClient = redis.NewClient(&redis.Options{
670-
Addr: RedisAddr,
671-
Password: RedisPassword,
672-
PoolSize: 100,
673-
MinIdleConns: 10,
674-
DialTimeout: 5 * time.Second,
675-
ReadTimeout: 3 * time.Second,
676-
WriteTimeout: 3 * time.Second,
670+
// 1. Initialize Redis client for rate limiting with connection pooling (via Sentinel)
671+
redisClient = redis.NewFailoverClient(&redis.FailoverOptions{
672+
MasterName: RedisMasterName,
673+
SentinelAddrs: []string{RedisSentinelAddr},
674+
Password: RedisPassword,
675+
SentinelPassword: RedisSentinelPass,
676+
DB: 0,
677+
PoolSize: 100,
678+
MinIdleConns: 10,
679+
DialTimeout: 5 * time.Second,
680+
ReadTimeout: 3 * time.Second,
681+
WriteTimeout: 3 * time.Second,
677682
})
678683
defer redisClient.Close()
679684

680685
// Verify Redis connection
681686
if err := redisClient.Ping(ctx).Err(); err != nil {
682-
slog.Error("Failed to connect to Redis", "addr", RedisAddr, "error", err)
687+
slog.Error("Failed to connect to Redis via Sentinel", "master", RedisMasterName, "sentinel", RedisSentinelAddr, "error", err)
683688
os.Exit(1)
684689
}
685-
slog.Info("Connected to Redis", "addr", RedisAddr)
690+
slog.Info("Connected to Redis via Sentinel", "master", RedisMasterName, "sentinel", RedisSentinelAddr)
686691

687-
// 2. Setup Asynq Redis Connection
688-
redisOpt := asynq.RedisClientOpt{
689-
Addr: RedisAddr,
690-
Password: RedisPassword,
692+
// 2. Setup Asynq Redis Connection (via Sentinel)
693+
asynqOpt := asynq.RedisFailoverClientOpt{
694+
MasterName: RedisMasterName,
695+
SentinelAddrs: []string{RedisSentinelAddr},
696+
Password: RedisPassword,
697+
SentinelPassword: RedisSentinelPass,
691698
}
692699

693700
// 3. Create Asynq Inspector for queue metrics
694-
asynqInspector = asynq.NewInspector(redisOpt)
701+
asynqInspector = asynq.NewInspector(asynqOpt)
695702
defer asynqInspector.Close()
696703

697704
// Register queue metrics collector
@@ -702,16 +709,16 @@ func main() {
702709
var srv *asynq.Server
703710
var httpServer *http.Server
704711

705-
// 3. Setup Asynq Client (needed for API mode)
712+
// 4. Setup Asynq Client (needed for API mode)
706713
if Mode == "api" || Mode == "both" {
707-
client = asynq.NewClient(redisOpt)
714+
client = asynq.NewClient(asynqOpt)
708715
defer client.Close()
709716
}
710717

711-
// 4. Setup Asynq Server (needed for Worker mode)
718+
// 5. Setup Asynq Server (needed for Worker mode)
712719
if Mode == "worker" || Mode == "both" {
713720
srv = asynq.NewServer(
714-
redisOpt,
721+
asynqOpt,
715722
asynq.Config{
716723
Concurrency: Concurrency,
717724
Queues: map[string]int{
@@ -1020,13 +1027,11 @@ func handleEnqueue(w http.ResponseWriter, r *http.Request, client *asynq.Client)
10201027
return
10211028
}
10221029

1023-
// Define Task Options with Group-based concurrency control
1030+
// Define Task Options
10241031
opts := []asynq.Option{
10251032
asynq.Queue(req.Queue),
10261033
asynq.MaxRetry(MaxRetry),
10271034
asynq.Timeout(time.Duration(TaskTimeout) * time.Second),
1028-
// Group by client_id for per-client concurrency control
1029-
asynq.Group(fmt.Sprintf("client:%s", req.ClientID)),
10301035
}
10311036

10321037
if req.DelaySec > 0 {

0 commit comments

Comments
 (0)