Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

- *chore*: add sentinel example
- *feat**: idempotency for tasks (#67)
- *feat*: idempotency for tasks (#67)
- *chore*: streamline vacuuming and examples (#68)

## [1.0.0-rc.7] - 2026-04-09

Expand Down
36 changes: 36 additions & 0 deletions examples/vacuum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::env;

use apalis::prelude::*;
use apalis_core::backend::Vacuum;
use apalis_redis::{RedisConfig, RedisContext, RedisStorage};
use redis::Client;

#[tokio::main]
async fn main() {
let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap();
let conn = client.get_connection_manager().await.unwrap();
let mut backend = RedisStorage::new_with_config(
conn,
RedisConfig::default()
.set_namespace("redis_vacuum_worker")
.set_buffer_size(100),
);
backend.push(42).await.unwrap();
async fn task(task: u32, ctx: RedisContext, wrk: WorkerContext) -> Result<(), BoxDynError> {
let handle = std::thread::current();
println!("{task:?}, {ctx:?}, Thread: {:?}", handle.id());
wrk.stop().unwrap();
Ok(())
}

let worker = WorkerBuilder::new("rango-tango")
.backend(backend.clone())
.on_event(|ctx, ev| {
println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
})
.build(task);
worker.run().await.unwrap();

// You can combine this with apalis-cron to vacuum on interval
backend.vacuum().await.unwrap();
}
37 changes: 18 additions & 19 deletions lua/vacuum.lua
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
-- Lua script to clean up data in Redis
-- KEYS[1]: the task data hash
-- KEYS[2]: the metadata prefix (e.g. "task_meta")

-- Define the keys
local done_list_key = KEYS[1]
local data_hash = KEYS[2]
local terminal_statuses = { Done = true, Failed = true, Killed = true }
local result_hash = KEYS[2] .. ":result"

-- Iterate through done_list
local done_list_ids = redis.call('ZRANGE', done_list_key, 0, -1)
local deleted = 0

-- Initialize a variable to count the number of removed items
local removed_items_count = 0
local fields = redis.call("hgetall", KEYS[1])

for _, id in ipairs(done_list_ids) do
-- fields is a flat list of [field, value, field, value, ...]
for i = 1, #fields, 2 do
local task_id = fields[i]
local meta_key = KEYS[2] .. ':' .. task_id
local status = redis.call("hget", meta_key, "status")

local is_member = redis.call('HEXISTS', data_hash, id)
if is_member == 1 then
-- Remove entry from data_hash
redis.call('HDEL', data_hash, id)
removed_items_count = removed_items_count + 1
end
if status and terminal_statuses[status] then
redis.call("hdel", KEYS[1], task_id)
redis.call("hdel", result_hash, task_id)
redis.call("del", meta_key)
deleted = deleted + 1
end
end

-- Clean the done_list
redis.call('DEL', done_list_key)

return removed_items_count
return deleted
2 changes: 0 additions & 2 deletions src/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,6 @@ pub fn deserialize_with_meta<'a>(
Some(idempotency_key_raw)
};

dbg!(&idempotency_key);

let meta = meta_fields[9..]
.chunks(2)
.filter_map(|chunk| {
Expand Down
1 change: 1 addition & 0 deletions src/queries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ mod list_queues;
mod list_tasks;
mod list_workers;
mod metrics;
mod vacuum;
mod wait_for;
22 changes: 22 additions & 0 deletions src/queries/vacuum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use apalis_core::backend::Vacuum;
use apalis_core::backend::codec::Codec;
use redis::aio::ConnectionLike;

use crate::RedisStorage;

impl<Args, Conn, C> Vacuum for RedisStorage<Args, Conn, C>
where
Args: Unpin + Send + Sync + 'static,
Conn: ConnectionLike + Clone + Send + Sync + 'static,
C: Codec<Args, Compact = Vec<u8>> + Unpin + Send + 'static,
C::Error: std::error::Error + Send + Sync + 'static,
{
async fn vacuum(&mut self) -> Result<usize, Self::Error> {
let vacuum_script = redis::Script::new(include_str!("../../lua/vacuum.lua"));
vacuum_script
.key(self.config.job_data_hash())
.key(self.config.job_meta_hash())
.invoke_async(&mut self.conn)
.await
}
}
Loading