| title | Stream Pattern 4: Stateful Operations with Scan and Fold |
|---|---|
| id | stream-pattern-stateful-operations |
| skillLevel | intermediate |
| applicationPatternId | streams |
| summary | Use Stream.scan and Stream.runFold to maintain state across stream elements, enabling cumulative operations, counters, aggregations, and stateful transformations. |
| tags | |
| rule | |
| related | |
| author | effect_website |
| lessonOrder | 9 |
Stateful stream operations:
- Stream.scan: apply a reducer with an accumulator, emitting the seed and each intermediate state
- Stream.runFold: apply a reducer with an accumulator, consuming the stream and returning only the final result as an Effect
- Stream.runFoldEffect: like runFold, but the reducer itself returns an Effect
Pattern: stream.pipe(Stream.scan(initialState, reducer)) or stream.pipe(Stream.runFold(initialState, reducer))
Processing streams without scan/fold creates issues:
- Manual state tracking: Ref or mutable variables outside stream
- Lost context: Hard to correlate intermediate values
- Error-prone: Easy to forget state updates
- Testing difficulty: State spread across code
Scan/fold enable:
- Declarative state: State threaded through stream
- Intermediate values: Emit state at each step (scan)
- Type-safe: Accumulator type guaranteed
- Composable: Chain stateful operations
Real-world example: Running average of metrics
- Without scan: Track count and sum manually, calculate average, emit
- With scan:
stream.pipe(Stream.scan(initialState, updateAverage))
This example demonstrates maintaining statistics across a stream of measurements.
import { Stream, Effect, Chunk } from "effect";
interface Measurement {
readonly id: number;
readonly value: number;
readonly timestamp: Date;
}
interface RunningStats {
readonly count: number;
readonly sum: number;
readonly min: number;
readonly max: number;
readonly average: number;
readonly m2: number; // running sum of squared deviations (Welford's algorithm)
readonly variance: number;
readonly lastValue: number;
}
// Create stream of measurements
const createMeasurementStream = (): Stream.Stream<Measurement> =>
Stream.fromIterable([
{ id: 1, value: 10, timestamp: new Date() },
{ id: 2, value: 20, timestamp: new Date() },
{ id: 3, value: 15, timestamp: new Date() },
{ id: 4, value: 25, timestamp: new Date() },
{ id: 5, value: 30, timestamp: new Date() },
{ id: 6, value: 22, timestamp: new Date() },
]);
// Initial statistics state
const initialStats: RunningStats = {
count: 0,
sum: 0,
min: Infinity,
max: -Infinity,
average: 0,
m2: 0,
variance: 0,
lastValue: 0,
};
// Reducer: update stats for each measurement
const updateStats = (
stats: RunningStats,
measurement: Measurement
): RunningStats => {
const newCount = stats.count + 1;
const newSum = stats.sum + measurement.value;
const newAverage = newSum / newCount;
// Welford's online algorithm: accumulate the sum of squared
// deviations (m2) and derive the variance from it, rather than
// accumulating on the already-normalized variance
const delta = measurement.value - stats.average;
const delta2 = measurement.value - newAverage;
const newM2 = stats.m2 + delta * delta2;
return {
count: newCount,
sum: newSum,
min: Math.min(stats.min, measurement.value),
max: Math.max(stats.max, measurement.value),
average: newAverage,
m2: newM2,
variance: newM2 / newCount,
lastValue: measurement.value,
};
};
// Main: demonstrate scan with statistics
const program = Effect.gen(function* () {
console.log(`\n[SCAN] Running statistics stream:\n`);
// Use scan to emit intermediate statistics (the initial state is emitted first)
const statsStream = createMeasurementStream().pipe(
Stream.scan(initialStats, (stats, measurement) => {
const newStats = updateStats(stats, measurement);
console.log(
`[MEASUREMENT ${measurement.id}] Value: ${measurement.value}`
);
console.log(
` Count: ${newStats.count}, Avg: ${newStats.average.toFixed(2)}, ` +
`Min: ${newStats.min}, Max: ${newStats.max}, ` +
`Variance: ${newStats.variance.toFixed(2)}`
);
return newStats;
})
);
// Collect all intermediate stats
const allStats = yield* statsStream.pipe(Stream.runCollect);
// Final statistics
const finalStats = Chunk.last(allStats);
if (finalStats._tag === "Some") {
console.log(`\n[FINAL STATISTICS]`);
console.log(` Total measurements: ${finalStats.value.count}`);
console.log(` Average: ${finalStats.value.average.toFixed(2)}`);
console.log(` Min: ${finalStats.value.min}`);
console.log(` Max: ${finalStats.value.max}`);
console.log(
` Std Dev: ${Math.sqrt(finalStats.value.variance).toFixed(2)}`
);
}
// Compare with runFold (returns only the final result as an Effect)
console.log(`\n[FOLD] Final statistics only:\n`);
const finalResult = yield* createMeasurementStream().pipe(
Stream.runFold(initialStats, updateStats)
);
yield* Effect.log(
`Final: Count=${finalResult.count}, Avg=${finalResult.average.toFixed(2)}`
);
});
Effect.runPromise(program);
Use scan to implement a state machine across a stream:
enum State {
Idle = "idle",
Processing = "processing",
Complete = "complete",
}
interface WorkItem {
readonly id: number;
readonly status: "pending" | "done" | "error";
}
interface WorkState {
readonly state: State;
readonly itemsProcessed: number;
readonly itemsError: number;
readonly currentBatch: number;
}
const stateTransition = (
workState: WorkState,
item: WorkItem
): WorkState => {
// State machine logic
if (workState.state === State.Idle && item.status === "pending") {
return {
...workState,
state: State.Processing,
currentBatch: 1,
};
}
if (workState.state === State.Processing) {
if (item.status === "done") {
return {
...workState,
itemsProcessed: workState.itemsProcessed + 1,
currentBatch: workState.currentBatch + 1,
};
}
if (item.status === "error") {
return {
...workState,
itemsError: workState.itemsError + 1,
state: State.Complete,
};
}
}
return workState;
};
const stateStream = Stream.fromIterable([
{ id: 1, status: "pending" as const },
{ id: 2, status: "done" as const },
{ id: 3, status: "done" as const },
{ id: 4, status: "error" as const },
]).pipe(
Stream.scan(
{ state: State.Idle, itemsProcessed: 0, itemsError: 0, currentBatch: 0 },
stateTransition
),
Stream.tap((state) =>
Effect.log(`State: ${state.state}, Processed: ${state.itemsProcessed}`)
)
);
Track statistics over time-based windows:
interface WindowedStats {
readonly windowStart: number;
readonly windowEnd: number;
readonly count: number;
readonly sum: number;
readonly average: number;
}
const slidingWindowStats = <A extends { value: number }>(
stream: Stream.Stream<A>,
windowSizeMs: number
): Stream.Stream<WindowedStats> => {
const initialWindow: WindowedStats = {
windowStart: Date.now(),
windowEnd: Date.now() + windowSizeMs,
count: 0,
sum: 0,
average: 0,
};
return stream.pipe(
Stream.scan(initialWindow, (window, item) => {
const now = Date.now();
// Check if outside current window
if (now > window.windowEnd) {
// Start new window
return {
windowStart: now,
windowEnd: now + windowSizeMs,
count: 1,
sum: item.value,
average: item.value,
};
}
// Add to current window
const newCount = window.count + 1;
const newSum = window.sum + item.value;
return {
...window,
count: newCount,
sum: newSum,
average: newSum / newCount,
};
})
);
};
// Usage: Track 5-second windowed average
const windowedMetrics = slidingWindowStats(
Stream.fromIterable(
Array.from({ length: 100 }, (_, i) => ({ value: Math.random() * 100 }))
),
5000
).pipe(
Stream.tap((window) =>
Effect.log(
`Window avg: ${window.average.toFixed(2)}, count: ${window.count}`
)
)
);
Apply different accumulation rules based on values:
interface ConditionalStats {
readonly allCount: number;
readonly evenSum: number;
readonly oddSum: number;
readonly largeCount: number;
}
const conditionalFold = (
stream: Stream.Stream<number>
): Effect.Effect<ConditionalStats> =>
stream.pipe(
Stream.runFold(
{
allCount: 0,
evenSum: 0,
oddSum: 0,
largeCount: 0,
},
(stats: ConditionalStats, value: number) => ({
allCount: stats.allCount + 1,
evenSum: value % 2 === 0 ? stats.evenSum + value : stats.evenSum,
oddSum: value % 2 !== 0 ? stats.oddSum + value : stats.oddSum,
largeCount: value > 50 ? stats.largeCount + 1 : stats.largeCount,
})
)
);
// Usage
const conditionalStats = conditionalFold(
Stream.fromIterable([10, 25, 40, 55, 70, 85])
).pipe(
Effect.tap((stats) =>
Effect.log(
`Even sum: ${stats.evenSum}, Odd sum: ${stats.oddSum}, Large: ${stats.largeCount}`
)
)
);
Collect errors while processing a stream:
interface ProcessResult {
readonly succeeded: number;
readonly failed: number;
readonly errors: Error[];
}
const processWithErrorCollection = <A>(
stream: Stream.Stream<A>,
process: (item: A) => Effect.Effect<void, Error>
): Effect.Effect<ProcessResult> =>
stream.pipe(
Stream.runFoldEffect(
{ succeeded: 0, failed: 0, errors: [] as Error[] },
(result, item) =>
process(item).pipe(
Effect.match({
onSuccess: () => ({ ...result, succeeded: result.succeeded + 1 }),
onFailure: (error) => ({
...result,
failed: result.failed + 1,
errors: [...result.errors, error],
}),
})
)
)
);
// Usage: Process items, collect errors
const errorCollection = processWithErrorCollection(
Stream.fromIterable([1, 2, 3, 4, 5]),
(item) =>
Effect.gen(function* () {
if (item === 3) {
yield* Effect.fail(new Error("Processing failed"));
}
yield* Effect.log(`Processed: ${item}`);
})
);
✅ Use scan when:
- Tracking state across elements
- Need intermediate values (running total, count)
- State transitions based on elements
- Debugging stream values at stages
✅ Use runFold when:
- Need only final aggregate
- Computing single result from stream
- Collecting summary statistics
- Building final data structure
⚠️ Caveats:
- The accumulator type becomes the state carrier
- Complex accumulator types are hard to debug
- An error thrown in the reducer fails the entire stream
- State is not shared outside the stream
| Operator | Emits | Use Case |
|---|---|---|
| Stream.scan | Seed + intermediate + final states | Running statistics, state trace |
| Stream.runFold | Only the final state (as an Effect) | Summary aggregate |
| Stream.runFoldEffect | Only the final state (effectful reducer) | Aggregation with per-element effects |
- Stream Pattern 1: Map & Filter Transformations - Stream transformations
- Stream Collect Results - Collecting stream output
- Manage Shared State with Ref - External state management
- Process Streaming Data with Stream - Stream basics