Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- *chore*: add sentinel example
- *feat*: idempotency for tasks (#67)
- *chore*: streamline vacuuming and examples (#68)
- *feat*: add support for cluster mode (#69)

## [1.0.0-rc.7] - 2026-04-09

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Background task processing for rust using `apalis` and `redis`

- **Reliable task queue** using any `redis` compatible service as the backend.
- **Multiple storage types**: standard polling and `pubsub` based approaches.
- **Supports Redis Sentinel**: offering high availability.
- **Supports Sentinel and Cluster modes**: offering high availability.
- **Customizable codecs** for serializing/deserializing task arguments such as `json`, `msgpack` and `bincode`.
- **Heartbeat and orphaned tasks re-enqueueing** for consistent task processing.
- **Integration with `apalis` workers and middleware** such as `retry`, `long_running` and `parallelize`
Expand Down
165 changes: 165 additions & 0 deletions examples/cluster/compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
networks:
redis-cluster-compose:
driver: bridge

services:
redis-node-1:
image: redis:latest
ports: ["6000:6000"]
hostname: redis-node-1
networks:
- redis-cluster-compose
command: >
redis-server
--port 6000
--cluster-enabled yes
--cluster-config-file nodes.conf
--cluster-node-timeout 5000
--cluster-announce-ip redis-node-1
--cluster-announce-port 6000
--cluster-announce-bus-port 16000
--appendonly yes
--bind 0.0.0.0
healthcheck:
test: ["CMD-SHELL", "redis-cli -p 6000 ping || exit 1"]
interval: 5s
timeout: 3s
retries: 5
start_period: 10s

redis-node-2:
image: redis:latest
ports: ["6001:6001"]
hostname: redis-node-2
networks:
- redis-cluster-compose
command: >
redis-server
--port 6001
--cluster-enabled yes
--cluster-config-file nodes.conf
--cluster-node-timeout 5000
--cluster-announce-ip redis-node-2
--cluster-announce-port 6001
--cluster-announce-bus-port 16001
--appendonly yes
--bind 0.0.0.0
healthcheck:
test: ["CMD-SHELL", "redis-cli -p 6001 ping || exit 1"]
interval: 5s
timeout: 3s
retries: 5
start_period: 10s

redis-node-3:
image: redis:latest
ports: ["6002:6002"]
hostname: redis-node-3
networks:
- redis-cluster-compose
command: >
redis-server
--port 6002
--cluster-enabled yes
--cluster-config-file nodes.conf
--cluster-node-timeout 5000
--cluster-announce-ip redis-node-3
--cluster-announce-port 6002
--cluster-announce-bus-port 16002
--appendonly yes
--bind 0.0.0.0
healthcheck:
test: ["CMD-SHELL", "redis-cli -p 6002 ping || exit 1"]
interval: 5s
timeout: 3s
retries: 5
start_period: 10s

redis-node-4:
image: redis:latest
ports: ["6003:6003"]
hostname: redis-node-4
networks:
- redis-cluster-compose
command: >
redis-server
--port 6003
--cluster-enabled yes
--cluster-config-file nodes.conf
--cluster-node-timeout 5000
--cluster-announce-ip redis-node-4
--cluster-announce-port 6003
--cluster-announce-bus-port 16003
--appendonly yes
--bind 0.0.0.0
healthcheck:
test: ["CMD-SHELL", "redis-cli -p 6003 ping || exit 1"]
interval: 5s
timeout: 3s
retries: 5
start_period: 10s

redis-node-5:
image: redis:latest
ports: ["6004:6004"]
hostname: redis-node-5
networks:
- redis-cluster-compose
command: >
redis-server
--port 6004
--cluster-enabled yes
--cluster-config-file nodes.conf
--cluster-node-timeout 5000
--cluster-announce-ip redis-node-5
--cluster-announce-port 6004
--cluster-announce-bus-port 16004
--appendonly yes
--bind 0.0.0.0
healthcheck:
test: ["CMD-SHELL", "redis-cli -p 6004 ping || exit 1"]
interval: 5s
timeout: 3s
retries: 5
start_period: 10s

redis-node-6:
image: redis:latest
ports: ["6005:6005"]
hostname: redis-node-6
networks:
- redis-cluster-compose
command: >
redis-server
--port 6005
--cluster-enabled yes
--cluster-config-file nodes.conf
--cluster-node-timeout 5000
--cluster-announce-ip redis-node-6
--cluster-announce-port 6005
--cluster-announce-bus-port 16005
--appendonly yes
--bind 0.0.0.0
healthcheck:
test: ["CMD-SHELL", "redis-cli -p 6005 ping || exit 1"]
interval: 5s
timeout: 3s
retries: 5
start_period: 10s

redis-cluster-creator:
image: redis:latest
networks:
- redis-cluster-compose
command: >
redis-cli -p 6000 --cluster create
redis-node-1:6000 redis-node-2:6001 redis-node-3:6002
redis-node-4:6003 redis-node-5:6004 redis-node-6:6005
--cluster-replicas 1 --cluster-yes
depends_on:
redis-node-1: { condition: service_healthy }
redis-node-2: { condition: service_healthy }
redis-node-3: { condition: service_healthy }
redis-node-4: { condition: service_healthy }
redis-node-5: { condition: service_healthy }
redis-node-6: { condition: service_healthy }
32 changes: 32 additions & 0 deletions examples/cluster/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::env;

use apalis::prelude::*;
use apalis_redis::{RedisConfig, RedisContext, RedisStorage};
use redis::Client;

#[tokio::main]
async fn main() {
let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
let conn = client.get_connection_manager().await.unwrap();
let mut backend = RedisStorage::new_with_config(
conn,
RedisConfig::default()
.set_namespace("{clustered_queue}") // Remember to wrap in "{}" for cross slot
.set_buffer_size(100),
);
backend.push(42).await.unwrap();
async fn task(task: u32, ctx: RedisContext, wrk: WorkerContext) -> Result<(), BoxDynError> {
let handle = std::thread::current();
println!("{task:?}, {ctx:?}, Thread: {:?}", handle.id());
wrk.stop().unwrap();
Ok(())
}

let worker = WorkerBuilder::new("rango-tango")
.backend(backend)
.on_event(|ctx, ev| {
println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
})
.build(task);
worker.run().await.unwrap();
}
3 changes: 0 additions & 3 deletions examples/sentinel/compose.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
version: "3.8"


services:
redis-master:
image: redis:latest
Expand Down
35 changes: 16 additions & 19 deletions lua/list_all_workers.lua
Original file line number Diff line number Diff line change
@@ -1,36 +1,33 @@
-- KEYS: list of active workers sorted sets (one per queue)
-- ARGV[1]: metadata prefix key (e.g. "worker:")
-- Returns: JSON array of objects:
-- [
-- {
-- queue: <queue_key>,
-- worker: <worker_name>,
-- last_seen: <number>,
-- backend_name: <string>,
-- service: <string>
-- },
-- ...
-- ]
local meta_prefix = ARGV[1]
local queues = redis.call("zrange", "core::apalis::queues::list", 0, -1)
local result = {}

for _, queue_key in ipairs(KEYS) do
for _, queue_key in ipairs(queues) do
local workers = redis.call("zrange", queue_key, 0, -1, "WITHSCORES")
-- Derive metadata key from queue key, same pattern as registration
local meta_key = queue_key .. ":workers:metadata"

for i = 1, #workers, 2 do
local name = workers[i]
local last_seen = tonumber(workers[i + 1])
local meta_key = meta_prefix .. name

local meta = redis.call("hmget", meta_key, "backend_name", "service")
local meta_json = redis.call("hget", meta_key, name)
local backend = ""
local service = ""
if meta_json then
local ok, decoded = pcall(cjson.decode, meta_json)
if ok then
backend = decoded.storage or ""
service = decoded.service or ""
end
end

table.insert(result, {
queue = queue_key,
id = name,
last_heartbeat = last_seen,
started_at = 0,
backend = meta[1],
layers = meta[2]
backend = backend,
layers = service,
})
end
end
Expand Down
10 changes: 8 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,12 @@ where

let now: i64 = Utc::now().timestamp();

let worker_metadata_key =
format!("{}:workers:metadata", config.get_namespace().as_ref());

let res = register_worker
.key(workers_set)
.key("core::apalis::workers:metadata::")
.key(worker_metadata_key)
.arg(now)
.arg(inflight_set)
.arg(config.get_keep_alive().as_secs())
Expand Down Expand Up @@ -269,9 +272,12 @@ where

let now: i64 = Utc::now().timestamp();

let worker_metadata_key =
format!("{}:workers:metadata", config.get_namespace().as_ref());

register_worker
.key(workers_set)
.key("core::apalis::workers:metadata::")
.key(worker_metadata_key)
.arg(now)
.arg(inflight_set)
.arg(config.get_keep_alive().as_secs())
Expand Down
20 changes: 6 additions & 14 deletions src/queries/list_workers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker, codec::Codec};
use redis::Script;
use redis::{RedisError, Script};
use ulid::Ulid;

use crate::{RedisContext, RedisStorage};
Expand All @@ -21,9 +21,10 @@ where
let queue = self.config.get_namespace().to_string();
let mut conn = self.conn.clone();
async move {
let worker_metadata_key = format!("{}:workers:metadata", queue);
let json: String = Script::new(include_str!("../../lua/list_workers.lua"))
.key(format!("{}:workers", queue))
.key("core::apalis::workers:metadata::")
.key(worker_metadata_key)
.invoke_async(&mut conn)
.await?;
let workers: Vec<RunningWorker> = serde_json::from_str(&json).map_err(|e| {
Expand All @@ -39,21 +40,12 @@ where
) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
let mut conn = self.conn.clone();
async move {
let queues = redis::cmd("ZRANGE")
.arg("core::apalis::queues::list")
.arg(0)
.arg(-1)
.query_async::<Vec<String>>(&mut conn)
let json: String = Script::new(include_str!("../../lua/list_all_workers.lua"))
.invoke_async(&mut conn)
.await?;

let script = Script::new(include_str!("../../lua/list_all_workers.lua"));
let mut script = script.key(queues);
let script = script.arg("core::apalis::workers:metadata::");

let json: String = script.invoke_async(&mut conn).await?;

let workers: Vec<RunningWorker> = serde_json::from_str(&json).map_err(|e| {
redis::RedisError::from((redis::ErrorKind::Parse, "invalid JSON", e.to_string()))
RedisError::from((redis::ErrorKind::Parse, "invalid JSON", e.to_string()))
})?;

Ok(workers)
Expand Down
Loading