---
title: "Stream Pattern 2: Merge and Combine Multiple Streams"
id: stream-pattern-merge-combine
skillLevel: intermediate
applicationPatternId: streams
summary: Use Stream.merge, Stream.concat, and Stream.mergeAll to combine multiple streams into a single stream, enabling multi-source data aggregation.
tags:
  - streams
  - composition
  - merge
  - concat
  - combination
  - multi-source
rule:
  description: Use merge and concat combinators to combine multiple streams, enabling aggregation of data from multiple independent sources.
related:
  - stream-pattern-map-filter-transformations
  - process-streaming-data-with-stream
  - stream-from-iterable
author: effect_website
lessonOrder: 7
---

## Guideline

Combine multiple streams using:

- `merge`: Interleave elements from multiple streams (unordered)
- `concat`: Chain streams sequentially (ordered; waits for the first to complete)
- `mergeAll`: Merge a collection of streams
- `zip`: Combine corresponding elements from multiple streams

Pattern: `Stream.merge(stream1, stream2)` or `stream1.pipe(Stream.concat(stream2))`


## Rationale

Multi-source data processing without merge/concat creates issues:

- Complex coordination: manual loops over multiple sources
- Hard to aggregate: collecting from different sources is verbose
- Ordering confusion: sequential vs. parallel behavior is unclear
- Resource management: multiple independent consumers to track

Merge/concat enable:

- Simple composition: combine streams naturally
- Semantic clarity: merge = parallel, concat = sequential
- Aggregation: a single consumer for multiple sources
- Scalability: add sources without refactoring

Real-world example: aggregating user events

- Without merge: poll the user service, the event log, and the notification service separately
- With merge: combine `userStream`, `eventStream`, and `notificationStream` into a single stream

## Good Example

This example demonstrates merging multiple event streams into a unified stream.

```typescript
import { Stream, Effect, Chunk } from "effect";

interface Event {
  readonly source: string;
  readonly type: string;
  readonly data: string;
  readonly timestamp: Date;
}

// Create independent event streams from different sources
const createUserEventStream = (): Stream.Stream<Event> =>
  Stream.fromIterable([
    { source: "user-service", type: "login", data: "user-123", timestamp: new Date() },
    { source: "user-service", type: "logout", data: "user-123", timestamp: new Date(Date.now() + 500) },
  ]).pipe(Stream.tap(() => Effect.sleep("500 millis")));

const createPaymentEventStream = (): Stream.Stream<Event> =>
  Stream.fromIterable([
    { source: "payment-service", type: "payment-started", data: "order-456", timestamp: new Date(Date.now() + 200) },
    { source: "payment-service", type: "payment-completed", data: "order-456", timestamp: new Date(Date.now() + 800) },
  ]).pipe(Stream.tap(() => Effect.sleep("600 millis")));

const createAuditEventStream = (): Stream.Stream<Event> =>
  Stream.fromIterable([
    { source: "audit-log", type: "access-granted", data: "resource-789", timestamp: new Date(Date.now() + 100) },
    { source: "audit-log", type: "access-revoked", data: "resource-789", timestamp: new Date(Date.now() + 900) },
  ]).pipe(Stream.tap(() => Effect.sleep("800 millis")));

// Merge streams (interleaved, unordered).
// Stream.merge combines two streams, so chain it to merge three.
const mergedEventStream = (): Stream.Stream<Event> =>
  createUserEventStream().pipe(
    Stream.merge(createPaymentEventStream()),
    Stream.merge(createAuditEventStream())
  );

// Concat streams (sequential, ordered)
const concatenatedEventStream = (): Stream.Stream<Event> =>
  createUserEventStream().pipe(
    Stream.concat(createPaymentEventStream()),
    Stream.concat(createAuditEventStream())
  );

// Main: Compare merge vs concat
const program = Effect.gen(function* () {
  console.log(`\n[MERGE] Interleaved events from multiple sources:\n`);

  // Collect merged stream
  const mergedEvents = yield* mergedEventStream().pipe(Stream.runCollect);

  // Chunk.forEach does not supply an index, so convert to an array first
  Chunk.toReadonlyArray(mergedEvents).forEach((event, idx) => {
    console.log(`  ${idx + 1}. [${event.source}] ${event.type}: ${event.data}`);
  });

  console.log(`\n[CONCAT] Sequential events (user → payment → audit):\n`);

  // Collect concatenated stream
  const concatEvents = yield* concatenatedEventStream().pipe(Stream.runCollect);

  Chunk.toReadonlyArray(concatEvents).forEach((event, idx) => {
    console.log(`  ${idx + 1}. [${event.source}] ${event.type}: ${event.data}`);
  });
});

Effect.runPromise(program);
```

Output shows merge interleaving vs concat ordering:

```text
[MERGE] Interleaved events from multiple sources:

  1. [audit-log] access-granted: resource-789
  2. [user-service] login: user-123
  3. [payment-service] payment-started: order-456
  4. [user-service] logout: user-123
  5. [payment-service] payment-completed: order-456
  6. [audit-log] access-revoked: resource-789

[CONCAT] Sequential events (user → payment → audit):

  1. [user-service] login: user-123
  2. [user-service] logout: user-123
  3. [payment-service] payment-started: order-456
  4. [payment-service] payment-completed: order-456
  5. [audit-log] access-granted: resource-789
  6. [audit-log] access-revoked: resource-789
```

## Advanced: Merging with Prioritization

Handle priority streams differently:

```typescript
interface PrioritizedEvent extends Event {
  readonly priority: number; // 1 = low, 5 = high
}

const priorityMergeStream = (
  highPriorityStream: Stream.Stream<PrioritizedEvent>,
  normalPriorityStream: Stream.Stream<PrioritizedEvent>
): Stream.Stream<PrioritizedEvent> =>
  // Note: mergeAll pulls from all sources concurrently and does not
  // strictly prioritize, so some interleaving of priorities is expected
  Stream.mergeAll([highPriorityStream, normalPriorityStream], {
    concurrency: "unbounded",
  }).pipe(
    Stream.tap((event) =>
      Effect.log(`Event [priority=${event.priority}] ${event.type}`)
    )
  );
```

## Advanced: Zip/Combine Corresponding Elements

Wait for matching elements from multiple streams:

```typescript
interface RequestEvent {
  readonly requestId: string;
  readonly timestamp: Date;
}

interface ResponseEvent {
  readonly requestId: string;
  readonly duration: number;
  readonly timestamp: Date;
}

interface RequestResponse {
  readonly requestId: string;
  readonly duration: number;
}

const zipRequestResponse = (
  requestStream: Stream.Stream<RequestEvent>,
  responseStream: Stream.Stream<ResponseEvent>
): Stream.Stream<RequestResponse> =>
  requestStream.pipe(
    Stream.zip(responseStream),
    Stream.map(([request, response]) => ({
      requestId: request.requestId,
      duration: response.duration,
    }))
  );

// Usage: correlate requests with responses (runDrain turns this into
// an Effect that runs the stream for its logging side effects)
const correlatedStream = zipRequestResponse(
  Stream.fromIterable([
    { requestId: "req-1", timestamp: new Date() },
    { requestId: "req-2", timestamp: new Date() },
  ]),
  Stream.fromIterable([
    { requestId: "req-1", duration: 100, timestamp: new Date() },
    { requestId: "req-2", duration: 150, timestamp: new Date() },
  ])
).pipe(
  Stream.tap((item) =>
    Effect.log(`Request ${item.requestId} took ${item.duration}ms`)
  ),
  Stream.runDrain
);
```

## Advanced: Merge with Error Handling

Handle errors from merged streams:

```typescript
const mergeWithErrorHandling = <A>(
  streams: Stream.Stream<A, Error>[]
): Stream.Stream<
  { tag: "success"; value: A } | { tag: "error"; error: Error }
> =>
  Stream.mergeAll(
    streams.map((stream) =>
      stream.pipe(
        Stream.map((value) => ({ tag: "success" as const, value })),
        // Convert a stream failure into a final "error" element
        Stream.catchAll((error) =>
          Stream.succeed({ tag: "error" as const, error })
        )
      )
    ),
    { concurrency: "unbounded" }
  );

// Usage
const resilientMerge = mergeWithErrorHandling([
  createUserEventStream(),
  createPaymentEventStream(),
  createAuditEventStream(),
]).pipe(
  Stream.tap((item) =>
    item.tag === "error"
      ? Effect.log(`Error from stream: ${item.error.message}`)
      : Effect.log(`Event: ${item.value.type}`)
  ),
  Stream.runDrain
);
```

## Advanced: Round-Robin Merge

Fairly interleave elements from multiple streams:

```typescript
import { Stream, Effect, Queue, Option } from "effect";

const roundRobinMerge = <A>(
  streams: Stream.Stream<A>[]
): Stream.Stream<A> =>
  Stream.unwrap(
    Effect.gen(function* () {
      // Queue.bounded returns an Effect, so the queues must be created here
      const queues = yield* Effect.all(streams.map(() => Queue.bounded<A>(1)));

      // Forward each stream into its queue in a background fiber
      yield* Effect.all(
        streams.map((stream, idx) =>
          Effect.fork(
            Stream.runForEach(stream, (item) => Queue.offer(queues[idx], item))
          )
        )
      );

      let current = 0;

      // Consume the queues in round-robin order; sleep briefly when all
      // queues are momentarily empty. Note: this sketch does not detect
      // source completion, so the resulting stream never terminates.
      return Stream.repeatEffect(
        Effect.gen(function* () {
          while (true) {
            for (let i = 0; i < queues.length; i++) {
              const queueIdx = (current + i) % queues.length;
              const item = yield* Queue.poll(queues[queueIdx]);

              if (Option.isSome(item)) {
                // Advance past the queue just served, for fairness
                current = (queueIdx + 1) % queues.length;
                return item.value;
              }
            }

            // All queues empty right now; wait and retry
            yield* Effect.sleep("10 millis");
          }
        })
      );
    })
  );
```

## Advanced: Merge with Deduplication

Remove duplicate events from merged streams:

```typescript
const mergeDeduped = <A extends { id: string }>(
  streams: Stream.Stream<A>[]
): Stream.Stream<A> =>
  Stream.suspend(() => {
    // A fresh set per run, so re-running the stream dedupes from scratch
    const seen = new Set<string>();

    return Stream.mergeAll(streams, { concurrency: "unbounded" }).pipe(
      Stream.filter((item) => {
        if (seen.has(item.id)) {
          return false; // Duplicate, filter out
        }
        seen.add(item.id);
        return true;
      })
    );
  });

// Usage: merge event streams, remove duplicates. Event has no id field,
// so each event is keyed here by source + type before deduplication.
const dedupedEvents = mergeDeduped(
  [
    createUserEventStream(),
    createPaymentEventStream(),
    createAuditEventStream(),
  ].map((stream) =>
    stream.pipe(Stream.map((e) => ({ ...e, id: `${e.source}:${e.type}` })))
  )
).pipe(
  Stream.tap((event) => Effect.log(`Unique event: ${event.type}`)),
  Stream.runDrain
);
```

## When to Use This Pattern

Use merge/concat when:

- Combining multiple independent streams
- Aggregating from different sources
- Multi-source data pipelines
- Event correlation and joining
- Fan-in patterns (many→one)

Merge when: Parallel, interleaved order acceptable

Concat when: Sequential, specific order needed

⚠️ Trade-offs:

- Merge overhead grows with stream count
- Can't resume individual failed streams
- Backpressure affects all sources
- Ordering is unclear with merge

## Merge vs Concat vs Zip

| Operator | Behavior | Use Case |
| --- | --- | --- |
| `merge` | Parallel, interleaved | Fan-in, parallel sources |
| `concat` | Sequential, waits for first | Chaining ordered sequences |
| `zip` | Wait for corresponding elements | Correlating matched pairs |
| `mergeAll` | Merge collection of streams | Dynamic number of sources |

## See Also