---
title: "Stream Pattern 4: Stateful Operations with Scan and Fold"
id: stream-pattern-stateful-operations
skillLevel: intermediate
applicationPatternId: streams
summary: Use Stream.scan and Stream.fold to maintain state across stream elements, enabling cumulative operations, counters, aggregations, and stateful transformations.
tags:
  - streams
  - state
  - scan
  - fold
  - aggregation
  - stateful-processing
rule:
  description: Use scan for stateful element-by-element processing and fold for final aggregation, enabling complex stream analytics without buffering the entire stream.
related:
  - stream-pattern-map-filter-transformations
  - process-streaming-data-with-stream
  - manage-shared-state-with-ref
author: effect_website
lessonOrder: 9
---

Guideline

Stateful stream operations:

  • scan: Apply function with accumulator, emit intermediate states
  • fold: Apply function with accumulator, emit only final result
  • reduce: Like fold but requires non-empty stream

Pattern: stream.pipe(Stream.scan(initialState, reducer)) emits every intermediate state as a stream; stream.pipe(Stream.runFold(initialState, reducer)) runs the stream and returns only the final state as an Effect.
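The distinction can be sketched in plain TypeScript over arrays (no Effect dependency; `scanArray` and `foldArray` are illustrative helpers defined here, not library functions). Note that the scan sketch also emits the seed as its first element, mirroring how a running state is usually exposed:

```typescript
// scan: emits every intermediate accumulator state (seed included);
// fold: produces only the final one.

const scanArray = <A, S>(items: A[], seed: S, f: (s: S, a: A) => S): S[] => {
  const out: S[] = [seed];
  let state = seed;
  for (const item of items) {
    state = f(state, item);
    out.push(state);
  }
  return out;
};

const foldArray = <A, S>(items: A[], seed: S, f: (s: S, a: A) => S): S =>
  items.reduce(f, seed);

const nums = [1, 2, 3, 4];
const add = (s: number, a: number) => s + a;

console.log(scanArray(nums, 0, add)); // [0, 1, 3, 6, 10] — running totals
console.log(foldArray(nums, 0, add)); // 10 — final total only
```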


Rationale

Processing streams without scan/fold creates issues:

  • Manual state tracking: Ref or mutable variables outside stream
  • Lost context: Hard to correlate intermediate values
  • Error-prone: Easy to forget state updates
  • Testing difficulty: State spread across code

Scan/fold enable:

  • Declarative state: State threaded through stream
  • Intermediate values: Emit state at each step (scan)
  • Type-safe: Accumulator type guaranteed
  • Composable: Chain stateful operations

Real-world example: Running average of metrics

  • Without scan: Track count and sum manually, calculate average, emit
  • With scan: stream.pipe(Stream.scan(initialState, updateAverage))

Good Example

This example demonstrates maintaining statistics across a stream of measurements.

import { Stream, Effect, Chunk } from "effect";

interface Measurement {
  readonly id: number;
  readonly value: number;
  readonly timestamp: Date;
}

interface RunningStats {
  readonly count: number;
  readonly sum: number;
  readonly min: number;
  readonly max: number;
  readonly average: number;
  readonly variance: number;
  readonly lastValue: number;
}

// Create stream of measurements
const createMeasurementStream = (): Stream.Stream<Measurement> =>
  Stream.fromIterable([
    { id: 1, value: 10, timestamp: new Date() },
    { id: 2, value: 20, timestamp: new Date() },
    { id: 3, value: 15, timestamp: new Date() },
    { id: 4, value: 25, timestamp: new Date() },
    { id: 5, value: 30, timestamp: new Date() },
    { id: 6, value: 22, timestamp: new Date() },
  ]);

// Initial statistics state
const initialStats: RunningStats = {
  count: 0,
  sum: 0,
  min: Infinity,
  max: -Infinity,
  average: 0,
  variance: 0,
  lastValue: 0,
};

// Reducer: update stats for each measurement
const updateStats = (
  stats: RunningStats,
  measurement: Measurement
): RunningStats => {
  const newCount = stats.count + 1;
  const newSum = stats.sum + measurement.value;
  const newAverage = newSum / newCount;

  // Calculate variance incrementally
  const delta = measurement.value - stats.average;
  const delta2 = measurement.value - newAverage;
  const newVariance = stats.variance + delta * delta2;

  return {
    count: newCount,
    sum: newSum,
    min: Math.min(stats.min, measurement.value),
    max: Math.max(stats.max, measurement.value),
    average: newAverage,
    variance: newVariance / newCount,
    lastValue: measurement.value,
  };
};

// Main: demonstrate scan with statistics
const program = Effect.gen(function* () {
  console.log(`\n[SCAN] Running statistics stream:\n`);

  // Use scan to emit intermediate statistics
  const statsStream = createMeasurementStream().pipe(
    Stream.scan(initialStats, (stats, measurement) => {
      const newStats = updateStats(stats, measurement);

      console.log(
        `[MEASUREMENT ${measurement.id}] Value: ${measurement.value}`
      );
      console.log(
        `  Count: ${newStats.count}, Avg: ${newStats.average.toFixed(2)}, ` +
        `Min: ${newStats.min}, Max: ${newStats.max}, ` +
        `Variance: ${newStats.variance.toFixed(2)}`
      );

      return newStats;
    })
  );

  // Collect all intermediate stats
  const allStats = yield* statsStream.pipe(Stream.runCollect);

  // Final statistics
  const finalStats = Chunk.last(allStats);

  if (finalStats._tag === "Some") {
    console.log(`\n[FINAL STATISTICS]`);
    console.log(`  Total measurements: ${finalStats.value.count}`);
    console.log(`  Average: ${finalStats.value.average.toFixed(2)}`);
    console.log(`  Min: ${finalStats.value.min}`);
    console.log(`  Max: ${finalStats.value.max}`);
    console.log(
      `  Std Dev: ${Math.sqrt(finalStats.value.variance).toFixed(2)}`
    );
  }

  // Compare with fold (Stream.runFold runs the stream and returns only
  // the final accumulator as an Effect)
  console.log(`\n[FOLD] Final statistics only:\n`);

  yield* createMeasurementStream().pipe(
    Stream.runFold(initialStats, updateStats),
    Effect.tap((stats) =>
      Effect.log(`Final: Count=${stats.count}, Avg=${stats.average.toFixed(2)}`)
    )
  );
});

Effect.runPromise(program);
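The incremental variance in `updateStats` follows Welford's online algorithm; a standalone check in plain TypeScript (no Effect dependency, names local to this sketch) confirms the streaming result matches a direct two-pass computation over the same values:

```typescript
// Welford's online algorithm: one pass, constant memory.
interface Acc {
  count: number;
  mean: number;
  m2: number; // running sum of squared deviations from the mean
}

const step = (acc: Acc, x: number): Acc => {
  const count = acc.count + 1;
  const delta = x - acc.mean;
  const mean = acc.mean + delta / count;
  const delta2 = x - mean;
  return { count, mean, m2: acc.m2 + delta * delta2 };
};

const values = [10, 20, 15, 25, 30, 22];
const acc = values.reduce(step, { count: 0, mean: 0, m2: 0 });

// Two-pass population variance for comparison
const mean = values.reduce((s, x) => s + x, 0) / values.length;
const direct =
  values.reduce((s, x) => s + (x - mean) ** 2, 0) / values.length;

console.log(acc.m2 / acc.count, direct); // both ≈ 42.22
```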

Advanced: Multi-Stage State Machine

Use scan to implement state machine across stream:

enum State {
  Idle = "idle",
  Processing = "processing",
  Complete = "complete",
}

interface WorkItem {
  readonly id: number;
  readonly status: "pending" | "done" | "error";
}

interface WorkState {
  readonly state: State;
  readonly itemsProcessed: number;
  readonly itemsError: number;
  readonly currentBatch: number;
}

const stateTransition = (
  workState: WorkState,
  item: WorkItem
): WorkState => {
  // State machine logic
  if (workState.state === State.Idle && item.status === "pending") {
    return {
      ...workState,
      state: State.Processing,
      currentBatch: 1,
    };
  }

  if (workState.state === State.Processing) {
    if (item.status === "done") {
      return {
        ...workState,
        itemsProcessed: workState.itemsProcessed + 1,
        currentBatch: workState.currentBatch + 1,
      };
    }

    if (item.status === "error") {
      return {
        ...workState,
        itemsError: workState.itemsError + 1,
        state: State.Complete,
      };
    }
  }

  return workState;
};

const initialWorkState: WorkState = {
  state: State.Idle,
  itemsProcessed: 0,
  itemsError: 0,
  currentBatch: 0,
};

const stateStream = Stream.fromIterable([
  { id: 1, status: "pending" as const },
  { id: 2, status: "done" as const },
  { id: 3, status: "done" as const },
  { id: 4, status: "error" as const },
]).pipe(
  Stream.scan(initialWorkState, stateTransition),
  Stream.tap((state) =>
    Effect.log(`State: ${state.state}, Processed: ${state.itemsProcessed}`)
  )
);
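The transition function can be exercised standalone to see the trace that scan would thread through the stream. Here is a plain TypeScript re-implementation of the same state machine (no Effect dependency; names are local to this sketch):

```typescript
type Status = "pending" | "done" | "error";
type Phase = "idle" | "processing" | "complete";

interface WS {
  state: Phase;
  itemsProcessed: number;
  itemsError: number;
  currentBatch: number;
}

// Same transition rules as stateTransition above
const transition = (w: WS, status: Status): WS => {
  if (w.state === "idle" && status === "pending")
    return { ...w, state: "processing", currentBatch: 1 };
  if (w.state === "processing" && status === "done")
    return {
      ...w,
      itemsProcessed: w.itemsProcessed + 1,
      currentBatch: w.currentBatch + 1,
    };
  if (w.state === "processing" && status === "error")
    return { ...w, itemsError: w.itemsError + 1, state: "complete" };
  return w;
};

const final = (["pending", "done", "done", "error"] as Status[]).reduce<WS>(
  transition,
  { state: "idle", itemsProcessed: 0, itemsError: 0, currentBatch: 0 }
);

// final state: "complete", 2 items processed, 1 error
console.log(final);
```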

Advanced: Windowed Aggregation

Track statistics over time-based windows (tumbling windows here: when the window elapses, the state resets and a new window begins):

interface WindowedStats {
  readonly windowStart: number;
  readonly windowEnd: number;
  readonly count: number;
  readonly sum: number;
  readonly average: number;
}

const slidingWindowStats = <A extends { value: number }>(
  stream: Stream.Stream<A>,
  windowSizeMs: number
): Stream.Stream<WindowedStats> => {
  const initialWindow: WindowedStats = {
    windowStart: Date.now(),
    windowEnd: Date.now() + windowSizeMs,
    count: 0,
    sum: 0,
    average: 0,
  };

  return stream.pipe(
    Stream.scan(initialWindow, (window, item) => {
      const now = Date.now();

      // Check if outside current window
      if (now > window.windowEnd) {
        // Start new window
        return {
          windowStart: now,
          windowEnd: now + windowSizeMs,
          count: 1,
          sum: item.value,
          average: item.value,
        };
      }

      // Add to current window
      const newCount = window.count + 1;
      const newSum = window.sum + item.value;

      return {
        ...window,
        count: newCount,
        sum: newSum,
        average: newSum / newCount,
      };
    })
  );
};

// Usage: Track 5-second windowed average
const windowedMetrics = slidingWindowStats(
  Stream.fromIterable(
    Array.from({ length: 100 }, (_, i) => ({ value: Math.random() * 100 }))
  ),
  5000
).pipe(
  Stream.tap((window) =>
    Effect.log(
      `Window avg: ${window.average.toFixed(2)}, count: ${window.count}`
    )
  )
);
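A deterministic variant of the same window logic, using item-supplied timestamps instead of `Date.now()`, makes the reset behaviour easy to verify (plain TypeScript sketch; `windowStep` is an illustrative helper):

```typescript
interface Stamped {
  at: number; // timestamp in ms, carried by the item itself
  value: number;
}

interface Win {
  start: number;
  end: number;
  count: number;
  sum: number;
}

// Same rule as the scan reducer above: items past the window boundary
// start a fresh window; otherwise they accumulate into the current one.
const windowStep = (sizeMs: number) => (w: Win, item: Stamped): Win =>
  item.at >= w.end
    ? { start: item.at, end: item.at + sizeMs, count: 1, sum: item.value }
    : { ...w, count: w.count + 1, sum: w.sum + item.value };

const items: Stamped[] = [
  { at: 0, value: 10 },
  { at: 1000, value: 20 },
  { at: 6000, value: 5 }, // outside the first 5s window → new window
];

// Build the trace scan would emit (seed first)
const trace = items.reduce<Win[]>(
  (acc, item) => [...acc, windowStep(5000)(acc[acc.length - 1], item)],
  [{ start: 0, end: 5000, count: 0, sum: 0 }]
);

console.log(trace.map((w) => `${w.start}-${w.end}: n=${w.count} sum=${w.sum}`));
```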

Advanced: Conditional Accumulation

Apply different accumulation rules based on values:

interface ConditionalStats {
  readonly allCount: number;
  readonly evenSum: number;
  readonly oddSum: number;
  readonly largeCount: number;
}

const conditionalFold = (
  stream: Stream.Stream<number>
): Effect.Effect<ConditionalStats> =>
  stream.pipe(
    Stream.runFold(
      {
        allCount: 0,
        evenSum: 0,
        oddSum: 0,
        largeCount: 0,
      },
      (stats, value) => ({
        allCount: stats.allCount + 1,
        evenSum: value % 2 === 0 ? stats.evenSum + value : stats.evenSum,
        oddSum: value % 2 !== 0 ? stats.oddSum + value : stats.oddSum,
        largeCount: value > 50 ? stats.largeCount + 1 : stats.largeCount,
      })
    )
  );

// Usage
const conditionalStats = conditionalFold(
  Stream.fromIterable([10, 25, 40, 55, 70, 85])
).pipe(
  Effect.tap((stats) =>
    Effect.log(
      `Even sum: ${stats.evenSum}, Odd sum: ${stats.oddSum}, Large: ${stats.largeCount}`
    )
  )
);
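The same conditional aggregation over a plain array makes the expected result concrete (plain TypeScript, no Effect dependency):

```typescript
interface CStats {
  allCount: number;
  evenSum: number;
  oddSum: number;
  largeCount: number;
}

// Identical reducer, applied with Array.prototype.reduce
const stats = [10, 25, 40, 55, 70, 85].reduce<CStats>(
  (s, v) => ({
    allCount: s.allCount + 1,
    evenSum: v % 2 === 0 ? s.evenSum + v : s.evenSum,
    oddSum: v % 2 !== 0 ? s.oddSum + v : s.oddSum,
    largeCount: v > 50 ? s.largeCount + 1 : s.largeCount,
  }),
  { allCount: 0, evenSum: 0, oddSum: 0, largeCount: 0 }
);

console.log(stats); // { allCount: 6, evenSum: 120, oddSum: 165, largeCount: 3 }
```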

Advanced: Error Accumulation

Collect errors while processing stream:

interface ProcessResult {
  readonly succeeded: number;
  readonly failed: number;
  readonly errors: Error[];
}

const processWithErrorCollection = <A>(
  stream: Stream.Stream<A>,
  process: (item: A) => Effect.Effect<void, Error>
): Effect.Effect<ProcessResult> =>
  stream.pipe(
    // Capture each item's outcome as an Either instead of letting a
    // failure terminate the stream
    Stream.mapEffect((item) => Effect.either(process(item))),
    Stream.runFold(
      { succeeded: 0, failed: 0, errors: [] } as ProcessResult,
      (result, outcome) =>
        outcome._tag === "Right"
          ? { ...result, succeeded: result.succeeded + 1 }
          : {
              ...result,
              failed: result.failed + 1,
              errors: [...result.errors, outcome.left],
            }
    )
  );

// Usage: Process items, collect errors
const errorCollection = processWithErrorCollection(
  Stream.fromIterable([1, 2, 3, 4, 5]),
  (item) =>
    Effect.gen(function* () {
      if (item === 3) {
        // Fail in the error channel; a thrown exception would be a
        // defect and bypass typed error handling
        return yield* Effect.fail(new Error("Processing failed"));
      }
      yield* Effect.log(`Processed: ${item}`);
    })
);
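The accumulation logic itself can be checked with a synchronous analogue, where each item either succeeds or throws and failures are collected rather than aborting (plain TypeScript sketch; names are local to this example):

```typescript
interface Result {
  succeeded: number;
  failed: number;
  errors: Error[];
}

// Fold over items, recording each outcome instead of rethrowing
const processAll = (items: number[], process: (n: number) => void): Result =>
  items.reduce<Result>(
    (r, item) => {
      try {
        process(item);
        return { ...r, succeeded: r.succeeded + 1 };
      } catch (e) {
        return {
          ...r,
          failed: r.failed + 1,
          errors: [...r.errors, e as Error],
        };
      }
    },
    { succeeded: 0, failed: 0, errors: [] }
  );

const result = processAll([1, 2, 3, 4, 5], (n) => {
  if (n === 3) throw new Error("Processing failed");
});

console.log(result.succeeded, result.failed); // 4 1
```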

When to Use This Pattern

Use scan when:

  • Tracking state across elements
  • Need intermediate values (running total, count)
  • State transitions based on elements
  • Debugging stream values at stages

Use fold when:

  • Need only final aggregate
  • Computing single result from stream
  • Collecting summary statistics
  • Building final data structure

⚠️ Trade-offs:

  • Accumulator type becomes the state carrier
  • Complex accumulator types are hard to debug
  • An exception thrown in the reducer fails the entire stream
  • State is not shared outside the stream (use Ref for cross-fiber state)

Scan vs Fold vs Reduce

| Operator | Emits                | Use Case                        |
| -------- | -------------------- | ------------------------------- |
| scan     | Intermediate + final | Running statistics, state trace |
| fold     | Only final           | Summary aggregate               |
| reduce   | Only final           | Non-empty stream aggregation    |

See Also