| title | State Management Pattern 2: Observable State with SubscriptionRef |
|---|---|
| id | state-management-pattern-subscription-ref |
| skillLevel | advanced |
| applicationPatternId | concurrency |
| summary | Build observable state that notifies subscribers on changes, enabling reactive patterns and state-driven architecture. |
| tags | |
| rule | |
| related | |
| author | effect_website |
| lessonOrder | 9 |

Observable state enables reactive patterns:
- State binding: UI binds to state, auto-updates on change
- Subscribers: Multiple handlers notified on change
- Event streams: Changes become event streams
- Derived state: Compute values from state changes
- Effect triggering: Changes trigger side effects
Pattern: Combine Ref + PubSub or custom subscription system
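Effect also ships `SubscriptionRef`, which packages the Ref + PubSub combination behind one type. The following is a minimal sketch rather than a definitive recipe: `observeCounter` and the `subscribed` signal are names invented for this illustration, and the `Deferred` is only there so the demo does not publish before the subscriber is attached.

```typescript
import { Chunk, Deferred, Effect, Fiber, Stream, SubscriptionRef } from "effect";

// Collects the first three values one subscriber observes on a counter.
export const observeCounter = Effect.gen(function* () {
  const counter = yield* SubscriptionRef.make(0);
  // Illustrative helper: completed once the subscriber has seen its first value
  const subscribed = yield* Deferred.make<void>();

  // `changes` emits the current value first, then every later update
  const fiber = yield* counter.changes.pipe(
    Stream.tap(() => Deferred.succeed(subscribed, undefined)),
    Stream.take(3),
    Stream.runCollect,
    Effect.fork
  );

  // Wait until the subscription is live so no update is missed
  yield* Deferred.await(subscribed);
  yield* SubscriptionRef.set(counter, 1);
  yield* SubscriptionRef.update(counter, (n) => n + 1);

  const seen = yield* Fiber.join(fiber);
  return Chunk.toReadonlyArray(seen);
});
```

Running `observeCounter` with `Effect.runPromise` should yield the initial value followed by the two updates. Unlike a bare PubSub, a late subscriber still receives the current value on subscription.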
Passive state causes problems:
Problem 1: Stale UI
- State changes in backend
- UI doesn't know
- User sees old data
- Manual refresh required
Problem 2: Cascading updates
- User changes form field
- Need to update 5 other fields
- Manual imperative code
- Fragile, easy to miss one
Problem 3: Derived state
- Total = sum of items
- Manual update on each item change
- Duplicate code everywhere
- Bug: total not updated when items change
Problem 4: Side effects
- User enables feature
- Multiple things must happen
- Analytics, notifications, API calls
- All imperative, hard to maintain
Solutions:
Observable state:
- State change = event
- Subscribers notified
- UI binds directly
- Auto-updates
Reactive flows:
- Define how state flows
- newTotal = items.sum()
- Automatic recalculation
- No manual updates
Side effect chaining:
- When state changes to "complete"
- Send notification
- Log event
- Trigger cleanup
- All declaratively
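The reactive-flow idea above (a total that follows the item list) can be sketched as a stream transformation. This is an illustrative sketch under stated assumptions: `Item`, `sumPrices`, `totalsOf`, and the `snapshots` array are hypothetical names, and a fixed list of snapshots stands in for a live source such as the `changes` stream of a `SubscriptionRef`.

```typescript
import { Chunk, Effect, Stream } from "effect";

interface Item {
  readonly id: string;
  readonly price: number;
}

const sumPrices = (items: ReadonlyArray<Item>): number =>
  items.reduce((sum, item) => sum + item.price, 0);

// Derive a stream of totals from a stream of item lists.
// `Stream.changes` drops consecutive duplicates, so an update that leaves
// the total unchanged produces no downstream notification.
export const totalsOf = <E, R>(
  items: Stream.Stream<ReadonlyArray<Item>, E, R>
): Stream.Stream<number, E, R> =>
  items.pipe(Stream.map(sumPrices), Stream.changes);

// Stand-in for a live source of cart states
const snapshots: ReadonlyArray<ReadonlyArray<Item>> = [
  [],
  [{ id: "a", price: 10 }],
  [{ id: "a-renamed", price: 10 }], // same total, so no new emission
  [{ id: "a", price: 10 }, { id: "b", price: 5 }],
];

export const derivedTotals = totalsOf(Stream.fromIterable(snapshots)).pipe(
  Stream.runCollect,
  Effect.map(Chunk.toReadonlyArray)
);
```

Because the derivation is a plain stream pipeline, there is no second piece of state to keep in sync by hand; consumers subscribe to the totals directly.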
This example demonstrates observable state patterns.
import { Effect, Ref, PubSub, Stream } from "effect";
interface StateChange<T> {
readonly previous: T;
readonly current: T;
readonly timestamp: Date;
readonly reason: string;
}
interface Observable<T> {
readonly get: () => Effect.Effect<T>;
readonly set: (value: T, reason: string) => Effect.Effect<void>;
readonly subscribe: () => Stream.Stream<StateChange<T>>;
readonly modify: (f: (current: T) => T, reason: string) => Effect.Effect<void>;
}
const program = Effect.gen(function* () {
console.log(
`\n[OBSERVABLE STATE] Reactive state management\n`
);
// Create observable
const createObservable = <T,>(initialValue: T): Effect.Effect<Observable<T>> =>
Effect.gen(function* () {
const state = yield* Ref.make(initialValue);
const changeStream = yield* PubSub.unbounded<StateChange<T>>();
return {
get: () => Ref.get(state),
set: (value: T, reason: string) =>
Effect.gen(function* () {
const previous = yield* Ref.get(state);
if (previous === value) {
return; // No change
}
yield* Ref.set(state, value);
const change: StateChange<T> = {
previous,
current: value,
timestamp: new Date(),
reason,
};
yield* PubSub.publish(changeStream, change);
}),
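subscribe: () =>
// PubSub.subscribe returns a scoped Dequeue, not a Stream;
// Stream.fromPubSub subscribes when the stream starts running
Stream.fromPubSub(changeStream),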
modify: (f: (current: T) => T, reason: string) =>
Effect.gen(function* () {
const previous = yield* Ref.get(state);
const updated = f(previous);
if (previous === updated) {
return; // No change
}
yield* Ref.set(state, updated);
const change: StateChange<T> = {
previous,
current: updated,
timestamp: new Date(),
reason,
};
yield* PubSub.publish(changeStream, change);
}),
};
});
// Example 1: Basic observable counter
console.log(`[1] Observable counter:\n`);
const counter = yield* createObservable(0);
// Subscribe to changes
const printChanges = counter.subscribe().pipe(
Stream.tap((change) =>
Effect.log(
`[CHANGE] ${change.previous} → ${change.current} (${change.reason})`
)
),
Stream.take(5), // Limit to 5 changes for demo
Stream.runDrain
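);
// Run the subscriber in a background fiber and give it a moment to attach;
// PubSub only delivers changes published after the subscription exists
yield* Effect.fork(printChanges);
yield* Effect.sleep("10 millis");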
// Make changes
yield* counter.set(1, "increment");
yield* counter.set(2, "increment");
yield* counter.set(5, "reset");
// Wait for changes to be processed
yield* Effect.sleep("100 millis");
// Example 2: Derived state (computed values)
console.log(`\n[2] Derived state (total from items):\n`);
interface ShoppingCart {
readonly items: Array<{ id: string; price: number }>;
readonly discount: number;
}
const cart = yield* createObservable<ShoppingCart>({
items: [],
discount: 0,
});
const computeTotal = (state: ShoppingCart): number => {
const subtotal = state.items.reduce((sum, item) => sum + item.price, 0);
return subtotal * (1 - state.discount);
};
// Create derived observable
const total = yield* createObservable(computeTotal(yield* cart.get()));
// Subscribe to cart changes, update total
const updateTotalOnCartChange = cart.subscribe().pipe(
Stream.tap((change) =>
Effect.gen(function* () {
const newTotal = computeTotal(change.current);
yield* total.set(newTotal, "recalculated-from-cart");
yield* Effect.log(
`[TOTAL] Recalculated: $${newTotal.toFixed(2)}`
);
})
),
Stream.take(10),
Stream.runDrain
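);
// Start the recalculation loop in the background before changing the cart
yield* Effect.fork(updateTotalOnCartChange);
yield* Effect.sleep("10 millis");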
// Make cart changes
yield* cart.modify(
(state) => ({
...state,
items: [
...state.items,
{ id: "item1", price: 19.99 },
],
}),
"add-item"
);
yield* cart.modify(
(state) => ({
...state,
items: [
...state.items,
{ id: "item2", price: 29.99 },
],
}),
"add-item"
);
yield* cart.modify(
(state) => ({
...state,
discount: 0.1,
}),
"apply-discount"
);
yield* Effect.sleep("200 millis");
// Example 3: Effect triggering on state change
console.log(`\n[3] Effects triggered by state changes:\n`);
type AppStatus = "idle" | "loading" | "ready" | "error";
const appStatus = yield* createObservable<AppStatus>("idle");
// Define effects for each status
const handleStatusChange = appStatus.subscribe().pipe(
Stream.tap((change) =>
Effect.gen(function* () {
yield* Effect.log(
`[STATUS] ${change.previous} → ${change.current}`
);
switch (change.current) {
case "loading":
yield* Effect.log(`[EFFECT] Starting loading animation`);
break;
case "ready":
yield* Effect.log(`[EFFECT] Hiding spinner, showing content`);
break;
case "error":
yield* Effect.log(`[EFFECT] Showing error message`);
yield* Effect.log(`[TELEMETRY] Logging error event`);
break;
default:
yield* Effect.log(`[EFFECT] Resetting UI`);
}
})
),
Stream.take(6),
Stream.runDrain
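);
// Start the status handler in the background before triggering changes
yield* Effect.fork(handleStatusChange);
yield* Effect.sleep("10 millis");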
// Trigger status changes
yield* appStatus.set("loading", "user-clicked");
yield* appStatus.set("ready", "data-loaded");
yield* appStatus.set("loading", "user-refreshed");
yield* appStatus.set("error", "api-failed");
yield* Effect.sleep("200 millis");
// Example 4: Multi-level state aggregation
console.log(`\n[4] Aggregated state from multiple sources:\n`);
interface UserProfile {
name: string;
email: string;
role: string;
}
interface AppState {
user: UserProfile | null;
notifications: number;
theme: "light" | "dark";
}
const appState = yield* createObservable<AppState>({
user: null,
notifications: 0,
theme: "light",
});
// Subscribe to track changes
const trackChanges = appState.subscribe().pipe(
Stream.tap((change) => {
if (change.current.user && !change.previous.user) {
return Effect.log(`[EVENT] User logged in: ${change.current.user.name}`);
}
if (!change.current.user && change.previous.user) {
return Effect.log(`[EVENT] User logged out`);
}
if (change.current.notifications !== change.previous.notifications) {
return Effect.log(
`[NOTIFY] ${change.current.notifications} notifications`
);
}
if (change.current.theme !== change.previous.theme) {
return Effect.log(`[THEME] Switched to ${change.current.theme}`);
}
return Effect.succeed(undefined);
}),
Stream.take(10),
Stream.runDrain
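);
// Start the change tracker in the background before modifying state
yield* Effect.fork(trackChanges);
yield* Effect.sleep("10 millis");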
// Make changes
yield* appState.modify(
(state) => ({
...state,
user: { name: "Alice", email: "alice@example.com", role: "admin" },
}),
"user-login"
);
yield* appState.modify(
(state) => ({
...state,
notifications: 5,
}),
"new-notifications"
);
yield* appState.modify(
(state) => ({
...state,
theme: "dark",
}),
"user-preference"
);
yield* Effect.sleep("200 millis");
// Example 5: State snapshot and history
console.log(`\n[5] State history tracking:\n`);
interface HistoryEntry<T> {
value: T;
timestamp: Date;
reason: string;
}
const history = yield* Ref.make<HistoryEntry<number>[]>([]);
const trackedCounter = yield* createObservable(0);
const trackHistory = trackedCounter.subscribe().pipe(
Stream.tap((change) =>
Effect.gen(function* () {
yield* Ref.update(history, (h) => [
...h,
{
value: change.current,
timestamp: change.timestamp,
reason: change.reason,
},
]);
yield* Effect.log(
`[HISTORY] Recorded: ${change.current} (${change.reason})`
);
})
),
Stream.take(5),
Stream.runDrain
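);
// Start recording history in the background before making changes
yield* Effect.fork(trackHistory);
yield* Effect.sleep("10 millis");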
// Make changes
for (let i = 1; i <= 4; i++) {
yield* trackedCounter.set(i, `step-${i}`);
}
yield* Effect.sleep("200 millis");
// Print history
const hist = yield* Ref.get(history);
yield* Effect.log(`\n[HISTORY] ${hist.length} entries:`);
for (const entry of hist) {
yield* Effect.log(
` - ${entry.value} (${entry.reason})`
);
}
});
Effect.runPromise(program);
Derive multiple values from state:
const createComputedObservable = <T, U,>(
source: Observable<T>,
compute: (value: T) => U
): Effect.Effect<Observable<U>> =>
Effect.gen(function* () {
const computed = yield* createObservable(
compute(yield* source.get())
);
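// Fork the propagation loop; without running it, the stream is only
// described and the computed value would never update
yield* source.subscribe().pipe(
Stream.tap((change) =>
computed.set(compute(change.current), `derived-from-${change.reason}`)
),
Stream.runDrain,
Effect.fork
);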
return computed;
});
// Usage: Compute full name from user
// (assumes a User type with firstName and lastName string fields)
const createFullNameObservable = (user: Observable<User>) =>
createComputedObservable(
user,
(u) => `${u.firstName} ${u.lastName}`
);
Group related state changes:
const transactional = <T,>(
observable: Observable<T>,
updates: Array<(current: T) => T>,
reason: string
): Effect.Effect<void> =>
Effect.gen(function* () {
let current = yield* observable.get();
// Apply all updates
for (const update of updates) {
current = update(current);
}
// Single notification
yield* observable.set(current, reason);
});
// Usage: Update multiple fields as one transaction
// (assumes the state type also declares a lastSeen: Date field)
const transaction = transactional(
appState,
[
(s) => ({ ...s, notifications: 0 }),
(s) => ({ ...s, lastSeen: new Date() }),
],
"mark-all-as-read"
);
✅ Use observable state when:
- UI binds to state
- Multiple subscribers
- Derived state needed
- Effect triggering needed
- State-driven architecture
✅ Use PubSub when:
- Fan-out notifications
- Multiple subscribers
- Stream processing
- Decoupled components
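⚠️ Trade-offs:
- Memory overhead (each subscription buffers unread changes)
- Notification latency (subscribers run asynchronously)
- Harder debugging (updates flow through indirect subscriptions)
- Potential for circular updates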
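One way to bound the memory overhead listed above is to back the change channel with a sliding PubSub instead of an unbounded one, so a slow subscriber loses old changes rather than accumulating them. This is a hedged sketch: `slowSubscriber` is a hypothetical name, the capacity of 2 is an arbitrary demo value, and dropping intermediate changes is only acceptable when subscribers care about recent state rather than full history.

```typescript
import { Chunk, Effect, PubSub, Queue } from "effect";

// A slow subscriber on a sliding PubSub keeps only the newest messages.
export const slowSubscriber = Effect.scoped(
  Effect.gen(function* () {
    // Capacity 2 is an arbitrary demo value
    const changes = yield* PubSub.sliding<number>(2);
    // Subscribe first; the subscription is released when the scope closes
    const inbox = yield* PubSub.subscribe(changes);

    // Publish more messages than the buffer can hold without consuming any
    for (const n of [1, 2, 3, 4]) {
      yield* PubSub.publish(changes, n);
    }

    // Drain whatever the subscriber still has buffered
    const buffered = yield* Queue.takeAll(inbox);
    return Chunk.toReadonlyArray(buffered);
  })
);
```

The subscriber should end up holding at most two values, with the most recent publish among them, whereas `PubSub.unbounded` would retain all four.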
| Pattern | When | Trade-off |
|---|---|---|
| Polling | Simple state | Inefficient |
| Observable | Multiple subscribers | Overhead |
| Computed | Derived values | Update cycles |
| Transactional | Related changes | Complexity |

See also:
- Concurrency Pattern 5: PubSub Broadcasting - Event broadcasting
- State Management Pattern 1: SynchronizedRef - Thread-safe state
- Stream Pattern 4: Stateful Operations - Stream state
- Stream Pattern 1: Map & Filter - Stream transforms