| title | Stream Pattern 5: Grouping and Windowing Streams |
|---|---|
| id | stream-pattern-grouping-windowing |
| skillLevel | advanced |
| applicationPatternId | streams |
| summary | Use grouping and windowing to organize streams by key or time window, enabling batch operations and temporal aggregations. |
| tags | |
| rule | |
| related | |
| author | effect_website |
| lessonOrder | 1 |

Windowing and grouping split an unbounded stream into bounded chunks:
- Tumbling window: Fixed-size, non-overlapping (e.g., 1-second windows)
- Sliding window: Overlapping windows (e.g., 10-second window, 5-second hop)
- Group by key: Partition the stream by a field value
- Session window: Activity-based windows closed by an idle timeout
- Batch aggregation: Process N items or wait T seconds, whichever comes first
Pattern: Stream.groupBy(), custom windowing with Ref and Schedule
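The batch aggregation variant from the list above ("N items or T seconds") can be sketched as a small state machine. This is a minimal illustration, not Effect's implementation: `makeBatcher` and its parameters are hypothetical names, and the clock is passed in explicitly so the behaviour is deterministic.

```typescript
// Hypothetical helper: flush a batch when it holds `maxItems` items
// or when `maxWaitMs` has elapsed since the batch's first item.
interface Batcher<A> {
  push(item: A, nowMs: number): A[] | null; // returns a flushed batch, if any
  flush(): A[]; // force out whatever is buffered
}

const makeBatcher = <A>(maxItems: number, maxWaitMs: number): Batcher<A> => {
  let buffer: A[] = [];
  let openedAt = 0;

  const flush = (): A[] => {
    const out = buffer;
    buffer = [];
    return out;
  };

  return {
    push(item, nowMs) {
      if (buffer.length === 0) openedAt = nowMs; // first item opens the batch
      buffer.push(item);
      const full = buffer.length >= maxItems;
      const timedOut = nowMs - openedAt >= maxWaitMs;
      return full || timedOut ? flush() : null;
    },
    flush,
  };
};

const batcher = makeBatcher<string>(3, 1000);
const flushed: string[][] = [];
const inputs: Array<[string, number]> = [
  ["a", 0], ["b", 100], ["c", 200], // size limit reached at "c"
  ["d", 300], ["e", 1400],          // time limit reached at "e"
  ["f", 1500],
];
for (const [item, at] of inputs) {
  const batch = batcher.push(item, at);
  if (batch) flushed.push(batch);
}
flushed.push(batcher.flush()); // drain the remainder at end of input
console.log(flushed); // [["a","b","c"],["d","e"],["f"]]
```

Because this sketch only checks the clock when an item arrives, a quiet stream would never time out; a real implementation also needs a timer. In Effect, `Stream.groupedWithin` covers the size-or-time case directly.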
Unbounded streams need boundaries:
Problem 1: Memory exhaustion
- Aggregating 1M events with no boundary means holding all of them in memory
- Memory use grows without bound
- Eventually the process fails with an out-of-memory (OOM) error
Problem 2: Late aggregation
- A sum over all events is final only when the stream ends, which an unbounded stream never does
- You have to pick a boundary instead: "sum the events in this 1-second window"
Problem 3: Grouping complexity
- Stream of user events: need per-user aggregation
- Without groupBy: manual state tracking (error-prone)
Problem 4: Temporal patterns
- "Top 10 searches in last 5 minutes" requires windowing
- "Average response time per endpoint per minute" requires grouping + windowing
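The "average response time per endpoint per minute" case combines both ideas: the bucket key is the pair (endpoint, minute). A minimal sketch over an in-memory array, assuming a hypothetical `RequestSample` shape and helper name that are not part of the original example:

```typescript
// Hypothetical sample shape for illustration.
interface RequestSample {
  readonly endpoint: string;
  readonly timestampMs: number;
  readonly responseMs: number;
}

const MINUTE_MS = 60000;

// Average response time per (endpoint, minute) bucket.
const averagePerEndpointPerMinute = (
  samples: readonly RequestSample[]
): Map<string, number> => {
  const totals = new Map<string, { sum: number; count: number }>();
  for (const s of samples) {
    // Windowing: round the timestamp down to the start of its minute
    const minuteStart = Math.floor(s.timestampMs / MINUTE_MS) * MINUTE_MS;
    // Grouping: the endpoint is part of the bucket key
    const key = `${s.endpoint}@${minuteStart}`;
    const t = totals.get(key) ?? { sum: 0, count: 0 };
    totals.set(key, { sum: t.sum + s.responseMs, count: t.count + 1 });
  }
  const averages = new Map<string, number>();
  totals.forEach((t, key) => averages.set(key, t.sum / t.count));
  return averages;
};

const endpointAverages = averagePerEndpointPerMinute([
  { endpoint: "/users", timestampMs: 1000, responseMs: 100 },
  { endpoint: "/users", timestampMs: 59000, responseMs: 300 },
  { endpoint: "/users", timestampMs: 61000, responseMs: 50 },
  { endpoint: "/orders", timestampMs: 2000, responseMs: 80 },
]);
console.log(endpointAverages);
// "/users@0" → 200, "/users@60000" → 50, "/orders@0" → 80
```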
Solutions:
Tumbling window:
- Divide stream into 1-sec, 5-sec, or 1-min chunks
- Process each chunk independently
- Clear memory between windows
- Natural for: metrics, batching, reports
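To make "clear memory between windows" concrete, here is a rough sketch of a time-based tumbling aggregator that keeps only the running totals of the current window. It assumes events arrive in timestamp order; `makeTumblingSum` and `WindowResult` are hypothetical names introduced for illustration.

```typescript
interface WindowResult {
  readonly windowStartMs: number;
  readonly count: number;
  readonly sum: number;
}

const makeTumblingSum = (sizeMs: number) => {
  // Only the open window is held in memory
  let current: { start: number; count: number; sum: number } | null = null;

  // Emit the open window (if any) and forget it
  const close = (): WindowResult | null => {
    if (current === null) return null;
    const result = { windowStartMs: current.start, count: current.count, sum: current.sum };
    current = null;
    return result;
  };

  // Feed one in-order event; returns the previous window if this event closed it
  const add = (timestampMs: number, value: number): WindowResult | null => {
    const start = Math.floor(timestampMs / sizeMs) * sizeMs;
    const closedWindow = current !== null && current.start !== start ? close() : null;
    if (current === null) current = { start, count: 0, sum: 0 };
    current.count += 1;
    current.sum += value;
    return closedWindow;
  };

  return { add, close };
};

const tumbling = makeTumblingSum(1000);
const closedWindows: WindowResult[] = [];
const sampleStream: Array<[number, number]> = [
  [100, 5], [900, 7], // window starting at 0
  [1200, 1],          // window starting at 1000
  [2500, 4], [2900, 6], // window starting at 2000
];
for (const [ts, value] of sampleStream) {
  const w = tumbling.add(ts, value);
  if (w) closedWindows.push(w);
}
const finalWindow = tumbling.close(); // flush the last open window
if (finalWindow) closedWindows.push(finalWindow);
console.log(closedWindows);
```

Each window is emitted once, when the first event of the next window arrives, so memory stays constant regardless of stream length.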
Sliding window:
- Keep last 5 minutes of data at all times
- Emit updated aggregation every second
- Detect patterns over overlapping periods
- Natural for: anomaly detection, trends
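A sliding window has to retain every sample that is still inside the window, which is where its higher memory cost comes from. The sketch below is one simple way to do it, assuming a hypothetical `makeSlidingAverage` helper that emits an updated average on each incoming sample rather than on a fixed one-second tick:

```typescript
// Keeps only samples newer than (latest timestamp - windowMs) and reports their average.
const makeSlidingAverage = (windowMs: number) => {
  let samples: Array<{ timestampMs: number; value: number }> = [];

  return (timestampMs: number, value: number): number => {
    samples.push({ timestampMs, value });
    const cutoff = timestampMs - windowMs;
    samples = samples.filter((s) => s.timestampMs > cutoff); // evict expired samples
    const sum = samples.reduce((acc, s) => acc + s.value, 0);
    return sum / samples.length;
  };
};

const slidingAvg = makeSlidingAverage(5000);
const readings = [
  slidingAvg(0, 10),    // window holds [10]
  slidingAvg(2000, 20), // window holds [10, 20]
  slidingAvg(4000, 30), // window holds [10, 20, 30]
  slidingAvg(6000, 40), // sample at t=0 has expired: [20, 30, 40]
];
console.log(readings); // [10, 15, 20, 30]
```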
Group by:
- Separate streams by key
- Each key has independent state
- Emit grouped results
- Natural for: per-user, per-endpoint, per-tenant
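"Each key has independent state" amounts to running a separate fold per key. A minimal generic sketch, where `foldByKey` is a hypothetical helper name and the input is a plain array rather than a stream:

```typescript
// Run an independent fold for every key produced by `keyOf`.
const foldByKey = <A, K, S>(
  items: readonly A[],
  keyOf: (item: A) => K,
  initial: () => S,
  step: (state: S, item: A) => S
): Map<K, S> => {
  const states = new Map<K, S>();
  for (const item of items) {
    const key = keyOf(item);
    // A key seen for the first time starts from a fresh initial state
    const prev = states.has(key) ? (states.get(key) as S) : initial();
    states.set(key, step(prev, item));
  }
  return states;
};

const perUser = foldByKey(
  [
    { userId: "user1", duration: 100 },
    { userId: "user2", duration: 250 },
    { userId: "user1", duration: 150 },
  ],
  (e) => e.userId,
  () => ({ count: 0, totalMs: 0 }),
  (s, e) => ({ count: s.count + 1, totalMs: s.totalMs + e.duration })
);
console.log(perUser);
// "user1" → { count: 2, totalMs: 250 }, "user2" → { count: 1, totalMs: 250 }
```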
This example demonstrates windowing and grouping patterns.
import { Effect, Stream, Ref, Duration, Schedule } from "effect";
interface Event {
readonly timestamp: Date;
readonly userId: string;
readonly action: string;
readonly duration: number; // milliseconds
}
// Simulate event stream
const generateEvents = (): Event[] => [
{ timestamp: new Date(Date.now() - 5000), userId: "user1", action: "click", duration: 100 },
{ timestamp: new Date(Date.now() - 4500), userId: "user2", action: "view", duration: 250 },
{ timestamp: new Date(Date.now() - 4000), userId: "user1", action: "scroll", duration: 150 },
{ timestamp: new Date(Date.now() - 3500), userId: "user3", action: "click", duration: 120 },
{ timestamp: new Date(Date.now() - 3000), userId: "user2", action: "click", duration: 180 },
{ timestamp: new Date(Date.now() - 2500), userId: "user1", action: "view", duration: 200 },
{ timestamp: new Date(Date.now() - 2000), userId: "user3", action: "view", duration: 300 },
{ timestamp: new Date(Date.now() - 1500), userId: "user1", action: "submit", duration: 500 },
{ timestamp: new Date(Date.now() - 1000), userId: "user2", action: "scroll", duration: 100 },
];
// Main: windowing and grouping examples
const program = Effect.gen(function* () {
console.log(`\n[WINDOWING & GROUPING] Stream organization patterns\n`);
const events = generateEvents();
// Example 1: Tumbling window (fixed-size batches)
console.log(`[1] Tumbling window (2-event batches):\n`);
const windowSize = 2;
let batchNumber = 1;
for (let i = 0; i < events.length; i += windowSize) {
const batch = events.slice(i, i + windowSize);
yield* Effect.log(`[WINDOW ${batchNumber}] (${batch.length} events)`);
let totalDuration = 0;
for (const event of batch) {
yield* Effect.log(
` - ${event.userId}: ${event.action} (${event.duration}ms)`
);
totalDuration += event.duration;
}
yield* Effect.log(`[WINDOW ${batchNumber}] Total duration: ${totalDuration}ms\n`);
batchNumber++;
}
// Example 2: Sliding window (overlapping)
console.log(`[2] Sliding window (last 3 events, slide by 1):\n`);
const windowSizeSlide = 3;
const slideBy = 1;
for (let i = 0; i <= events.length - windowSizeSlide; i += slideBy) {
const window = events.slice(i, i + windowSizeSlide);
const avgDuration =
window.reduce((sum, e) => sum + e.duration, 0) / window.length;
yield* Effect.log(
`[SLIDE ${i / slideBy}] ${window.length} events, avg duration: ${avgDuration.toFixed(0)}ms`
);
}
// Example 3: Group by key
console.log(`\n[3] Group by user:\n`);
const byUser = new Map<string, Event[]>();
for (const event of events) {
if (!byUser.has(event.userId)) {
byUser.set(event.userId, []);
}
byUser.get(event.userId)!.push(event);
}
for (const [userId, userEvents] of byUser) {
const totalActions = userEvents.length;
const totalTime = userEvents.reduce((sum, e) => sum + e.duration, 0);
const avgTime = totalTime / totalActions;
yield* Effect.log(
`[USER ${userId}] ${totalActions} actions, ${totalTime}ms total, ${avgTime.toFixed(0)}ms avg`
);
}
// Example 4: Nested grouping (by user, then by action type within each user)
console.log(`\n[4] Group by user, then by action type:\n`);
for (const [userId, userEvents] of byUser) {
const byAction = new Map<string, Event[]>();
for (const event of userEvents) {
if (!byAction.has(event.action)) {
byAction.set(event.action, []);
}
byAction.get(event.action)!.push(event);
}
yield* Effect.log(`[USER ${userId}] Action breakdown:`);
for (const [action, actionEvents] of byAction) {
const count = actionEvents.length;
const total = actionEvents.reduce((sum, e) => sum + e.duration, 0);
yield* Effect.log(` ${action}: ${count}x (${total}ms total)`);
}
}
// Example 5: Session window (based on inactivity timeout)
console.log(`\n[5] Session window (gap > 1000ms = new session):\n`);
const sessionGapMs = 1000;
const sessions: Event[][] = [];
let currentSession: Event[] = [];
let lastTimestamp = events[0]?.timestamp.getTime() ?? 0;
for (const event of events) {
const currentTime = event.timestamp.getTime();
const timeSinceLastEvent = currentTime - lastTimestamp;
if (timeSinceLastEvent > sessionGapMs && currentSession.length > 0) {
sessions.push(currentSession);
yield* Effect.log(
`[SESSION] Closed (${currentSession.length} events, gap: ${timeSinceLastEvent}ms)`
);
currentSession = [];
}
currentSession.push(event);
lastTimestamp = currentTime;
}
if (currentSession.length > 0) {
sessions.push(currentSession);
yield* Effect.log(`[SESSION] Final (${currentSession.length} events)`);
}
// Example 6: Top-K aggregation in window
console.log(`\n[6] Top 2 actions in last window:\n`);
const lastWindow = events.slice(-3);
const actionCounts = new Map<string, number>();
for (const event of lastWindow) {
actionCounts.set(
event.action,
(actionCounts.get(event.action) ?? 0) + 1
);
}
const topActions = Array.from(actionCounts.entries())
.sort((a, b) => b[1] - a[1])
.slice(0, 2);
yield* Effect.log(`[TOP-K] In last window of 3 events:`);
for (const [action, count] of topActions) {
yield* Effect.log(` ${action}: ${count}x`);
}
});
Effect.runPromise(program);

Build sophisticated windows using Ref:
interface Window<A> {
  id: string;
  items: A[];
  openTime: Date;
  closeTime: Date;
}

const createWindowManager = <A,>(config: {
  windowDurationMs: number;
  maxItemsPerWindow: number;
  onWindowClose: (window: Window<A>) => Effect.Effect<void>;
}) =>
  Effect.gen(function* () {
    // Build a fresh window that opens now and holds the given initial items
    const openWindow = (items: A[]): Window<A> => ({
      id: `window-${Date.now()}`,
      items,
      openTime: new Date(),
      closeTime: new Date(Date.now() + config.windowDurationMs),
    });

    const currentWindow = yield* Ref.make<Window<A>>(openWindow([]));

    const addItem = (item: A) =>
      Effect.gen(function* () {
        // Check and update in one atomic step so concurrent callers
        // cannot both observe (and close) the same full or expired window
        const closed = yield* Ref.modify(
          currentWindow,
          (w): readonly [Window<A> | null, Window<A>] =>
            w.items.length >= config.maxItemsPerWindow ||
            Date.now() >= w.closeTime.getTime()
              ? [w, openWindow([item])] // close current window, open a new one
              : [null, { ...w, items: [...w.items, item] }] // add to current window
        );
        if (closed !== null) {
          yield* config.onWindowClose(closed);
        }
      });

    return { addItem, currentWindow };
  });

// Usage: createWindowManager returns an Effect, so run it (for example with
// yield* inside Effect.gen) to obtain { addItem, currentWindow }
const windowManager = createWindowManager<string>({
  windowDurationMs: 1000,
  maxItemsPerWindow: 10,
  onWindowClose: (window) =>
    Effect.log(`[WINDOW CLOSED] ${window.id}: ${window.items.length} items`),
});

Manage per-key state with automatic cleanup:
interface GroupState<V> {
  value: V;
  lastUpdated: Date;
  accessCount: number;
}

const createGroupedAggregator = <K, V>(config: {
  initialValue: V;
  update: (current: V, newItem: unknown) => V;
  ttlMs: number;
  maxGroups: number;
  onEvict: (key: K, value: V) => Effect.Effect<void>;
}) =>
  Effect.gen(function* () {
    const groups = yield* Ref.make<Map<K, GroupState<V>>>(new Map());

    const updateGroup = (key: K, item: unknown) =>
      Effect.gen(function* () {
        // Do the whole read-modify-write inside one Ref.modify so concurrent
        // updates cannot overwrite each other; return the evicted entry, if any
        const evicted = yield* Ref.modify(
          groups,
          (current): readonly [readonly [K, V] | null, Map<K, GroupState<V>>] => {
            const next = new Map(current);
            const existing = next.get(key);
            let victim: readonly [K, V] | null = null;

            if (existing === undefined && next.size > 0 && next.size >= config.maxGroups) {
              // At capacity: evict the least recently updated group
              const [oldestKey, oldest] = Array.from(next.entries()).reduce(
                (a, b) =>
                  a[1].lastUpdated.getTime() <= b[1].lastUpdated.getTime() ? a : b
              );
              next.delete(oldestKey);
              victim = [oldestKey, oldest.value];
            }

            // New groups start from initialValue; the incoming item is always folded in
            next.set(key, {
              value: config.update(existing ? existing.value : config.initialValue, item),
              lastUpdated: new Date(),
              accessCount: (existing ? existing.accessCount : 0) + 1,
            });
            return [victim, next];
          }
        );
        if (evicted !== null) {
          yield* config.onEvict(evicted[0], evicted[1]);
        }
      });

    // Evict every group whose last update is older than ttlMs
    const cleanup = Effect.gen(function* () {
      const now = Date.now();
      const expired = yield* Ref.modify(
        groups,
        (current): readonly [Array<[K, GroupState<V>]>, Map<K, GroupState<V>>] => {
          const stale = Array.from(current.entries()).filter(
            ([, state]) => now - state.lastUpdated.getTime() > config.ttlMs
          );
          // Replace the map instead of mutating the one stored in the Ref
          const next = new Map(current);
          for (const [key] of stale) next.delete(key);
          return [stale, next];
        }
      );
      for (const [key, state] of expired) {
        yield* config.onEvict(key, state.value);
      }
    });

    return { updateGroup, cleanup, groups };
  });

Integrate with scheduling for time windows:
const createTimeWindowedStream = <A, B>(
  stream: Stream.Stream<A>,
  config: {
    windowSize: Duration.Duration;
    compute: (items: A[]) => Effect.Effect<B>;
  }
) =>
  Effect.gen(function* () {
    const buffer = yield* Ref.make<A[]>([]);

    // On each tick, atomically take the buffered items and reset the buffer,
    // so items appended while compute runs are kept for the next window
    const windowTick = Effect.gen(function* () {
      const items = yield* Ref.getAndSet(buffer, []);
      if (items.length > 0) {
        const result = yield* config.compute(items);
        yield* Effect.log(`[WINDOW] Computed result: ${JSON.stringify(result)}`);
      }
    }).pipe(Effect.repeat(Schedule.fixed(config.windowSize)));

    // Append every stream element to the buffer as it passes through
    const consume = stream.pipe(
      Stream.tap((item) => Ref.update(buffer, (items) => [...items, item]))
    );

    // The caller is expected to run both, e.g. drain `consume` while `windowTick` runs in a forked fiber
    return [consume, windowTick] as const;
  });

| Pattern | Memory | CPU | Latency | Use Case |
|---|---|---|---|---|
| Tumbling | Low | Low | Medium | Batching, reports |
| Sliding | High | Medium | Low | Real-time metrics |
| Group by | Medium | Medium | Low | Per-entity aggregation |
| Session | Medium | Medium | High | User session tracking |
| Top-K | Low | High | Medium | Leaderboards |
✅ Use tumbling windows when:
- Batching for efficiency
- End-of-period reports
- Memory matters
- Natural period (per hour, day)
✅ Use sliding windows when:
- Real-time monitoring
- Detect trends
- Latency-sensitive
- Memory available
✅ Use grouping when:
- Per-entity aggregation
- Multi-tenant isolation
- User analytics
- Sharded processing
⚠️ Trade-offs:
- Sliding windows use more memory
- Per-group state adds management complexity
- Cleanup and eviction policies matter
- Window sizes need careful tuning
| Metric | Too Small | Too Large |
|---|---|---|
| Latency | Constant churn | Long delays |
| Accuracy | Noisy results | Stale aggregations |
| Memory | More windows | Larger windows |
| CPU | More computations | Fewer computations |
Recommendation: Start with window size = desired reporting interval
Related patterns:
- Stream Pattern 4: Stateful Operations - Fold/scan basis
- Stream Pattern 3: Backpressure Control - Buffering strategies
- Scheduling Pattern 1: Repeat on Interval - Time-based triggers
- Concurrency Pattern 3: Coordinate with Latch - Multi-event coordination