Skip to content

Commit f0d1276

Browse files
committed
feat(trogon-cron): add scheduler
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 290461a commit f0d1276

20 files changed

Lines changed: 22073 additions & 1 deletion

rsworkspace/Cargo.lock

Lines changed: 76 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rsworkspace/crates/AGENTS.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,16 @@ For NATS infrastructure and testing, use the `trogon-nats` crate which provides:
1818
## Module conventions
1919

2020
Place observability concerns (metrics, tracing spans, logging helpers) under a `telemetry` module within each crate. Example: `acp-nats/src/telemetry/metrics.rs`. This keeps observability code separated from domain logic and provides a consistent location across crates.
21+
22+
For distributed CRON scheduling, use the `trogon-cron` crate which provides:
23+
- `Scheduler` — runs the tick loop; only one instance fires per cluster (leader election via NATS KV TTL)
24+
- `CronClient` — register/remove/enable/disable jobs stored in NATS KV (`cron_configs` bucket)
25+
- Two schedule types: `interval_sec` (fixed interval) or `cron` (6-field expression: sec min hour dom month dow)
26+
- Two action types: `publish` (NATS subject) or `spawn` (process with optional timeout and concurrency guard)
27+
- Tick payloads include `job_id`, `fired_at`, `execution_id`, and optional `payload` forwarded from the job config
28+
- OTel trace context is injected in tick message headers automatically
29+
- Ticks are published to the `CRON_TICKS` JetStream stream (subjects: `cron.>`, max_age: 1h) for at-least-once delivery; job action subjects must start with `cron.`
30+
- Workers subscribe by creating a durable consumer on `CRON_TICKS` filtered to their subject (e.g. `cron.backup`); ticks missed during downtime are replayed up to 1 hour later
31+
- Integration tests in `tests/integration.rs` (run with `NATS_TEST_URL=... cargo test -- --include-ignored`)
32+
- Binary: `trogon-cron serve` / `trogon-cron job list|add|get|remove|enable|disable`
33+
- `test-support` feature exposes `MockTickPublisher` (records published ticks) and `MockLeaderLock` (controllable via `deny_acquire()`/`deny_renew()`) for unit testing without a real NATS server
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
[package]
2+
name = "trogon-cron"
3+
version = "0.1.0"
4+
edition = "2024"
5+
autotests = false
6+
7+
[features]
8+
test-support = []
9+
10+
[dependencies]
11+
async-nats = { workspace = true, features = ["jetstream", "kv"] }
12+
bytes = { workspace = true }
13+
chrono = { version = "0.4", features = ["serde"] }
14+
clap = { workspace = true, features = ["env"] }
15+
cron = "0.12"
16+
futures = { workspace = true }
17+
nix = { version = "0.29", features = ["signal"] }
18+
opentelemetry = { workspace = true }
19+
rand = "0.8"
20+
serde = { workspace = true }
21+
serde_json = { workspace = true }
22+
tokio = { workspace = true, features = ["time", "process", "sync", "signal", "rt-multi-thread", "macros"] }
23+
tracing = { workspace = true }
24+
tracing-opentelemetry = "0.32.1"
25+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
26+
trogon-nats = { path = "../trogon-nats" }
27+
trogon-std = { path = "../trogon-std" }
28+
uuid = { version = "1", features = ["v4"] }
29+
30+
[dev-dependencies]
31+
nix = { version = "0.29", features = ["signal"] }
32+
time = "0.3"
33+
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
34+
uuid = { version = "1", features = ["v4"] }
35+
36+
[[bin]]
37+
name = "trogon-cron"
38+
path = "src/main.rs"
39+
40+
[[test]]
41+
name = "cron_unit"
42+
path = "tests/cron_unit.rs"
43+
required-features = ["test-support"]
44+
45+
[[test]]
46+
name = "integration"
47+
path = "tests/integration.rs"
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Build context: rsworkspace/
2+
# docker build -f crates/trogon-cron/Dockerfile -t trogon-cron .
3+
4+
# ── Chef stage ────────────────────────────────────────────────────────────────
5+
FROM rust:1.83-slim AS chef
6+
RUN cargo install cargo-chef --locked
7+
WORKDIR /app
8+
9+
# ── Planner stage ─────────────────────────────────────────────────────────────
10+
FROM chef AS planner
11+
COPY . .
12+
RUN cargo chef prepare --recipe-path recipe.json
13+
14+
# ── Builder stage ─────────────────────────────────────────────────────────────
15+
FROM chef AS builder
16+
RUN apt-get update && apt-get install -y pkg-config libssl-dev && rm -rf /var/lib/apt/lists/*
17+
COPY --from=planner /app/recipe.json recipe.json
18+
RUN cargo chef cook --release --recipe-path recipe.json
19+
COPY . .
20+
RUN cargo build --release -p trogon-cron
21+
22+
# ── Runtime stage ─────────────────────────────────────────────────────────────
23+
FROM debian:bookworm-slim
24+
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
25+
COPY --from=builder /app/target/release/trogon-cron /usr/local/bin/trogon-cron
26+
27+
ENV NATS_URL=nats://localhost:4222
28+
ENV RUST_LOG=info
29+
30+
ENTRYPOINT ["trogon-cron"]
31+
CMD ["serve"]
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
services:
2+
nats:
3+
image: nats:2.10-alpine
4+
command: ["-js", "-m", "8222"]
5+
ports:
6+
- "4222:4222" # client connections
7+
- "8222:8222" # HTTP monitoring
8+
healthcheck:
9+
test: ["CMD", "wget", "-qO-", "http://localhost:8222/healthz"]
10+
interval: 5s
11+
timeout: 3s
12+
retries: 5
13+
14+
trogon-cron:
15+
build:
16+
context: ../..
17+
dockerfile: crates/trogon-cron/Dockerfile
18+
environment:
19+
NATS_URL: nats://nats:4222
20+
RUST_LOG: info
21+
depends_on:
22+
nats:
23+
condition: service_healthy
24+
restart: on-failure
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
use async_nats::jetstream;
2+
3+
use crate::{
4+
config::JobConfig, domain::RegisteredJob, error::CronError, nats_impls::NatsConfigStore,
5+
traits::ConfigStore,
6+
};
7+
8+
/// Client for registering and managing CRON job configs.
9+
///
10+
/// Multiple processes can use `CronClient` simultaneously — changes are picked up
11+
/// by all running `Scheduler` instances in real time via KV watch.
12+
///
13+
/// # Example
14+
///
15+
/// ```rust,no_run
16+
/// use trogon_cron::{CronClient, JobConfig, Schedule, Action};
17+
///
18+
/// # async fn example() -> Result<(), trogon_cron::CronError> {
19+
/// let nats = async_nats::connect("nats://localhost:4222").await.unwrap();
20+
/// let client = CronClient::new(nats).await?;
21+
///
22+
/// client.register_job(&JobConfig {
23+
/// id: "health".to_string(),
24+
/// schedule: Schedule::Interval { interval_sec: 30 },
25+
/// action: Action::Publish { subject: "cron.health".to_string() },
26+
/// enabled: true,
27+
/// payload: None,
28+
/// retry: None,
29+
/// }).await?;
30+
/// # Ok(())
31+
/// # }
32+
/// ```
33+
pub struct CronClient<C = NatsConfigStore> {
34+
store: C,
35+
}
36+
37+
impl CronClient<NatsConfigStore> {
38+
/// Connect the client and ensure the config KV bucket exists.
39+
pub async fn new(nats: async_nats::Client) -> Result<Self, CronError> {
40+
let js = jetstream::new(nats);
41+
let store = NatsConfigStore::new(js).await?;
42+
Ok(Self { store })
43+
}
44+
}
45+
46+
impl<C: ConfigStore> CronClient<C> {
47+
/// Create a client backed by any `ConfigStore` implementation.
48+
pub fn with_store(store: C) -> Self {
49+
Self { store }
50+
}
51+
52+
/// Register or update a job. Existing jobs with the same `id` are overwritten.
53+
///
54+
/// Structural config constraints are validated here so callers get an
55+
/// immediate error rather than a silent scheduler-side skip. Filesystem
56+
/// checks for spawned binaries are intentionally omitted because the client
57+
/// may run on a different host than the scheduler.
58+
pub async fn register_job(&self, config: &JobConfig) -> Result<(), CronError> {
59+
let _ = RegisteredJob::try_from(config)?;
60+
self.store.put_job(config).await?;
61+
tracing::info!(job_id = %config.id, "Job registered");
62+
Ok(())
63+
}
64+
65+
/// Remove a job by id. No-op if the job doesn't exist.
66+
pub async fn remove_job(&self, id: &str) -> Result<(), CronError> {
67+
self.store.delete_job(id).await?;
68+
tracing::info!(job_id = %id, "Job removed");
69+
Ok(())
70+
}
71+
72+
/// Enable or disable a job without removing it.
73+
pub async fn set_enabled(&self, id: &str, enabled: bool) -> Result<(), CronError> {
74+
let mut config = self
75+
.get_job(id)
76+
.await?
77+
.ok_or_else(|| CronError::Kv(format!("job '{}' not found", id)))?;
78+
config.enabled = enabled;
79+
self.register_job(&config).await
80+
}
81+
82+
/// Get a single job config by id. Returns `None` if not found or deleted.
83+
pub async fn get_job(&self, id: &str) -> Result<Option<JobConfig>, CronError> {
84+
self.store.get_job(id).await
85+
}
86+
87+
/// List all currently active job configs.
88+
pub async fn list_jobs(&self) -> Result<Vec<JobConfig>, CronError> {
89+
self.store.list_jobs().await
90+
}
91+
}

0 commit comments

Comments
 (0)