| title | Stream Pattern 2: Merge and Combine Multiple Streams |
|---|---|
| id | stream-pattern-merge-combine |
| skillLevel | intermediate |
| applicationPatternId | streams |
| summary | Use Stream.merge, Stream.concat, and Stream.mergeAll to combine multiple streams into a single stream, enabling multi-source data aggregation. |
| tags | |
| rule | |
| related | |
| author | effect_website |
| lessonOrder | 7 |

Combine multiple streams using:
- merge: Interleave elements from two streams as they arrive (no ordering guarantee across streams)
- concat: Chain streams sequentially (ordered; the second stream starts only after the first completes)
- mergeAll: Merge a collection of streams with a chosen concurrency
- zip: Pair corresponding elements from two streams by position
Pattern: Stream.merge(stream1, stream2) or stream1.pipe(Stream.concat(stream2))
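
As a minimal sketch of the operators listed above, the following compares `concat`, `merge`, and `mergeAll` on small in-memory streams. The sample streams and the `collect` and `demo` helpers are hypothetical names introduced only for this illustration. Merged results are sorted before inspection because the relative order across sources is not guaranteed.

```typescript
import { Chunk, Effect, Stream } from "effect";

// Hypothetical sample streams, used only for this illustration
const letters = Stream.make("a", "b", "c");
const digits = Stream.make("1", "2");
const symbols = Stream.make("!", "?");

// concat: all of `letters`, then all of `digits`
const concatenated = letters.pipe(Stream.concat(digits));

// merge: two streams run concurrently; cross-stream order is not guaranteed
const merged = Stream.merge(letters, digits);

// mergeAll: any number of streams; the concurrency option is required
const mergedAll = Stream.mergeAll([letters, digits, symbols], {
  concurrency: "unbounded",
});

// Run a stream to completion and return its elements as a plain array
const collect = <A>(stream: Stream.Stream<A>): Promise<ReadonlyArray<A>> =>
  Effect.runPromise(Stream.runCollect(stream)).then((chunk) =>
    Chunk.toReadonlyArray(chunk)
  );

const demo = async () => ({
  concatenated: await collect(concatenated), // ["a", "b", "c", "1", "2"]
  // Sorted before use, because merge order can vary between runs
  merged: [...(await collect(merged))].sort(),
  mergedAll: [...(await collect(mergedAll))].sort(),
});
```

Only the `concat` result has a fixed order; for the merged streams the sketch relies only on which elements are present.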
Multi-source data processing without merge/concat creates issues:
- Complex coordination: Manual loop over multiple sources
- Hard to aggregate: Collecting from different sources is verbose
- Ordering confusion: Sequential vs. parallel unclear
- Resource management: Multiple independent consumers
Merge/concat enable:
- Simple composition: Combine streams naturally
- Semantic clarity: Merge = parallel, concat = sequential
- Aggregation: Single consumer for multiple sources
- Scalability: Add sources without refactoring
Real-world example: Aggregating user events
- Without merge: Poll user service, poll event log, poll notifications separately
- With merge:
Stream.mergeAll([userStream, eventStream, notificationStream], { concurrency: "unbounded" })
This example demonstrates merging multiple event streams into a unified stream.
import { Stream, Effect, Chunk } from "effect";

interface Event {
  readonly source: string;
  readonly type: string;
  readonly data: string;
  readonly timestamp: Date;
}

// Create independent event streams from different sources.
// Each tap delays an event before it is emitted, simulating source latency.
const createUserEventStream = (): Stream.Stream<Event> =>
  Stream.fromIterable([
    { source: "user-service", type: "login", data: "user-123", timestamp: new Date(Date.now() + 0) },
    { source: "user-service", type: "logout", data: "user-123", timestamp: new Date(Date.now() + 500) },
  ]).pipe(
    Stream.tap(() => Effect.sleep("500 millis"))
  );

const createPaymentEventStream = (): Stream.Stream<Event> =>
  Stream.fromIterable([
    { source: "payment-service", type: "payment-started", data: "order-456", timestamp: new Date(Date.now() + 200) },
    { source: "payment-service", type: "payment-completed", data: "order-456", timestamp: new Date(Date.now() + 800) },
  ]).pipe(
    Stream.tap(() => Effect.sleep("600 millis"))
  );

const createAuditEventStream = (): Stream.Stream<Event> =>
  Stream.fromIterable([
    { source: "audit-log", type: "access-granted", data: "resource-789", timestamp: new Date(Date.now() + 100) },
    { source: "audit-log", type: "access-revoked", data: "resource-789", timestamp: new Date(Date.now() + 900) },
  ]).pipe(
    Stream.tap(() => Effect.sleep("800 millis"))
  );

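// Merge streams (interleaved, unordered).
// Stream.merge combines exactly two streams, so three sources go through
// Stream.mergeAll, which requires an explicit concurrency option.
const mergedEventStream = (): Stream.Stream<Event> => {
  const userStream = createUserEventStream();
  const paymentStream = createPaymentEventStream();
  const auditStream = createAuditEventStream();
  return Stream.mergeAll([userStream, paymentStream, auditStream], {
    concurrency: "unbounded",
  });
};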

// Concat streams (sequential, ordered)
const concatenatedEventStream = (): Stream.Stream<Event> => {
  return createUserEventStream().pipe(
    Stream.concat(createPaymentEventStream()),
    Stream.concat(createAuditEventStream())
  );
};

// Main: Compare merge vs concat
const program = Effect.gen(function* () {
  console.log(`\n[MERGE] Interleaved events from multiple sources:\n`);
  // Collect merged stream
  const mergedEvents = yield* mergedEventStream().pipe(
    Stream.runCollect
  );
  Chunk.forEach(mergedEvents, (event, idx) => {
    console.log(
      ` ${idx + 1}. [${event.source}] ${event.type}: ${event.data}`
    );
  });

  console.log(`\n[CONCAT] Sequential events (user → payment → audit):\n`);
  // Collect concatenated stream
  const concatEvents = yield* concatenatedEventStream().pipe(
    Stream.runCollect
  );
  Chunk.forEach(concatEvents, (event, idx) => {
    console.log(
      ` ${idx + 1}. [${event.source}] ${event.type}: ${event.data}`
    );
  });
});

Effect.runPromise(program);

The output shows merge interleaving versus concat ordering:
[MERGE] Interleaved events from multiple sources:
1. [user-service] login: user-123
2. [payment-service] payment-started: order-456
3. [audit-log] access-granted: resource-789
4. [user-service] logout: user-123
5. [payment-service] payment-completed: order-456
6. [audit-log] access-revoked: resource-789
[CONCAT] Sequential events (user → payment → audit):
1. [user-service] login: user-123
2. [user-service] logout: user-123
3. [payment-service] payment-started: order-456
4. [payment-service] payment-completed: order-456
5. [audit-log] access-granted: resource-789
6. [audit-log] access-revoked: resource-789
Merge high- and normal-priority streams and log each event's priority (merging alone does not reorder events by priority):
interface PrioritizedEvent extends Event {
  readonly priority: number; // 1=low, 5=high
}

const priorityMergeStream = (
  highPriorityStream: Stream.Stream<PrioritizedEvent>,
  normalPriorityStream: Stream.Stream<PrioritizedEvent>
): Stream.Stream<PrioritizedEvent> =>
  // mergeAll requires an explicit concurrency option
  Stream.mergeAll([highPriorityStream, normalPriorityStream], {
    concurrency: "unbounded",
  }).pipe(
    // Events are emitted as they arrive, not sorted by priority;
    // this tap only logs the priority of each event
    Stream.tap((event) =>
      Effect.log(
        `Event [priority=${event.priority}] ${event.type}`
      )
    )
  );

Pair corresponding elements from two streams (zip matches by position, not by requestId, so both streams must emit in the same order):
interface RequestEvent {
  readonly requestId: string;
  readonly timestamp: Date;
}

interface ResponseEvent {
  readonly requestId: string;
  readonly duration: number;
  readonly timestamp: Date;
}

interface RequestResponse {
  readonly requestId: string;
  readonly duration: number;
}

const zipRequestResponse = (
  requestStream: Stream.Stream<RequestEvent>,
  responseStream: Stream.Stream<ResponseEvent>
): Stream.Stream<RequestResponse> =>
  requestStream.pipe(
    Stream.zip(responseStream),
    Stream.map(([request, response]) => ({
      requestId: request.requestId,
      duration: response.duration,
    }))
  );

// Usage: Correlate requests with responses
const correlatedStream = zipRequestResponse(
  Stream.fromIterable([
    { requestId: "req-1", timestamp: new Date() },
    { requestId: "req-2", timestamp: new Date() },
  ]),
  Stream.fromIterable([
    { requestId: "req-1", duration: 100, timestamp: new Date() },
    { requestId: "req-2", duration: 150, timestamp: new Date() },
  ])
).pipe(
  Stream.tap((item) =>
    Effect.log(
      `Request ${item.requestId} took ${item.duration}ms`
    )
  ),
  Stream.runDrain
);

Handle errors from merged streams:
const mergeWithErrorHandling = <A, E>(
  streams: ReadonlyArray<Stream.Stream<A, E>>
): Stream.Stream<
  { tag: "success"; value: A } | { tag: "error"; error: E }
> =>
  Stream.mergeAll(
    streams.map((stream) =>
      stream.pipe(
        Stream.map((value) => ({ tag: "success" as const, value })),
        // Turn a source's failure into a final "error" element so the
        // remaining sources keep running
        Stream.catchAll((error) =>
          Stream.succeed({ tag: "error" as const, error })
        )
      )
    ),
    { concurrency: "unbounded" }
  );

// Usage
const resilientMerge = mergeWithErrorHandling([
  createUserEventStream(),
  createPaymentEventStream(),
  createAuditEventStream(),
]).pipe(
  Stream.tap((item) => {
    if (item.tag === "error") {
      // The sample streams cannot fail, so this branch is never taken here
      return Effect.log(`Error from stream: ${String(item.error)}`);
    } else {
      return Effect.log(`Event: ${item.value.type}`);
    }
  }),
  Stream.runDrain
);

Fairly interleave elements from multiple streams:
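import { Option, Queue } from "effect";

const roundRobinMerge = <A>(
  streams: ReadonlyArray<Stream.Stream<A>>
): Stream.Stream<A> =>
  Stream.unwrapScoped(
    Effect.gen(function* () {
      // One single-slot queue per source; Option.none() marks the end of a source
      const queues = yield* Effect.forEach(streams, () =>
        Queue.bounded<Option.Option<A>>(1)
      );
      // Forward each stream to its queue on a fiber tied to the merged stream's scope
      yield* Effect.forEach(streams, (stream, idx) =>
        stream.pipe(
          Stream.runForEach((item) => Queue.offer(queues[idx], Option.some(item))),
          Effect.zipRight(Queue.offer(queues[idx], Option.none())),
          Effect.forkScoped
        )
      );
      const finished = queues.map(() => false);
      let start = 0;
      // Emit the next available item; fail with None once every source has ended
      const next: Effect.Effect<A, Option.Option<never>> = Effect.gen(function* () {
        while (finished.includes(false)) {
          // Try queues in round-robin order, starting after the last one served
          for (let i = 0; i < queues.length; i++) {
            const idx = (start + i) % queues.length;
            if (finished[idx]) continue;
            const polled = yield* Queue.poll(queues[idx]);
            if (Option.isNone(polled)) continue; // nothing buffered yet
            if (Option.isSome(polled.value)) {
              start = (idx + 1) % queues.length;
              return polled.value.value;
            }
            finished[idx] = true; // end-of-source marker
          }
          // Nothing ready: wait briefly and retry
          yield* Effect.sleep("10 millis");
        }
        return yield* Effect.fail(Option.none());
      });
      return Stream.repeatEffectOption(next);
    })
  );

Remove duplicate events from merged streams: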
const mergeDeduped = <A>(
  streams: ReadonlyArray<Stream.Stream<A>>,
  key: (item: A) => string
): Stream.Stream<A> =>
  // Stream.suspend gives every run of the stream its own `seen` set
  Stream.suspend(() => {
    const seen = new Set<string>();
    return Stream.mergeAll(streams, { concurrency: "unbounded" }).pipe(
      Stream.filter((item) => {
        const id = key(item);
        if (seen.has(id)) {
          return false; // Duplicate, filter out
        }
        seen.add(id);
        return true;
      })
    );
  });

// Usage: Merge event streams, remove duplicates.
// Event has no id field, so a key is derived from source, type, and data.
const dedupedEvents = mergeDeduped(
  [
    createUserEventStream(),
    createPaymentEventStream(),
    createAuditEventStream(),
  ],
  (event) => `${event.source}:${event.type}:${event.data}`
).pipe(
  Stream.tap((event) =>
    Effect.log(`Unique event: ${event.type}`)
  ),
  Stream.runDrain
);

✅ Use merge/concat when:
- Combining multiple independent streams
- Aggregating from different sources
- Multi-source data pipelines
- Event correlation and joining
- Fan-in patterns (many→one)
✅ Merge when: Parallel, interleaved order acceptable
✅ Concat when: Sequential, specific order needed
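
One detail the guidance above leaves open is when a merged stream ends. By default `Stream.merge` waits for both sources to finish, which never happens if one of them is long-lived. The following sketch, with hypothetical `events` and `longLived` sources, uses the `haltStrategy` option so the merged stream ends together with the finite source.

```typescript
import { Chunk, Effect, Stream } from "effect";

// Hypothetical sources: a finite batch of events and a source that never ends
// (Stream.never stands in for something like a heartbeat or a subscription)
const events = Stream.make("login", "logout");
const longLived: Stream.Stream<string> = Stream.never;

// With haltStrategy "left", the merged stream ends as soon as `events` ends
const untilEventsEnd = Stream.merge(events, longLived, {
  haltStrategy: "left",
});

const runDemo = (): Promise<ReadonlyArray<string>> =>
  Effect.runPromise(Stream.runCollect(untilEventsEnd)).then((chunk) =>
    Chunk.toReadonlyArray(chunk)
  );
// Resolves to ["login", "logout"]
```

Other accepted values are `"right"`, `"either"`, and `"both"`.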
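Limitations:
- Each merged source runs concurrently, so overhead grows with the number of sources
- A failure in any source fails the merged stream; a failed source cannot be resumed individually unless it is recovered or retried before merging
- Backpressure is shared: a slow consumer eventually stalls every source
- Merge gives no ordering guarantee across sources; only the order within each source is preserved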
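
Two of the limitations listed above can be softened before merging. The sketch below is one possible approach, not a prescribed one: each source is retried and then isolated so its failure does not end the merged stream, and `mergeAll` runs with bounded concurrency and a bounded buffer. The sources and the `isolate` helper are hypothetical names used only for this illustration.

```typescript
import { Chunk, Effect, Schedule, Stream } from "effect";

let attempts = 0;
// Hypothetical flaky source: fails on its first run, succeeds when retried
const flaky: Stream.Stream<number, string> = Stream.suspend(() => {
  attempts += 1;
  return attempts === 1 ? Stream.fail("temporary outage") : Stream.make(3);
});
const healthy: Stream.Stream<number, string> = Stream.make(1, 2);
const broken: Stream.Stream<number, string> = Stream.fail("permanently down");

// Retry a source up to twice on its own, then drop it if it still fails
const isolate = <A, E>(source: Stream.Stream<A, E>): Stream.Stream<A> =>
  source.pipe(
    Stream.retry(Schedule.recurs(2)),
    Stream.catchAll(() => Stream.empty)
  );

const sources: ReadonlyArray<Stream.Stream<number, string>> = [
  healthy,
  flaky,
  broken,
];

// At most two sources run at once; bufferSize bounds how far they run ahead
const merged = Stream.mergeAll(
  sources.map((source) => isolate(source)),
  { concurrency: 2, bufferSize: 16 }
);

// Sorted before use, because merge order is not guaranteed
const runDemo = (): Promise<ReadonlyArray<number>> =>
  Effect.runPromise(Stream.runCollect(merged)).then((chunk) =>
    [...Chunk.toReadonlyArray(chunk)].sort((a, b) => a - b)
  );
```

Dropping a failed source silently is a design choice made for brevity here; mapping the failure to a tagged error element, as in the error-handling example, keeps it visible to the consumer.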
| Operator | Behavior | Use Case |
|---|---|---|
| merge | Parallel, interleaved | Fan-in, parallel sources |
| concat | Sequential, waits for first | Chaining ordered sequences |
| zip | Wait for corresponding elements | Correlating matched pairs |
| mergeAll | Merge collection of streams | Dynamic number of sources |
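
The zip row in the table comes with a caveat: pairing is by position, and the zipped stream ends with the shorter input. A small sketch using `Stream.zipWith` on hypothetical sample data illustrates both points.

```typescript
import { Chunk, Effect, Stream } from "effect";

// Hypothetical data: three request IDs but only two recorded durations
const requestIds = Stream.make("req-1", "req-2", "req-3");
const durations = Stream.make(100, 150);

// zipWith pairs the nth element of each stream and stops with the shorter one
const paired = Stream.zipWith(
  requestIds,
  durations,
  (id, ms) => `${id}: ${ms}ms`
);

const runDemo = (): Promise<ReadonlyArray<string>> =>
  Effect.runPromise(Stream.runCollect(paired)).then((chunk) =>
    Chunk.toReadonlyArray(chunk)
  );
// Resolves to ["req-1: 100ms", "req-2: 150ms"]; "req-3" has no partner and is dropped
```

If elements can arrive out of order, positional pairing will match the wrong records, and correlating by a key such as `requestId` is the safer choice.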

Related patterns:
- Stream Pattern 1: Map & Filter Transformations - Stream transformations
- Process Streaming Data with Stream - Stream basics
- Stream from Iterable - Creating streams
- Stream Collect Results - Collecting stream output