Skip to content

Latest commit

 

History

History
559 lines (447 loc) · 13.1 KB

File metadata and controls

559 lines (447 loc) · 13.1 KB
title State Management Pattern 2: Observable State with SubscriptionRef
id state-management-pattern-subscription-ref
skillLevel advanced
applicationPatternId concurrency
summary Build observable state that notifies subscribers on changes, enabling reactive patterns and state-driven architecture.
tags
state-management
reactivity
pub-sub
event-driven
state-binding
reactive-ui
rule
description
Combine Ref with PubSub to create observable state where changes trigger notifications, enabling reactive state management.
related
concurrency-pattern-pubsub-event-broadcast
state-management-pattern-synchronized-ref
stream-pattern-stateful-operations
author effect_website
lessonOrder 9

Guideline

Observable state enables reactive patterns:

  • State binding: UI binds to state, auto-updates on change
  • Subscribers: Multiple handlers notified on change
  • Event streams: Changes become event streams
  • Derived state: Compute values from state changes
  • Effect triggering: Changes trigger side effects

Pattern: Combine Ref + PubSub or custom subscription system


Rationale

Passive state causes problems:

Problem 1: Stale UI

  • State changes in backend
  • UI doesn't know
  • User sees old data
  • Manual refresh required

Problem 2: Cascading updates

  • User changes form field
  • Need to update 5 other fields
  • Manual imperative code
  • Fragile, easy to miss one

Problem 3: Derived state

  • Total = sum of items
  • Manual update on each item change
  • Duplicate code everywhere
  • Bug: total not updated when items change

Problem 4: Side effects

  • User enables feature
  • Multiple things must happen
  • Analytics, notifications, API calls
  • All imperative, hard to maintain

Solutions:

Observable state:

  • State change = event
  • Subscribers notified
  • UI binds directly
  • Auto-updates

Reactive flows:

  • Define how state flows
  • newTotal = items.sum()
  • Automatic recalculation
  • No manual updates

Side effect chaining:

  • When state changes to "complete"
  • Send notification
  • Log event
  • Trigger cleanup
  • All declaratively

Good Example

This example demonstrates observable state patterns.

import { Effect, Ref, PubSub, Stream } from "effect";

interface StateChange<T> {
  readonly previous: T;
  readonly current: T;
  readonly timestamp: Date;
  readonly reason: string;
}

interface Observable<T> {
  readonly get: () => Effect.Effect<T>;
  readonly set: (value: T, reason: string) => Effect.Effect<void>;
  readonly subscribe: () => Stream.Stream<StateChange<T>>;
  readonly modify: (f: (current: T) => T, reason: string) => Effect.Effect<void>;
}

const program = Effect.gen(function* () {
  console.log(
    `\n[OBSERVABLE STATE] Reactive state management\n`
  );

  // Create observable
  const createObservable = <T,>(initialValue: T): Effect.Effect<Observable<T>> =>
    Effect.gen(function* () {
      const state = yield* Ref.make(initialValue);
      const changeStream = yield* PubSub.unbounded<StateChange<T>>();

      return {
        get: () => Ref.get(state),

        set: (value: T, reason: string) =>
          Effect.gen(function* () {
            const previous = yield* Ref.get(state);

            if (previous === value) {
              return; // No change
            }

            yield* Ref.set(state, value);

            const change: StateChange<T> = {
              previous,
              current: value,
              timestamp: new Date(),
              reason,
            };

            yield* PubSub.publish(changeStream, change);
          }),

        subscribe: () =>
          PubSub.subscribe(changeStream),

        modify: (f: (current: T) => T, reason: string) =>
          Effect.gen(function* () {
            const previous = yield* Ref.get(state);
            const updated = f(previous);

            if (previous === updated) {
              return; // No change
            }

            yield* Ref.set(state, updated);

            const change: StateChange<T> = {
              previous,
              current: updated,
              timestamp: new Date(),
              reason,
            };

            yield* PubSub.publish(changeStream, change);
          }),
      };
    });

  // Example 1: Basic observable counter
  console.log(`[1] Observable counter:\n`);

  const counter = yield* createObservable(0);

  // Subscribe to changes
  const printChanges = counter.subscribe().pipe(
    Stream.tap((change) =>
      Effect.log(
        `[CHANGE] ${change.previous}${change.current} (${change.reason})`
      )
    ),
    Stream.take(5), // Limit to 5 changes for demo
    Stream.runDrain
  );

  // Make changes
  yield* counter.set(1, "increment");
  yield* counter.set(2, "increment");
  yield* counter.set(5, "reset");

  // Wait for changes to be processed
  yield* Effect.sleep("100 millis");

  // Example 2: Derived state (computed values)
  console.log(`\n[2] Derived state (total from items):\n`);

  interface ShoppingCart {
    readonly items: Array<{ id: string; price: number }>;
    readonly discount: number;
  }

  const cart = yield* createObservable<ShoppingCart>({
    items: [],
    discount: 0,
  });

  const computeTotal = (state: ShoppingCart): number => {
    const subtotal = state.items.reduce((sum, item) => sum + item.price, 0);
    return subtotal * (1 - state.discount);
  };

  // Create derived observable
  const total = yield* createObservable(computeTotal(yield* cart.get()));

  // Subscribe to cart changes, update total
  const updateTotalOnCartChange = cart.subscribe().pipe(
    Stream.tap((change) =>
      Effect.gen(function* () {
        const newTotal = computeTotal(change.current);

        yield* total.set(newTotal, "recalculated-from-cart");

        yield* Effect.log(
          `[TOTAL] Recalculated: $${newTotal.toFixed(2)}`
        );
      })
    ),
    Stream.take(10),
    Stream.runDrain
  );

  // Make cart changes
  yield* cart.modify(
    (state) => ({
      ...state,
      items: [
        ...state.items,
        { id: "item1", price: 19.99 },
      ],
    }),
    "add-item"
  );

  yield* cart.modify(
    (state) => ({
      ...state,
      items: [
        ...state.items,
        { id: "item2", price: 29.99 },
      ],
    }),
    "add-item"
  );

  yield* cart.modify(
    (state) => ({
      ...state,
      discount: 0.1,
    }),
    "apply-discount"
  );

  yield* Effect.sleep("200 millis");

  // Example 3: Effect triggering on state change
  console.log(`\n[3] Effects triggered by state changes:\n`);

  type AppStatus = "idle" | "loading" | "ready" | "error";

  const appStatus = yield* createObservable<AppStatus>("idle");

  // Define effects for each status
  const handleStatusChange = appStatus.subscribe().pipe(
    Stream.tap((change) =>
      Effect.gen(function* () {
        yield* Effect.log(
          `[STATUS] ${change.previous}${change.current}`
        );

        switch (change.current) {
          case "loading":
            yield* Effect.log(`[EFFECT] Starting loading animation`);
            break;

          case "ready":
            yield* Effect.log(`[EFFECT] Hiding spinner, showing content`);
            break;

          case "error":
            yield* Effect.log(`[EFFECT] Showing error message`);
            yield* Effect.log(`[TELEMETRY] Logging error event`);
            break;

          default:
            yield* Effect.log(`[EFFECT] Resetting UI`);
        }
      })
    ),
    Stream.take(6),
    Stream.runDrain
  );

  // Trigger status changes
  yield* appStatus.set("loading", "user-clicked");
  yield* appStatus.set("ready", "data-loaded");
  yield* appStatus.set("loading", "user-refreshed");
  yield* appStatus.set("error", "api-failed");

  yield* Effect.sleep("200 millis");

  // Example 4: Multi-level state aggregation
  console.log(`\n[4] Aggregated state from multiple sources:\n`);

  interface UserProfile {
    name: string;
    email: string;
    role: string;
  }

  interface AppState {
    user: UserProfile | null;
    notifications: number;
    theme: "light" | "dark";
  }

  const appState = yield* createObservable<AppState>({
    user: null,
    notifications: 0,
    theme: "light",
  });

  // Subscribe to track changes
  const trackChanges = appState.subscribe().pipe(
    Stream.tap((change) => {
      if (change.current.user && !change.previous.user) {
        return Effect.log(`[EVENT] User logged in: ${change.current.user.name}`);
      }

      if (!change.current.user && change.previous.user) {
        return Effect.log(`[EVENT] User logged out`);
      }

      if (change.current.notifications !== change.previous.notifications) {
        return Effect.log(
          `[NOTIFY] ${change.current.notifications} notifications`
        );
      }

      if (change.current.theme !== change.previous.theme) {
        return Effect.log(`[THEME] Switched to ${change.current.theme}`);
      }

      return Effect.succeed(undefined);
    }),
    Stream.take(10),
    Stream.runDrain
  );

  // Make changes
  yield* appState.modify(
    (state) => ({
      ...state,
      user: { name: "Alice", email: "alice@example.com", role: "admin" },
    }),
    "user-login"
  );

  yield* appState.modify(
    (state) => ({
      ...state,
      notifications: 5,
    }),
    "new-notifications"
  );

  yield* appState.modify(
    (state) => ({
      ...state,
      theme: "dark",
    }),
    "user-preference"
  );

  yield* Effect.sleep("200 millis");

  // Example 5: State snapshot and history
  console.log(`\n[5] State history tracking:\n`);

  interface HistoryEntry<T> {
    value: T;
    timestamp: Date;
    reason: string;
  }

  const history = yield* Ref.make<HistoryEntry<number>[]>([]);

  const trackedCounter = yield* createObservable(0);

  const trackHistory = trackedCounter.subscribe().pipe(
    Stream.tap((change) =>
      Effect.gen(function* () {
        yield* Ref.modify(history, (h) => [
          undefined,
          [
            ...h,
            {
              value: change.current,
              timestamp: change.timestamp,
              reason: change.reason,
            },
          ],
        ]);

        yield* Effect.log(
          `[HISTORY] Recorded: ${change.current} (${change.reason})`
        );
      })
    ),
    Stream.take(5),
    Stream.runDrain
  );

  // Make changes
  for (let i = 1; i <= 4; i++) {
    yield* trackedCounter.set(i, `step-${i}`);
  }

  yield* Effect.sleep("200 millis");

  // Print history
  const hist = yield* Ref.get(history);

  yield* Effect.log(`\n[HISTORY] ${hist.length} entries:`);

  for (const entry of hist) {
    yield* Effect.log(
      `  - ${entry.value} (${entry.reason})`
    );
  }
});

Effect.runPromise(program);

Advanced: Computed Observable Properties

Derive multiple values from state:

const createComputedObservable = <T, U,>(
  source: Observable<T>,
  compute: (value: T) => U
): Effect.Effect<Observable<U>> =>
  Effect.gen(function* () {
    const computed = yield* createObservable(
      compute(yield* source.get())
    );

    source.subscribe().pipe(
      Stream.tap((change) =>
        computed.set(compute(change.current), `derived-from-${change.reason}`)
      ),
      Stream.runDrain
    );

    return computed;
  });

// Usage: Compute full name from user
const createFullNameObservable = (user: Observable<User>) =>
  createComputedObservable(
    user,
    (u) => `${u.firstName} ${u.lastName}`
  );

Advanced: Transactional Updates

Group related state changes:

const transactional = <T,>(
  observable: Observable<T>,
  updates: Array<(current: T) => T>,
  reason: string
): Effect.Effect<void> =>
  Effect.gen(function* () {
    let current = yield* observable.get();

    // Apply all updates
    for (const update of updates) {
      current = update(current);
    }

    // Single notification
    yield* observable.set(current, reason);
  });

// Usage: Update multiple fields as one transaction
const transaction = transactional(
  appState,
  [
    (s) => ({ ...s, notifications: 0 }),
    (s) => ({ ...s, lastSeen: new Date() }),
  ],
  "mark-all-as-read"
);

When to Use This Pattern

Use observable state when:

  • UI binds to state
  • Multiple subscribers
  • Derived state needed
  • Effect triggering needed
  • State-driven architecture

Use PubSub when:

  • Fan-out notifications
  • Multiple subscribers
  • Stream processing
  • Decoupled components

⚠️ Trade-offs:

  • Memory overhead (subscriptions)
  • Notification latency
  • Complexity of debugging
  • Potential for circular updates

Patterns for Reactivity

Pattern When Trade-off
Polling Simple state Inefficient
Observable Multiple subscribers Overhead
Computed Derived values Update cycles
Transactional Related changes Complexity

See Also