Skip to content

Commit 01d2005

Browse files
authored
feat(scheduler): add schedules read-model projection (#383)
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 707efba commit 01d2005

27 files changed

Lines changed: 10710 additions & 0 deletions
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
edition = "2024";
2+
3+
package trogonai.scheduler.schedules.projections.v1;
4+
5+
import "google/protobuf/duration.proto";
6+
7+
// Delivery is the read model's own copy of where a schedule publishes when it
8+
// fires.
9+
//
10+
// It mirrors the command-side delivery today but is defined here so the read
11+
// model owns its full storage shape and can evolve independently of the event
12+
// and command schemas.
13+
message Delivery {
14+
oneof kind {
15+
NatsMessage nats_message = 1;
16+
}
17+
18+
message NatsMessage {
19+
string subject = 1 [features.field_presence = LEGACY_REQUIRED];
20+
google.protobuf.Duration ttl = 2;
21+
Source source = 3;
22+
23+
message Source {
24+
oneof kind {
25+
LatestFromSubject latest_from_subject = 1;
26+
}
27+
}
28+
29+
message LatestFromSubject {
30+
string subject = 1 [features.field_presence = LEGACY_REQUIRED];
31+
}
32+
}
33+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
edition = "2024";
2+
3+
package trogonai.scheduler.schedules.projections.v1;
4+
5+
import "trogon/content/v1alpha1/content.proto";
6+
7+
// Message is the read model's own copy of the static payload and headers
8+
// recorded when a schedule was created.
9+
//
10+
// It mirrors the command-side message today but is defined here so the read
11+
// model owns its full storage shape and can evolve independently of the event
12+
// and command schemas.
13+
message Message {
14+
trogon.content.v1alpha1.Content content = 1 [features.field_presence = LEGACY_REQUIRED];
15+
repeated Header headers = 2;
16+
}
17+
18+
message Header {
19+
string name = 1 [features.field_presence = LEGACY_REQUIRED];
20+
string value = 2 [features.field_presence = LEGACY_REQUIRED];
21+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
edition = "2024";
2+
3+
package trogonai.scheduler.schedules.projections.v1;
4+
5+
import "google/protobuf/duration.proto";
6+
import "google/protobuf/timestamp.proto";
7+
import "google/type/datetime.proto";
8+
9+
// Schedule is the read model's own copy of the scheduling strategy recorded when
10+
// a schedule was created.
11+
//
12+
// It mirrors the command-side schedule definition today but is defined here so
13+
// the read model owns its full storage shape and can evolve independently of the
14+
// event and command schemas.
15+
message Schedule {
16+
oneof kind {
17+
At at = 1;
18+
Every every = 2;
19+
Cron cron = 3;
20+
RRule rrule = 4;
21+
}
22+
23+
message At {
24+
google.protobuf.Timestamp at = 1 [features.field_presence = LEGACY_REQUIRED];
25+
}
26+
27+
message Every {
28+
google.protobuf.Duration every = 1 [features.field_presence = LEGACY_REQUIRED];
29+
}
30+
31+
message Cron {
32+
string expr = 1 [features.field_presence = LEGACY_REQUIRED];
33+
google.type.TimeZone timezone = 2 [features.field_presence = LEGACY_REQUIRED];
34+
}
35+
36+
message RRule {
37+
google.protobuf.Timestamp dtstart = 1 [features.field_presence = LEGACY_REQUIRED];
38+
string rrule = 2 [features.field_presence = LEGACY_REQUIRED];
39+
google.type.TimeZone timezone = 3 [features.field_presence = LEGACY_REQUIRED];
40+
repeated google.protobuf.Timestamp rdate = 4;
41+
repeated google.protobuf.Timestamp exdate = 5;
42+
}
43+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
edition = "2024";
2+
3+
package trogonai.scheduler.schedules.projections.v1;
4+
5+
import "google/protobuf/timestamp.proto";
6+
import "trogonai/scheduler/schedules/projections/v1/delivery.proto";
7+
import "trogonai/scheduler/schedules/projections/v1/message.proto";
8+
import "trogonai/scheduler/schedules/projections/v1/schedule.proto";
9+
import "trogonai/scheduler/schedules/projections/v1/schedule_status.proto";
10+
11+
// ScheduleProjection is the read-model projection of a schedule's current state,
12+
// folded from the schedule event stream and stored as the value of each entry in
13+
// the schedules KV bucket.
14+
//
15+
// It is a derived, rebuildable read view, so it defines its own status, schedule,
16+
// delivery, and message types rather than embedding the event and command protos.
17+
// That keeps this storage shape free to evolve under protobuf's field rules in
18+
// isolation from the schemas it is folded from.
19+
message ScheduleProjection {
20+
// Stable schedule id; also the schedule stream id.
21+
string schedule_id = 1 [features.field_presence = LEGACY_REQUIRED];
22+
// Current lifecycle status (scheduled or paused).
23+
ScheduleStatus status = 2 [features.field_presence = LEGACY_REQUIRED];
24+
// True once a recurring schedule has run to exhaustion: it stays visible but
25+
// will never fire again.
26+
bool completed = 3;
27+
// The next planned occurrence, if one is armed and pending.
28+
google.protobuf.Timestamp next_occurrence_at = 4;
29+
// The most recently recorded occurrence, if any has fired.
30+
google.protobuf.Timestamp last_occurrence_at = 5;
31+
// The schedule definition recorded at creation.
32+
Schedule schedule = 6 [features.field_presence = LEGACY_REQUIRED];
33+
// The delivery definition recorded at creation.
34+
Delivery delivery = 7 [features.field_presence = LEGACY_REQUIRED];
35+
// The static message payload and headers recorded at creation.
36+
Message message = 8 [features.field_presence = LEGACY_REQUIRED];
37+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
edition = "2024";
2+
3+
package trogonai.scheduler.schedules.projections.v1;
4+
5+
// ScheduleStatus is the read model's own copy of a schedule's lifecycle status.
6+
//
7+
// It mirrors the command-side status today but is defined here so the read model
8+
// owns its full storage shape and can evolve under protobuf's field rules without
9+
// depending on the event or command schemas.
10+
message ScheduleStatus {
11+
// A oneof gives each lifecycle state a place to grow without adding fields
12+
// that are invalid for other states.
13+
oneof kind {
14+
Scheduled scheduled = 1;
15+
Paused paused = 2;
16+
}
17+
18+
// Scheduled means the schedule participates in future fire decisions.
19+
message Scheduled {}
20+
21+
// Paused means the schedule keeps its definition without future fires.
22+
message Paused {}
23+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
//! The schedules read-model projection (a NATS KV view of current schedules,
2+
//! folded from the schedule event stream).
3+
//!
4+
//! # Boundary
5+
//!
6+
//! This module is the **write side** only: it folds `v1` event protos straight
7+
//! into the stored `projections.v1.ScheduleProjection` proto and owns that storage layout
8+
//! (bucket, key scheme, checkpoint — see [`schedules::storage`]). It depends on
9+
//! the event/view protos, the shared error type, and the shared events stream —
10+
//! never on the read model or the queries. Reading the stored proto back out and
11+
//! decoding it into the read-model value objects callers see is the **read side**
12+
//! and lives entirely in [`crate::queries`].
13+
//!
14+
//! # Single active writer
15+
//!
16+
//! This projection must be driven by a single active instance, the same
17+
//! invariant the execution worker requires (see `processor::execution::worker`).
18+
//! The event log itself is safe under concurrent writers — per-subject
19+
//! optimistic concurrency serializes appends — but the KV writes here are
20+
//! read-modify-write without compare-and-swap, and [`catch_up_schedules_read_model`]
21+
//! rebuilds the bucket from the event log. With two instances writing the same
22+
//! bucket, a write can land on stale state, or a boot-time rebuild can overwrite
23+
//! a peer's newer write.
24+
//!
25+
//! Enforce this by deploying the scheduler as a single active instance (a single
26+
//! replica, a `Recreate` rollout, or leader election) — not with a
27+
//! projection-specific lock, since the worker already gates the whole service.
28+
//!
29+
//! ## Safety net for the rolling-restart overlap
30+
//!
31+
//! A rolling deploy briefly runs two instances. Most of the projection degrades
32+
//! gracefully, not corrupts, during that window:
33+
//! - Catch-up early-returns when the checkpoint is current, so a booting instance
34+
//! does not rebuild (or reconcile) in steady state.
35+
//! - The checkpoint only advances contiguously, so a racing writer can stall it
36+
//! (forcing a harmless re-fold) but never advance it past an unprojected event.
37+
//! - A live projection failure never fails the durable append and never advances
38+
//! the checkpoint, so any divergence self-heals on the next clean start.
39+
//!
40+
//! The one step that genuinely depends on the single-writer invariant is the
41+
//! catch-up reconcile: it deletes any current row absent from the freshly folded
42+
//! state, so during an overlap it could delete a row a peer just created. That
43+
//! row is re-created by the peer's next event or restart.
44+
//!
45+
//! The residual is a transient stale (or briefly missing) schedule that resolves
46+
//! on the schedule's next event or the next restart.
47+
48+
mod schedules;
49+
50+
pub(crate) use schedules::storage;
51+
pub(crate) use schedules::{catch_up_schedules_read_model, project_appended_events};

0 commit comments

Comments
 (0)