From db8aca8f1a25b785891e096279764aa58a44cc19 Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Fri, 8 May 2026 01:21:17 +0300 Subject: [PATCH] feat: add support for cluster mode --- CHANGELOG.md | 1 + README.md | 2 +- examples/cluster/compose.yaml | 165 +++++++++++++++++++++++++++++++++ examples/cluster/main.rs | 32 +++++++ examples/sentinel/compose.yaml | 3 - lua/list_all_workers.lua | 35 ++++--- src/lib.rs | 10 +- src/queries/list_workers.rs | 20 ++-- 8 files changed, 229 insertions(+), 39 deletions(-) create mode 100644 examples/cluster/compose.yaml create mode 100644 examples/cluster/main.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a4a1a7..37e7cf8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - *chore*: add sentinel example - *feat*: idempotency for tasks (#67) - *chore*: streamline vacuuming and examples (#68) +- *feat*: add support for cluster mode (#69) ## [1.0.0-rc.7] - 2026-04-09 diff --git a/README.md b/README.md index 06d912e..a371b2d 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Background task processing for rust using `apalis` and `redis` - **Reliable task queue** using any `redis` compatible service as the backend. - **Multiple storage types**: standard polling and `pubsub` based approaches. -- **Supports Redis Sentinel**: offering high availability. +- **Supports Sentinel and Cluster modes**: offering high availability. - **Customizable codecs** for serializing/deserializing task arguments such as `json`, `msgpack` and `bincode`. - **Heartbeat and orphaned tasks re-enqueueing** for consistent task processing. - **Integration with `apalis` workers and middleware** such as `retry`, `long_running` and `parallelize` diff --git a/examples/cluster/compose.yaml b/examples/cluster/compose.yaml new file mode 100644 index 0000000..e25d1c5 --- /dev/null +++ b/examples/cluster/compose.yaml @@ -0,0 +1,165 @@ +networks: + redis-cluster-compose: + driver: bridge + +services: + redis-node-1: + image: redis:latest + ports: ["6000:6000"] + hostname: redis-node-1 + networks: + - redis-cluster-compose + command: > + redis-server + --port 6000 + --cluster-enabled yes + --cluster-config-file nodes.conf + --cluster-node-timeout 5000 + --cluster-announce-ip redis-node-1 + --cluster-announce-port 6000 + --cluster-announce-bus-port 16000 + --appendonly yes + --bind 0.0.0.0 + healthcheck: + test: ["CMD-SHELL", "redis-cli -p 6000 ping || exit 1"] + interval: 5s + timeout: 3s + retries: 5 + start_period: 10s + + redis-node-2: + image: redis:latest + ports: ["6001:6001"] + hostname: redis-node-2 + networks: + - redis-cluster-compose + command: > + redis-server + --port 6001 + --cluster-enabled yes + --cluster-config-file nodes.conf + --cluster-node-timeout 5000 + --cluster-announce-ip redis-node-2 + --cluster-announce-port 6001 + --cluster-announce-bus-port 16001 + --appendonly yes + --bind 0.0.0.0 + healthcheck: + test: ["CMD-SHELL", "redis-cli -p 6001 ping || exit 1"] + interval: 5s + timeout: 3s + retries: 5 + start_period: 10s + + redis-node-3: + image: redis:latest + ports: ["6002:6002"] + hostname: redis-node-3 + networks: + - redis-cluster-compose + command: > + redis-server + --port 6002 + --cluster-enabled yes + --cluster-config-file nodes.conf + --cluster-node-timeout 5000 + --cluster-announce-ip redis-node-3 + --cluster-announce-port 6002 + --cluster-announce-bus-port 16002 + --appendonly yes + --bind 0.0.0.0 + healthcheck: + test: ["CMD-SHELL", "redis-cli -p 6002 ping || exit 1"] + interval: 5s + timeout: 3s + retries: 5 + start_period: 10s + + redis-node-4: + image: redis:latest + ports: ["6003:6003"] + hostname: redis-node-4 + networks: + - redis-cluster-compose + command: > + redis-server + --port 6003 + --cluster-enabled yes + --cluster-config-file nodes.conf + --cluster-node-timeout 5000 + --cluster-announce-ip redis-node-4 + --cluster-announce-port 6003 + --cluster-announce-bus-port 16003 + --appendonly yes + --bind 0.0.0.0 + healthcheck: + test: ["CMD-SHELL", "redis-cli -p 6003 ping || exit 1"] + interval: 5s + timeout: 3s + retries: 5 + start_period: 10s + + redis-node-5: + image: redis:latest + ports: ["6004:6004"] + hostname: redis-node-5 + networks: + - redis-cluster-compose + command: > + redis-server + --port 6004 + --cluster-enabled yes + --cluster-config-file nodes.conf + --cluster-node-timeout 5000 + --cluster-announce-ip redis-node-5 + --cluster-announce-port 6004 + --cluster-announce-bus-port 16004 + --appendonly yes + --bind 0.0.0.0 + healthcheck: + test: ["CMD-SHELL", "redis-cli -p 6004 ping || exit 1"] + interval: 5s + timeout: 3s + retries: 5 + start_period: 10s + + redis-node-6: + image: redis:latest + ports: ["6005:6005"] + hostname: redis-node-6 + networks: + - redis-cluster-compose + command: > + redis-server + --port 6005 + --cluster-enabled yes + --cluster-config-file nodes.conf + --cluster-node-timeout 5000 + --cluster-announce-ip redis-node-6 + --cluster-announce-port 6005 + --cluster-announce-bus-port 16005 + --appendonly yes + --bind 0.0.0.0 + healthcheck: + test: ["CMD-SHELL", "redis-cli -p 6005 ping || exit 1"] + interval: 5s + timeout: 3s + retries: 5 + start_period: 10s + + redis-cluster-creator: + image: redis:latest + networks: + - redis-cluster-compose + command: > + redis-cli -p 6000 --cluster create + redis-node-1:6000 redis-node-2:6001 redis-node-3:6002 + redis-node-4:6003 redis-node-5:6004 redis-node-6:6005 + --cluster-replicas 1 --cluster-yes + depends_on: + redis-node-1: { condition: service_healthy } + redis-node-2: { condition: service_healthy } + redis-node-3: { condition: service_healthy } + redis-node-4: { condition: service_healthy } + redis-node-5: { condition: service_healthy } + redis-node-6: { condition: service_healthy } diff --git a/examples/cluster/main.rs b/examples/cluster/main.rs new file mode 100644 index 0000000..36d1541 --- /dev/null +++ b/examples/cluster/main.rs @@ -0,0 +1,32 @@ +use std::env; + +use apalis::prelude::*; +use apalis_redis::{RedisConfig, RedisContext, RedisStorage}; +use redis::Client; + +#[tokio::main] +async fn main() { + let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap(); + let conn = client.get_connection_manager().await.unwrap(); + let mut backend = RedisStorage::new_with_config( + conn, + RedisConfig::default() + .set_namespace("{clustered_queue}") // Remember to wrap in "{}" for cross slot + .set_buffer_size(100), + ); + backend.push(42).await.unwrap(); + async fn task(task: u32, ctx: RedisContext, wrk: WorkerContext) -> Result<(), BoxDynError> { + let handle = std::thread::current(); + println!("{task:?}, {ctx:?}, Thread: {:?}", handle.id()); + wrk.stop().unwrap(); + Ok(()) + } + + let worker = WorkerBuilder::new("rango-tango") + .backend(backend) + .on_event(|ctx, ev| { + println!("CTX {:?}, On Event = {:?}", ctx.name(), ev); + }) + .build(task); + worker.run().await.unwrap(); +} diff --git a/examples/sentinel/compose.yaml b/examples/sentinel/compose.yaml index 7c4cb80..c8e91a6 100644 --- a/examples/sentinel/compose.yaml +++ b/examples/sentinel/compose.yaml @@ -1,6 +1,3 @@ -version: "3.8" - - services: redis-master: image: redis:latest diff --git a/lua/list_all_workers.lua b/lua/list_all_workers.lua index e26dd4f..6b40541 100644 --- a/lua/list_all_workers.lua +++ b/lua/list_all_workers.lua @@ -1,36 +1,33 @@ --- KEYS: list of active workers sorted sets (one per queue) --- ARGV[1]: metadata prefix key (e.g. "worker:") --- Returns: JSON array of objects: --- [ --- { --- queue: , --- worker: , --- last_seen: , --- backend_name: , --- service: --- }, --- ... --- ] -local meta_prefix = ARGV[1] +local queues = redis.call("zrange", "core::apalis::queues::list", 0, -1) local result = {} -for _, queue_key in ipairs(KEYS) do +for _, queue_key in ipairs(queues) do local workers = redis.call("zrange", queue_key, 0, -1, "WITHSCORES") + -- Derive metadata key from queue key, same pattern as registration + local meta_key = queue_key .. ":workers:metadata" for i = 1, #workers, 2 do local name = workers[i] local last_seen = tonumber(workers[i + 1]) - local meta_key = meta_prefix .. name - local meta = redis.call("hmget", meta_key, "backend_name", "service") + local meta_json = redis.call("hget", meta_key, name) + local backend = "" + local service = "" + if meta_json then + local ok, decoded = pcall(cjson.decode, meta_json) + if ok then + backend = decoded.storage or "" + service = decoded.service or "" + end + end table.insert(result, { queue = queue_key, id = name, last_heartbeat = last_seen, started_at = 0, - backend = meta[1], - layers = meta[2] + backend = backend, + layers = service, }) end end diff --git a/src/lib.rs b/src/lib.rs index ea621a2..7047361 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -180,9 +180,12 @@ where let now: i64 = Utc::now().timestamp(); + let worker_metadata_key = + format!("{}:workers:metadata", config.get_namespace().as_ref()); + let res = register_worker .key(workers_set) - .key("core::apalis::workers:metadata::") + .key(worker_metadata_key) .arg(now) .arg(inflight_set) .arg(config.get_keep_alive().as_secs()) @@ -269,9 +272,12 @@ where let now: i64 = Utc::now().timestamp(); + let worker_metadata_key = + format!("{}:workers:metadata", config.get_namespace().as_ref()); + register_worker .key(workers_set) - .key("core::apalis::workers:metadata::") + .key(worker_metadata_key) .arg(now) .arg(inflight_set) .arg(config.get_keep_alive().as_secs()) diff --git a/src/queries/list_workers.rs b/src/queries/list_workers.rs index dc35a54..d4f471c 100644 --- a/src/queries/list_workers.rs +++ b/src/queries/list_workers.rs @@ -1,5 +1,5 @@ use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker, codec::Codec}; -use redis::Script; +use redis::{RedisError, Script}; use ulid::Ulid; use crate::{RedisContext, RedisStorage}; @@ -21,9 +21,10 @@ where let queue = self.config.get_namespace().to_string(); let mut conn = self.conn.clone(); async move { + let worker_metadata_key = format!("{}:workers:metadata", queue); let json: String = Script::new(include_str!("../../lua/list_workers.lua")) .key(format!("{}:workers", queue)) - .key("core::apalis::workers:metadata::") + .key(worker_metadata_key) .invoke_async(&mut conn) .await?; let workers: Vec = serde_json::from_str(&json).map_err(|e| { @@ -39,21 +40,12 @@ where ) -> impl Future, Self::Error>> + Send { let mut conn = self.conn.clone(); async move { - let queues = redis::cmd("ZRANGE") - .arg("core::apalis::queues::list") - .arg(0) - .arg(-1) - .query_async::>(&mut conn) + let json: String = Script::new(include_str!("../../lua/list_all_workers.lua")) + .invoke_async(&mut conn) .await?; - let script = Script::new(include_str!("../../lua/list_all_workers.lua")); - let mut script = script.key(queues); - let script = script.arg("core::apalis::workers:metadata::"); - - let json: String = script.invoke_async(&mut conn).await?; - let workers: Vec = serde_json::from_str(&json).map_err(|e| { - redis::RedisError::from((redis::ErrorKind::Parse, "invalid JSON", e.to_string())) + RedisError::from((redis::ErrorKind::Parse, "invalid JSON", e.to_string())) })?; Ok(workers)