diff --git a/CHANGELOG.md b/CHANGELOG.md index 6976b38..0a4a1a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - *chore*: add sentinel example -- *feat**: idempotency for tasks (#67) +- *feat*: idempotency for tasks (#67) +- *chore*: streamline vacuuming and examples (#68) ## [1.0.0-rc.7] - 2026-04-09 diff --git a/examples/vacuum.rs b/examples/vacuum.rs new file mode 100644 index 0000000..7c1c6bd --- /dev/null +++ b/examples/vacuum.rs @@ -0,0 +1,36 @@ +use std::env; + +use apalis::prelude::*; +use apalis_core::backend::Vacuum; +use apalis_redis::{RedisConfig, RedisContext, RedisStorage}; +use redis::Client; + +#[tokio::main] +async fn main() { + let client = Client::open(env::var("REDIS_URL").unwrap()).unwrap(); + let conn = client.get_connection_manager().await.unwrap(); + let mut backend = RedisStorage::new_with_config( + conn, + RedisConfig::default() + .set_namespace("redis_vacuum_worker") + .set_buffer_size(100), + ); + backend.push(42).await.unwrap(); + async fn task(task: u32, ctx: RedisContext, wrk: WorkerContext) -> Result<(), BoxDynError> { + let handle = std::thread::current(); + println!("{task:?}, {ctx:?}, Thread: {:?}", handle.id()); + wrk.stop().unwrap(); + Ok(()) + } + + let worker = WorkerBuilder::new("rango-tango") + .backend(backend.clone()) + .on_event(|ctx, ev| { + println!("CTX {:?}, On Event = {:?}", ctx.name(), ev); + }) + .build(task); + worker.run().await.unwrap(); + + // You can combine this with apalis-cron to vacuum on interval + backend.vacuum().await.unwrap(); +} diff --git a/lua/vacuum.lua b/lua/vacuum.lua index 82327f9..15e4b7b 100644 --- a/lua/vacuum.lua +++ b/lua/vacuum.lua @@ -1,26 +1,25 @@ --- Lua script to clean up data in Redis +-- KEYS[1]: the task data hash +-- KEYS[2]: the metadata prefix (e.g. "task_meta") --- Define the keys -local done_list_key = KEYS[1] -local data_hash = KEYS[2] +local terminal_statuses = { Done = true, Failed = true, Killed = true } +local result_hash = KEYS[2] .. ":result" --- Iterate through done_list -local done_list_ids = redis.call('ZRANGE', done_list_key, 0, -1) +local deleted = 0 --- Initialize a variable to count the number of removed items -local removed_items_count = 0 +local fields = redis.call("hgetall", KEYS[1]) -for _, id in ipairs(done_list_ids) do +-- fields is a flat list of [field, value, field, value, ...] +for i = 1, #fields, 2 do + local task_id = fields[i] + local meta_key = KEYS[2] .. ':' .. task_id + local status = redis.call("hget", meta_key, "status") - local is_member = redis.call('HEXISTS', data_hash, id) - if is_member == 1 then - -- Remove entry from data_hash - redis.call('HDEL', data_hash, id) - removed_items_count = removed_items_count + 1 - end + if status and terminal_statuses[status] then + redis.call("hdel", KEYS[1], task_id) + redis.call("hdel", result_hash, task_id) + redis.call("del", meta_key) + deleted = deleted + 1 + end end --- Clean the done_list -redis.call('DEL', done_list_key) - -return removed_items_count +return deleted diff --git a/src/fetcher.rs b/src/fetcher.rs index 9fac310..451b51f 100644 --- a/src/fetcher.rs +++ b/src/fetcher.rs @@ -197,8 +197,6 @@ pub fn deserialize_with_meta<'a>( Some(idempotency_key_raw) }; - dbg!(&idempotency_key); - let meta = meta_fields[9..] .chunks(2) .filter_map(|chunk| { diff --git a/src/queries/mod.rs b/src/queries/mod.rs index 9f171a8..bee1c31 100644 --- a/src/queries/mod.rs +++ b/src/queries/mod.rs @@ -3,4 +3,5 @@ mod list_queues; mod list_tasks; mod list_workers; mod metrics; +mod vacuum; mod wait_for; diff --git a/src/queries/vacuum.rs b/src/queries/vacuum.rs new file mode 100644 index 0000000..5e22c48 --- /dev/null +++ b/src/queries/vacuum.rs @@ -0,0 +1,22 @@ +use apalis_core::backend::Vacuum; +use apalis_core::backend::codec::Codec; +use redis::aio::ConnectionLike; + +use crate::RedisStorage; + +impl Vacuum for RedisStorage +where + Args: Unpin + Send + Sync + 'static, + Conn: ConnectionLike + Clone + Send + Sync + 'static, + C: Codec> + Unpin + Send + 'static, + C::Error: std::error::Error + Send + Sync + 'static, +{ + async fn vacuum(&mut self) -> Result { + let vacuum_script = redis::Script::new(include_str!("../../lua/vacuum.lua")); + vacuum_script + .key(self.config.job_data_hash()) + .key(self.config.job_meta_hash()) + .invoke_async(&mut self.conn) + .await + } +}