title: "Stream Pattern 5: Grouping and Windowing Streams"
id: stream-pattern-grouping-windowing
skillLevel: advanced
applicationPatternId: streams
summary: Use grouping and windowing to organize streams by key or time window, enabling batch operations and temporal aggregations.
tags:
  - streams
  - grouping
  - windowing
  - temporal-aggregation
  - batching
  - time-based-operations
rule:
  description: Use groupBy to partition streams by key and tumbling/sliding windows to aggregate streams over time windows.
related:
  - stream-pattern-stateful-operations
  - stream-pattern-merge-combine
  - scheduling-pattern-repeat-interval
author: effect_website
lessonOrder: 1

Guideline

Windowing organizes unbounded streams into bounded chunks:

  • Tumbling window: Fixed-size, non-overlapping windows (e.g., consecutive 1-sec windows)
  • Sliding window: Fixed-size, overlapping windows (e.g., a 10-sec window that advances every 5 sec)
  • Group by key: Partition the stream by a field value
  • Session window: Activity-based windows that close after an idle timeout
  • Batch aggregation: Process N items or wait T seconds, whichever comes first

Pattern: Stream.groupBy(), custom windowing with Ref and Schedule
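The "process N items or wait T seconds" rule can be sketched over timestamped events as shown below. This is a minimal, dependency-free TypeScript sketch. `TimedEvent` and `batchByCountOrTime` are hypothetical names, and the time limit is checked against event timestamps rather than a wall-clock timer. For the wall-clock variant on a real Effect stream, `Stream.groupedWithin(chunkSize, duration)` is worth checking in the Effect API reference.

```typescript
// Hypothetical event shape: a timestamp in ms plus a payload value.
interface TimedEvent {
  readonly timestampMs: number;
  readonly value: number;
}

// Event-time batching: a batch closes when it already holds `maxItems`
// events, or when the next event is more than `maxSpanMs` after the
// batch's first event, whichever comes first.
function batchByCountOrTime(
  events: readonly TimedEvent[],
  maxItems: number,
  maxSpanMs: number
): TimedEvent[][] {
  const batches: TimedEvent[][] = [];
  let current: TimedEvent[] = [];

  for (const event of events) {
    if (current.length > 0) {
      const span = event.timestampMs - current[0].timestampMs;
      if (current.length >= maxItems || span > maxSpanMs) {
        batches.push(current); // close the full or expired batch
        current = [];
      }
    }
    current.push(event);
  }

  // Flush the final partial batch so trailing events are not dropped
  if (current.length > 0) batches.push(current);
  return batches;
}

// Usage: at most 3 events per batch, each batch spanning at most 1000 ms
const sample: TimedEvent[] = [0, 100, 200, 300, 1500, 1600, 4000].map(
  (t, i) => ({ timestampMs: t, value: i })
);
const batchTimes = batchByCountOrTime(sample, 3, 1000).map((b) =>
  b.map((e) => e.timestampMs)
);
console.log(JSON.stringify(batchTimes)); // [[0,100,200],[300],[1500,1600],[4000]]
```

Because the time bound is only checked when the next event arrives, a quiet input leaves the last batch open until the final flush.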


Rationale

Unbounded streams need boundaries:

Problem 1: Memory exhaustion

  • Aggregating 1M events with no boundary means keeping all of them in memory
  • Memory use grows without bound
  • Eventually the process fails with an out-of-memory (OOM) error

Problem 2: Late aggregation

  • You can't sum all events until the stream ends, and an unbounded stream never ends
  • Need to decide: "sum events in this 1-second window"

Problem 3: Grouping complexity

  • Stream of user events: need per-user aggregation
  • Without groupBy: manual state tracking (error-prone)

Problem 4: Temporal patterns

  • "Top 10 searches in last 5 minutes" requires windowing
  • "Average response time per endpoint per minute" requires grouping + windowing
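The second requirement, average response time per endpoint per minute, can be sketched by folding the tumbling-window index into the group key. The sketch below is plain TypeScript. `RequestSample`, its fields, and `avgResponsePerEndpointPerMinute` are hypothetical names chosen for illustration.

```typescript
// Hypothetical request record; the field names are illustrative only.
interface RequestSample {
  readonly endpoint: string;
  readonly timestampMs: number;
  readonly responseMs: number;
}

const MINUTE_MS = 60000;

// Grouping + windowing in one pass: the group key combines the endpoint with
// the index of the 1-minute tumbling window the request falls into.
function avgResponsePerEndpointPerMinute(
  requests: readonly RequestSample[]
): Map<string, number> {
  const totals = new Map<string, { sum: number; count: number }>();

  for (const r of requests) {
    const minute = Math.floor(r.timestampMs / MINUTE_MS); // window index
    const key = `${r.endpoint}@${minute}`;
    const t = totals.get(key) ?? { sum: 0, count: 0 };
    t.sum += r.responseMs;
    t.count += 1;
    totals.set(key, t);
  }

  // Convert running totals into averages
  const averages = new Map<string, number>();
  totals.forEach(({ sum, count }, key) => averages.set(key, sum / count));
  return averages;
}

const averages = avgResponsePerEndpointPerMinute([
  { endpoint: "/search", timestampMs: 0, responseMs: 100 },
  { endpoint: "/search", timestampMs: 30000, responseMs: 300 },
  { endpoint: "/search", timestampMs: 61000, responseMs: 50 },
  { endpoint: "/login", timestampMs: 10000, responseMs: 80 },
]);
console.log(averages.get("/search@0")); // 200
```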

Solutions:

Tumbling window:

  • Divide stream into 1-sec, 5-sec, or 1-min chunks
  • Process each chunk independently
  • Clear memory between windows
  • Natural for: metrics, batching, reports
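Clearing memory between windows is what keeps tumbling windows cheap: only the open window's running aggregate needs to be held. Below is a minimal sketch in plain TypeScript. `TumblingSum` is a hypothetical name, and it assumes events arrive in timestamp order.

```typescript
// Result emitted when a tumbling window closes.
interface WindowResult {
  readonly windowStartMs: number;
  readonly count: number;
  readonly sum: number;
}

// Hypothetical incremental tumbling-window aggregator. It keeps only the
// open window's count and sum and emits that window's result as soon as an
// event belonging to a later window arrives. Assumes in-order timestamps;
// empty windows in between are not emitted.
class TumblingSum {
  private readonly sizeMs: number;
  private windowStartMs: number | undefined = undefined;
  private count = 0;
  private sum = 0;

  constructor(sizeMs: number) {
    this.sizeMs = sizeMs;
  }

  // Adds one event; returns the previous window's result if this event opened a new window.
  add(timestampMs: number, value: number): WindowResult | undefined {
    const start = Math.floor(timestampMs / this.sizeMs) * this.sizeMs;
    const closed =
      this.windowStartMs !== undefined && start !== this.windowStartMs
        ? this.flush()
        : undefined;
    if (this.windowStartMs === undefined) this.windowStartMs = start;
    this.count += 1;
    this.sum += value;
    return closed;
  }

  // Emits the open window (e.g., when the input ends) and resets the state.
  flush(): WindowResult | undefined {
    if (this.windowStartMs === undefined) return undefined;
    const result = { windowStartMs: this.windowStartMs, count: this.count, sum: this.sum };
    this.windowStartMs = undefined;
    this.count = 0;
    this.sum = 0;
    return result;
  }
}

// Usage: 1-second windows over (timestampMs, value) pairs
const agg = new TumblingSum(1000);
const results: WindowResult[] = [];
const input: Array<[number, number]> = [[100, 1], [900, 2], [1200, 3], [3500, 4]];
for (const [t, v] of input) {
  const closed = agg.add(t, v);
  if (closed !== undefined) results.push(closed);
}
const last = agg.flush(); // emit the window still open at end of input
if (last !== undefined) results.push(last);
console.log(results.map((r) => `${r.windowStartMs}:${r.count}/${r.sum}`).join(" ")); // 0:2/3 1000:1/3 3000:1/4
```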

Sliding window:

  • Keep last 5 minutes of data at all times
  • Emit updated aggregation every second
  • Detect patterns over overlapping periods
  • Natural for: anomaly detection, trends
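With time-based sliding (also called hopping) windows, such as the 10-sec window that advances every 5 sec from the Guideline, each event belongs to several overlapping windows. The sketch below shows the membership calculation. It assumes window starts aligned to multiples of the hop and half-open windows `[start, start + size)`, and `slidingWindowStarts` is a hypothetical helper.

```typescript
// Returns the start times of every window [start, start + sizeMs) that
// contains `timestampMs`, with window starts aligned to multiples of hopMs.
function slidingWindowStarts(
  timestampMs: number,
  sizeMs: number,
  hopMs: number
): number[] {
  const starts: number[] = [];
  // The latest window containing the event starts at or before it
  let start = Math.floor(timestampMs / hopMs) * hopMs;
  // Walk back one hop at a time while the window still covers the event
  while (start > timestampMs - sizeMs) {
    starts.push(start);
    start -= hopMs;
  }
  return starts.reverse(); // oldest window first
}

console.log(slidingWindowStarts(12000, 10000, 5000)); // [5000, 10000]
```

An aggregation then adds each event to every window it belongs to, which is why sliding windows cost more memory and CPU than tumbling ones. With `sizeMs` equal to `hopMs`, each event lands in exactly one window, and the scheme reduces to a tumbling window.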

Group by:

  • Separate streams by key
  • Each key has independent state
  • Emit grouped results
  • Natural for: per-user, per-endpoint, per-tenant
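Independent per-key state does not have to mean storing every event per key. Example 3 in the Good Example below collects each user's events into an array. The sketch below keeps only running statistics, so memory grows with the number of keys rather than the number of events. It is a plain TypeScript sketch, and `aggregateByKey` and `RunningStats` are hypothetical names. On an Effect `Stream`, `Stream.groupByKey` is the operator that partitions elements into per-key substreams; this sketch only illustrates the per-key state.

```typescript
// Running statistics kept per key.
interface RunningStats {
  count: number;
  sum: number;
  max: number;
}

function aggregateByKey<T>(
  items: readonly T[],
  keyOf: (item: T) => string,
  valueOf: (item: T) => number
): Map<string, RunningStats> {
  const stats = new Map<string, RunningStats>();
  for (const item of items) {
    const key = keyOf(item);
    const value = valueOf(item);
    const s = stats.get(key);
    if (s === undefined) {
      stats.set(key, { count: 1, sum: value, max: value }); // first event for this key
    } else {
      // Each key's state is updated independently of the others
      s.count += 1;
      s.sum += value;
      s.max = Math.max(s.max, value);
    }
  }
  return stats;
}

// Usage with the (userId, duration) pairs from the Good Example's sample events
const pairs: Array<[string, number]> = [
  ["user1", 100], ["user2", 250], ["user1", 150], ["user3", 120], ["user2", 180],
  ["user1", 200], ["user3", 300], ["user1", 500], ["user2", 100],
];
const perUser = aggregateByKey(pairs, ([user]) => user, ([, ms]) => ms);
console.log(perUser.get("user1")); // { count: 4, sum: 950, max: 500 }
```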

Good Example

This example demonstrates windowing and grouping patterns on a fixed, in-memory list of sample events.

import { Effect, Stream, Ref, Duration, Schedule } from "effect";

interface Event {
  readonly timestamp: Date;
  readonly userId: string;
  readonly action: string;
  readonly duration: number; // milliseconds
}

// Simulate event stream
const generateEvents = (): Event[] => [
  { timestamp: new Date(Date.now() - 5000), userId: "user1", action: "click", duration: 100 },
  { timestamp: new Date(Date.now() - 4500), userId: "user2", action: "view", duration: 250 },
  { timestamp: new Date(Date.now() - 4000), userId: "user1", action: "scroll", duration: 150 },
  { timestamp: new Date(Date.now() - 3500), userId: "user3", action: "click", duration: 120 },
  { timestamp: new Date(Date.now() - 3000), userId: "user2", action: "click", duration: 180 },
  { timestamp: new Date(Date.now() - 2500), userId: "user1", action: "view", duration: 200 },
  { timestamp: new Date(Date.now() - 2000), userId: "user3", action: "view", duration: 300 },
  { timestamp: new Date(Date.now() - 1500), userId: "user1", action: "submit", duration: 500 },
  { timestamp: new Date(Date.now() - 1000), userId: "user2", action: "scroll", duration: 100 },
];

// Main: windowing and grouping examples
const program = Effect.gen(function* () {
  console.log(`\n[WINDOWING & GROUPING] Stream organization patterns\n`);

  const events = generateEvents();

  // Example 1: Tumbling window (fixed-size batches)
  console.log(`[1] Tumbling window (2-event batches):\n`);

  const windowSize = 2;
  let batchNumber = 1;

  for (let i = 0; i < events.length; i += windowSize) {
    const batch = events.slice(i, i + windowSize);

    yield* Effect.log(`[WINDOW ${batchNumber}] (${batch.length} events)`);

    let totalDuration = 0;

    for (const event of batch) {
      yield* Effect.log(
        `  - ${event.userId}: ${event.action} (${event.duration}ms)`
      );

      totalDuration += event.duration;
    }

    yield* Effect.log(`[WINDOW ${batchNumber}] Total duration: ${totalDuration}ms\n`);

    batchNumber++;
  }

  // Example 2: Sliding window (overlapping)
  console.log(`[2] Sliding window (last 3 events, slide by 1):\n`);

  const windowSizeSlide = 3;
  const slideBy = 1;

  for (let i = 0; i <= events.length - windowSizeSlide; i += slideBy) {
    const window = events.slice(i, i + windowSizeSlide);

    const avgDuration =
      window.reduce((sum, e) => sum + e.duration, 0) / window.length;

    yield* Effect.log(
      `[SLIDE ${i / slideBy}] ${window.length} events, avg duration: ${avgDuration.toFixed(0)}ms`
    );
  }

  // Example 3: Group by key
  console.log(`\n[3] Group by user:\n`);

  const byUser = new Map<string, Event[]>();

  for (const event of events) {
    if (!byUser.has(event.userId)) {
      byUser.set(event.userId, []);
    }

    byUser.get(event.userId)!.push(event);
  }

  for (const [userId, userEvents] of byUser) {
    const totalActions = userEvents.length;
    const totalTime = userEvents.reduce((sum, e) => sum + e.duration, 0);
    const avgTime = totalTime / totalActions;

    yield* Effect.log(
      `[USER ${userId}] ${totalActions} actions, ${totalTime}ms total, ${avgTime.toFixed(0)}ms avg`
    );
  }

  // Example 4: Nested grouping (by user, then by action type)
  console.log(`\n[4] Group by user, then by action type:\n`);

  for (const [userId, userEvents] of byUser) {
    const byAction = new Map<string, Event[]>();

    for (const event of userEvents) {
      if (!byAction.has(event.action)) {
        byAction.set(event.action, []);
      }

      byAction.get(event.action)!.push(event);
    }

    yield* Effect.log(`[USER ${userId}] Action breakdown:`);

    for (const [action, actionEvents] of byAction) {
      const count = actionEvents.length;
      const total = actionEvents.reduce((sum, e) => sum + e.duration, 0);

      yield* Effect.log(`  ${action}: ${count}x (${total}ms total)`);
    }
  }

  // Example 5: Session window (based on inactivity timeout)
  console.log(`\n[5] Session window (gap > 1000ms = new session):\n`);

  const sessionGapMs = 1000;
  const sessions: Event[][] = [];
  let currentSession: Event[] = [];
  let lastTimestamp = events[0]?.timestamp.getTime() ?? 0;

  for (const event of events) {
    const currentTime = event.timestamp.getTime();
    const timeSinceLastEvent = currentTime - lastTimestamp;

    if (timeSinceLastEvent > sessionGapMs && currentSession.length > 0) {
      sessions.push(currentSession);
      yield* Effect.log(
        `[SESSION] Closed (${currentSession.length} events, gap: ${timeSinceLastEvent}ms)`
      );

      currentSession = [];
    }

    currentSession.push(event);
    lastTimestamp = currentTime;
  }

  if (currentSession.length > 0) {
    sessions.push(currentSession);
    yield* Effect.log(`[SESSION] Final (${currentSession.length} events)`);
  }

  // Example 6: Top-K aggregation in window
  console.log(`\n[6] Top 2 actions in last window:\n`);

  const lastWindow = events.slice(-3);

  const actionCounts = new Map<string, number>();

  for (const event of lastWindow) {
    actionCounts.set(
      event.action,
      (actionCounts.get(event.action) ?? 0) + 1
    );
  }

  const topActions = Array.from(actionCounts.entries())
    .sort((a, b) => b[1] - a[1])
    .slice(0, 2);

  yield* Effect.log(`[TOP-K] In last window of 3 events:`);

  for (const [action, count] of topActions) {
    yield* Effect.log(`  ${action}: ${count}x`);
  }
});

Effect.runPromise(program);
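In Example 5, the sample events are spaced about 500 ms apart. With a 1000 ms gap threshold they all fall into a single session, so only the final session is logged. The dependency-free sketch below applies the same rule to hypothetical timestamps that do split: a gap strictly greater than `gapMs` starts a new session. `sessionize` is an illustrative name, and the input is assumed to be sorted ascending.

```typescript
// Splits sorted timestamps (ms) into sessions: a new session starts
// whenever the gap since the previous event exceeds gapMs.
function sessionize(timestampsMs: readonly number[], gapMs: number): number[][] {
  const sessions: number[][] = [];
  let current: number[] = [];

  for (const t of timestampsMs) {
    const last = current[current.length - 1];
    if (last !== undefined && t - last > gapMs) {
      sessions.push(current); // idle gap exceeded: close the session
      current = [];
    }
    current.push(t);
  }

  if (current.length > 0) sessions.push(current); // close the final session
  return sessions;
}

console.log(JSON.stringify(sessionize([0, 400, 900, 2500, 2700, 5000], 1000)));
// [[0,400,900],[2500,2700],[5000]]
```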

Advanced: Custom Windowing with State

Build custom windows that close at an item limit or a time limit, keeping the open window in a Ref:

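import { Effect, Ref } from "effect";

interface Window<A> {
  readonly id: string;
  readonly items: ReadonlyArray<A>;
  readonly openTime: Date;
  readonly closeTime: Date;
}

// A window closes when it already holds maxItemsPerWindow items or when an
// item arrives after its closeTime. There is no timer: an expired window is
// only closed by the next addItem call (or by flush).
const createWindowManager = <A,>(config: {
  windowDurationMs: number;
  maxItemsPerWindow: number;
  onWindowClose: (window: Window<A>) => Effect.Effect<void>;
}) =>
  Effect.gen(function* () {
    const openWindow = (items: ReadonlyArray<A>): Window<A> => {
      const now = Date.now();
      return {
        id: `window-${now}`,
        items,
        openTime: new Date(now),
        closeTime: new Date(now + config.windowDurationMs),
      };
    };

    const currentWindow = yield* Ref.make(openWindow([]));

    const addItem = (item: A) =>
      Effect.gen(function* () {
        // Decide and update in one atomic step so concurrent callers
        // cannot overwrite each other's items
        const closed = yield* Ref.modify(
          currentWindow,
          (w): readonly [Window<A> | undefined, Window<A>] =>
            w.items.length >= config.maxItemsPerWindow ||
            Date.now() >= w.closeTime.getTime()
              ? [w, openWindow([item])] // close w; the item starts the next window
              : [undefined, { ...w, items: [...w.items, item] }] // append to w
        );

        // Effects cannot run inside Ref.modify, so the callback runs afterwards
        if (closed !== undefined) {
          yield* config.onWindowClose(closed);
        }
      });

    // Emits the last, possibly partial window (e.g., when the input ends)
    const flush = Effect.gen(function* () {
      const last = yield* Ref.getAndSet(currentWindow, openWindow([]));
      if (last.items.length > 0) {
        yield* config.onWindowClose(last);
      }
    });

    return { addItem, flush, currentWindow };
  });

// Usage: createWindowManager returns an Effect, so run it inside a program
const demo = Effect.gen(function* () {
  const manager = yield* createWindowManager<number>({
    windowDurationMs: 1000,
    maxItemsPerWindow: 10,
    onWindowClose: (window) =>
      Effect.log(`[WINDOW CLOSED] ${window.id}: ${window.items.length} items`),
  });

  for (let i = 0; i < 25; i++) {
    yield* manager.addItem(i);
  }
  yield* manager.flush;
});

Effect.runPromise(demo);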

Advanced: Stateful Grouping with Cleanup

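Manage per-key state with a cap on the number of groups (the least recently updated group is evicted first) and TTL-based cleanup that you run periodically: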

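import { Effect, Ref } from "effect";

interface GroupState<V> {
  readonly value: V;
  readonly lastUpdated: Date;
  readonly accessCount: number;
}

// K = group key, V = aggregate value, A = incoming item type
const createGroupedAggregator = <K, V, A>(config: {
  initialValue: V;
  update: (current: V, newItem: A) => V;
  ttlMs: number;
  maxGroups: number;
  onEvict: (key: K, value: V) => Effect.Effect<void>;
}) =>
  Effect.gen(function* () {
    const groups = yield* Ref.make<Map<K, GroupState<V>>>(new Map());

    const updateGroup = (key: K, item: A) =>
      Effect.gen(function* () {
        const now = new Date();

        // Eviction and update happen in one atomic Ref.modify; the evicted
        // entry (if any) is returned so onEvict can run afterwards
        const evicted = yield* Ref.modify(
          groups,
          (current): readonly [readonly [K, V] | undefined, Map<K, GroupState<V>>] => {
            const next = new Map(current);
            const existing = next.get(key);
            let evictedEntry: readonly [K, V] | undefined;

            if (existing === undefined && next.size >= config.maxGroups) {
              // Evict the least recently updated group to respect maxGroups
              let oldest: [K, GroupState<V>] | undefined;
              for (const entry of next) {
                if (
                  oldest === undefined ||
                  entry[1].lastUpdated.getTime() < oldest[1].lastUpdated.getTime()
                ) {
                  oldest = entry;
                }
              }
              if (oldest !== undefined) {
                next.delete(oldest[0]);
                evictedEntry = [oldest[0], oldest[1].value];
              }
            }

            // New groups start from initialValue and also fold in their first item
            next.set(key, {
              value: config.update(
                existing !== undefined ? existing.value : config.initialValue,
                item
              ),
              lastUpdated: now,
              accessCount: (existing !== undefined ? existing.accessCount : 0) + 1,
            });

            return [evictedEntry, next];
          }
        );

        if (evicted !== undefined) {
          yield* config.onEvict(evicted[0], evicted[1]);
        }
      });

    // Removes groups idle for longer than ttlMs. It does not run by itself:
    // call it periodically (for example with Effect.repeat and a Schedule)
    const cleanup = Effect.gen(function* () {
      const now = Date.now();
      const expired = yield* Ref.modify(
        groups,
        (current): readonly [Array<readonly [K, V]>, Map<K, GroupState<V>>] => {
          const next = new Map(current); // never mutate the Map held by the Ref
          const removed: Array<readonly [K, V]> = [];
          for (const [key, state] of current) {
            if (now - state.lastUpdated.getTime() > config.ttlMs) {
              next.delete(key);
              removed.push([key, state.value]);
            }
          }
          return [removed, next];
        }
      );

      for (const [key, value] of expired) {
        yield* config.onEvict(key, value);
      }
    });

    return { updateGroup, cleanup, groups };
  });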

Advanced: Time-Based Window Integration

Integrate with scheduling to emit one result per fixed time window:

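import { Duration, Effect, Ref, Schedule, Stream } from "effect";

const createTimeWindowedStream = <A, B>(
  stream: Stream.Stream<A>,
  config: {
    windowSize: Duration.Duration;
    compute: (items: A[]) => Effect.Effect<B>;
  }
) =>
  Effect.gen(function* () {
    const buffer = yield* Ref.make<A[]>([]);

    // Take the buffered items and reset the buffer in one atomic step, so
    // items that arrive while compute runs go to the next window
    const emitWindow = Effect.gen(function* () {
      const items = yield* Ref.getAndSet(buffer, []);

      if (items.length > 0) {
        const result = yield* config.compute(items);

        yield* Effect.log(`[WINDOW] Computed result: ${JSON.stringify(result)}`);
      }
    });

    // Runs emitWindow once, then every windowSize until interrupted
    const windowTick = emitWindow.pipe(
      Effect.repeat(Schedule.fixed(config.windowSize))
    );

    // Appends each element to the current window's buffer as it passes through
    const consume = stream.pipe(
      Stream.tap((item) => Ref.update(buffer, (items) => [...items, item]))
    );

    // Run windowTick in the background while draining consume, then run
    // emitWindow once more to flush the final partial window, e.g.:
    //   const ticker = yield* Effect.fork(windowTick);
    //   yield* Stream.runDrain(consume);
    //   yield* Fiber.interrupt(ticker);
    //   yield* emitWindow;
    return [consume, windowTick, emitWindow] as const;
  });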

Performance Considerations

Pattern  | Memory | CPU    | Latency | Use case
---------|--------|--------|---------|-----------------------
Tumbling | Low    | Low    | Medium  | Batching, reports
Sliding  | High   | Medium | Low     | Real-time metrics
Group by | Medium | Medium | Low     | Per-entity aggregation
Session  | Medium | Medium | High    | User session tracking
Top-K    | Low    | High   | Medium  | Leaderboards
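The Memory column can be made concrete with rough arithmetic. A window that buffers raw events holds roughly rate × window length events. If aligned sliding windows are aggregated separately, each event contributes to up to ceil(size / hop) of them. The sketch below uses hypothetical numbers (1,000 events per second, the 5-minute window updated every second from the Rationale), and both helper names are illustrative.

```typescript
// Back-of-the-envelope estimates; all inputs are hypothetical.

// Raw events held when a window buffers every event it covers.
function bufferedEvents(eventsPerSecond: number, windowSeconds: number): number {
  return eventsPerSecond * windowSeconds;
}

// Upper bound on how many aligned sliding windows one event belongs to.
function maxWindowsPerEvent(sizeMs: number, hopMs: number): number {
  return Math.ceil(sizeMs / hopMs);
}

console.log(bufferedEvents(1000, 300)); // 300000 (5-minute window)
console.log(bufferedEvents(1000, 60)); // 60000 (1-minute tumbling window)
console.log(maxWindowsPerEvent(10000, 5000)); // 2 (10-sec window, 5-sec hop)
console.log(maxWindowsPerEvent(300000, 1000)); // 300 (5-min window, 1-sec hop)
```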

When to Use This Pattern

Use tumbling windows when:

  • Batching for efficiency
  • End-of-period reports
  • Memory is constrained
  • The data has a natural period (per hour, per day)

Use sliding windows when:

  • Real-time monitoring
  • Trend detection
  • Latency-sensitive
  • Enough memory to hold overlapping windows

Use grouping when:

  • Per-entity aggregation
  • Multi-tenant isolation
  • User analytics
  • Sharded processing

⚠️ Trade-offs:

  • Sliding windows use more memory
  • Per-key group state adds management complexity
  • Without cleanup/eviction policies, per-key state grows without bound
  • Window sizes need careful tuning

Window Size Tuning

Metric   | Too small         | Too large
---------|-------------------|-------------------
Latency  | Constant churn    | Long delays
Accuracy | Noisy results     | Stale aggregations
Memory   | More windows      | Larger windows
CPU      | More computations | Fewer computations

Recommendation: Start with a window size equal to the desired reporting interval, then adjust it using the trade-offs above.


See Also