| title | Stream Pattern 6: Resource Management in Streams | ||||||
|---|---|---|---|---|---|---|---|
| id | stream-pattern-resource-management | ||||||
| skillLevel | advanced | ||||||
| applicationPatternId | streams | ||||||
| summary | Properly manage resources (connections, files, memory) in streams using acquire/release patterns and ensuring cleanup on error or completion. | ||||||
| tags |
|
||||||
| rule |
|
||||||
| related |
|
||||||
| author | effect_website | ||||||
| lessonOrder | 2 |
Streams must clean up resources deterministically:
- Acquire/Release: Get resource, use, return resource
- Bracket pattern: Ensure cleanup on success or failure
- Scope safety: Guarantee cleanup even on exceptions
- Connection pooling: Reuse connections, prevent exhaustion
- Concurrent cleanup: Handle cleanup under concurrency
Pattern: Stream.bracket(), Resource.make(), Scope for resource safety
Streams without resource management cause problems:
Problem 1: Resource exhaustion
- Open file streams without closing → file descriptor limit exceeded
- Get connections from pool, never return → connection starvation
- System becomes unresponsive
Problem 2: Memory leaks
- Stream emits large objects → memory grows
- Without cleanup → garbage persists
- GC can't reclaim
Problem 3: Data corruption
- Write to file without flush → partial writes on crash
- Read from connection while another thread writes → data race
- Results are unpredictable
Problem 4: Silent failures
- Resource cleanup fails → error lost
- Application proceeds as if successful
- Hidden bug becomes hard-to-trace crash later
Solutions:
Bracket pattern:
- Acquire resource
- Use resource (even if error)
- Always release resource
- Track errors separately
Resource scopes:
- Nested resource management
- Parent cleanup waits for children
- Hierarchical resource graphs
- Type-safe guarantees
Connection pooling:
- Reuse connections
- Track available/in-use
- Prevent exhaustion
- Support graceful shutdown
This example demonstrates resource acquisition, use, and guaranteed cleanup.
import { Effect, Stream, Resource, Scope, Ref } from "effect";
interface FileHandle {
readonly path: string;
readonly fd: number;
}
interface Connection {
readonly id: string;
readonly isOpen: boolean;
}
// Simulate resource management
const program = Effect.gen(function* () {
console.log(`\n[RESOURCE MANAGEMENT] Stream resource lifecycle\n`);
// Example 1: Bracket pattern for file streams
console.log(`[1] Bracket pattern (acquire → use → release):\n`);
let openHandles = 0;
let closedHandles = 0;
const openFile = (path: string) =>
Effect.gen(function* () {
openHandles++;
yield* Effect.log(`[OPEN] File "${path}" (total open: ${openHandles})`);
return { path, fd: 1000 + openHandles };
});
const closeFile = (handle: FileHandle) =>
Effect.gen(function* () {
closedHandles++;
yield* Effect.log(`[CLOSE] File "${handle.path}" (total closed: ${closedHandles})`);
});
const readFileWithBracket = (path: string) =>
Effect.gen(function* () {
let handle: FileHandle | null = null;
try {
handle = yield* openFile(path);
yield* Effect.log(
`[USE] Reading from fd ${handle.fd} ("${handle.path}")`
);
// Simulate reading
return "file contents";
} finally {
// Guaranteed to run even if error occurs above
if (handle) {
yield* closeFile(handle);
}
}
});
// Test with success
yield* Effect.log(`[TEST] Success case:`);
const content = yield* readFileWithBracket("/data/file.txt");
yield* Effect.log(`[RESULT] Got: "${content}"\n`);
// Test with failure (simulated)
yield* Effect.log(`[TEST] Error case:`);
const failCase = Effect.gen(function* () {
let handle: FileHandle | null = null;
try {
handle = yield* openFile("/data/missing.txt");
// Simulate error mid-operation
yield* Effect.fail(new Error("Read failed"));
} finally {
if (handle) {
yield* closeFile(handle);
}
}
}).pipe(
Effect.catchAll((error) =>
Effect.gen(function* () {
yield* Effect.log(`[ERROR] Caught: ${error.message}`);
yield* Effect.log(`[CHECK] Closed handles: ${closedHandles} (verifying cleanup)\n`);
})
)
);
yield* failCase;
// Example 2: Connection pool management
console.log(`[2] Connection pooling:\n`);
interface ConnectionPool {
acquire: () => Effect.Effect<Connection>;
release: (conn: Connection) => Effect.Effect<void>;
}
const createConnectionPool = (maxSize: number): Effect.Effect<ConnectionPool> =>
Effect.gen(function* () {
const available = yield* Ref.make<Connection[]>([]);
const inUse = yield* Ref.make<Set<string>>(new Set());
let idCounter = 0;
return {
acquire: Effect.gen(function* () {
const avail = yield* Ref.get(available);
if (avail.length > 0) {
yield* Effect.log(`[POOL] Reusing connection from pool`);
const conn = avail.pop()!;
yield* Ref.modify(inUse, (set) => [
undefined,
new Set(set).add(conn.id),
]);
return conn;
}
const inUseCount = (yield* Ref.get(inUse)).size;
if (inUseCount >= maxSize) {
yield* Effect.fail(new Error("Pool exhausted"));
}
const connId = `conn-${++idCounter}`;
yield* Effect.log(`[POOL] Creating new connection: ${connId}`);
const conn = { id: connId, isOpen: true };
yield* Ref.modify(inUse, (set) => [
undefined,
new Set(set).add(connId),
]);
return conn;
}),
release: (conn: Connection) =>
Effect.gen(function* () {
yield* Ref.modify(inUse, (set) => {
const updated = new Set(set);
updated.delete(conn.id);
return [undefined, updated];
});
yield* Ref.modify(available, (avail) => [
undefined,
[...avail, conn],
]);
yield* Effect.log(`[POOL] Returned connection: ${conn.id}`);
}),
};
});
const pool = yield* createConnectionPool(3);
// Acquire and release connections
const conn1 = yield* pool.acquire();
const conn2 = yield* pool.acquire();
yield* pool.release(conn1);
const conn3 = yield* pool.acquire(); // Reuses conn1
yield* Effect.log(`\n`);
// Example 3: Scope-based resource safety
console.log(`[3] Scoped resources (hierarchical cleanup):\n`);
let scopedCount = 0;
const withScoped = <R,>(create: () => Effect.Effect<R>) =>
Effect.gen(function* () {
scopedCount++;
const id = scopedCount;
yield* Effect.log(`[SCOPE] Enter scope ${id}`);
const resource = yield* create();
yield* Effect.log(`[SCOPE] Using resource in scope ${id}`);
// Cleanup is guaranteed via ensuring
yield* Effect.unit.pipe(
Effect.ensuring(
Effect.log(`[SCOPE] Cleanup guaranteed for scope ${id}`)
)
);
return resource;
});
// Nested scopes
const result = yield* withScoped(() =>
Effect.gen(function* () {
const innerData = yield* withScoped(() => Effect.succeed("inner data"));
return { level: 1, data: innerData };
})
).pipe(
Effect.catchAll(() => Effect.succeed({ level: 0, data: null }))
);
yield* Effect.log(`[SCOPES] Cleanup order: inner → outer\n`);
// Example 4: Stream resource management
console.log(`[4] Stream with resource cleanup:\n`);
let streamResourceCount = 0;
// Simulate stream that acquires resources
const streamWithResources = Stream.empty.pipe(
Stream.tap(() =>
Effect.gen(function* () {
streamResourceCount++;
yield* Effect.log(`[STREAM-RES] Acquired resource ${streamResourceCount}`);
})
),
// Cleanup when stream ends
Stream.ensuring(
Effect.log(`[STREAM-RES] Cleaning up all ${streamResourceCount} resources`)
)
);
yield* Stream.runDrain(streamWithResources);
// Example 5: Error propagation with cleanup
console.log(`\n[5] Error safety with cleanup:\n`);
const safeRead = (retryCount: number) =>
Effect.gen(function* () {
let handle: FileHandle | null = null;
try {
handle = yield* openFile(`/data/file-${retryCount}.txt`);
if (retryCount < 2) {
yield* Effect.log(`[READ] Attempt ${retryCount}: failing intentionally`);
yield* Effect.fail(new Error(`Attempt ${retryCount} failed`));
}
yield* Effect.log(`[READ] Success on attempt ${retryCount}`);
return "success";
} finally {
if (handle) {
yield* closeFile(handle);
}
}
});
// Retry with guaranteed cleanup
const result2 = yield* safeRead(1).pipe(
Effect.retry(
Schedule.recurs(2).pipe(
Schedule.compose(Schedule.fixed("10 millis"))
)
),
Effect.catchAll((error) =>
Effect.gen(function* () {
yield* Effect.log(`[FINAL] All retries failed: ${error.message}`);
return "fallback";
})
)
);
yield* Effect.log(`\n[FINAL] Result: ${result2}`);
});
Effect.runPromise(program);Build safe resource APIs:
// Safe resource pattern using Effect.acquire
const withDatabaseConnection = <R,>(
operation: (conn: Connection) => Effect.Effect<R>
): Effect.Effect<R> =>
Effect.gen(function* () {
const conn = yield* Effect.acquire(
Effect.gen(function* () {
const connection = { id: "db-1", isOpen: true };
yield* Effect.log(`[DB] Opened connection`);
// Return release effect
return Effect.gen(function* () {
yield* Effect.log(`[DB] Closed connection`);
});
})
);
return yield* operation(conn);
});
// Usage - cleanup guaranteed
const dbOperation = withDatabaseConnection((conn) =>
Effect.gen(function* () {
yield* Effect.log(`[DB] Using connection: ${conn.id}`);
return "query result";
})
);Handle cleanup under concurrency:
const createConcurrentResourcePool = <R,>(config: {
createResource: () => Effect.Effect<R>;
destroyResource: (r: R) => Effect.Effect<void>;
maxConcurrent: number;
}) =>
Effect.gen(function* () {
const resources = yield* Ref.make<R[]>([]);
const inUse = yield* Ref.make<Set<unknown>>(new Set());
const withResource = <A,>(
use: (resource: R) => Effect.Effect<A>
): Effect.Effect<A> =>
Effect.gen(function* () {
let resource: R;
// Acquire
const existing = yield* Ref.get(resources);
if (existing.length > 0) {
resource = existing.pop()!;
} else {
resource = yield* config.createResource();
}
const resourceId = Math.random();
yield* Ref.modify(inUse, (set) =>
[undefined, new Set(set).add(resourceId)]
);
try {
return yield* use(resource);
} finally {
// Release - guaranteed
yield* Ref.modify(inUse, (set) => {
const updated = new Set(set);
updated.delete(resourceId);
return [undefined, updated];
});
yield* Ref.modify(resources, (list) => [
undefined,
[...list, resource],
]);
}
});
const shutdownAll = Effect.gen(function* () {
const all = yield* Ref.get(resources);
for (const resource of all) {
yield* config.destroyResource(resource);
}
const stillInUse = yield* Ref.get(inUse);
if (stillInUse.size > 0) {
yield* Effect.log(
`[WARNING] ${stillInUse.size} resources still in use during shutdown`
);
}
});
return { withResource, shutdownAll };
});Ensure all resources clean up on interruption:
const createGracefulStream = <A,>(
source: Stream.Stream<A>,
config: {
shutdown: () => Effect.Effect<void>;
timeout: Duration.Duration;
}
) =>
source.pipe(
Stream.ensuring(
Effect.gen(function* () {
yield* Effect.log(`[STREAM] Starting graceful shutdown`);
yield* config.shutdown.pipe(
Effect.timeout(config.timeout),
Effect.catchAll((error) =>
Effect.gen(function* () {
yield* Effect.log(
`[STREAM] Shutdown timeout or failed: ${error.message}`
);
})
)
);
yield* Effect.log(`[STREAM] Shutdown complete`);
})
)
);| Pattern | Use Case | Cleanup |
|---|---|---|
| Bracket | Simple acquire/release | Finally block |
| Resource.make | Complex setup | Built-in cleanup |
| Scope | Hierarchical resources | Nested cleanup |
| Pool | Reuse connections | Return to pool |
| Stream.bracket | Stream elements | Per-element cleanup |
✅ Use brackets when:
- File I/O operations
- Database connections
- Network sockets
- Temporary resources
✅ Use pools when:
- Expensive resource creation
- Connection reuse
- Resource scarcity
- Performance-critical
✅ Use scopes when:
- Hierarchical resources
- Complex resource graphs
- Nested allocation
- Type-safe guarantees
- Complexity increases
- More error cases to handle
- Debugging harder
- Performance impact
- ✅ Always use try/finally or bracket
- ✅ Test error paths explicitly
- ✅ Verify cleanup on timeout
- ✅ Test concurrent access
- ✅ Monitor for resource leaks
- ✅ Set appropriate timeouts
- ✅ Log resource lifecycle
- ✅ Plan graceful shutdown
- Platform Pattern 2: FileSystem Operations - File resource patterns
- Platform Pattern 3: Key-Value Storage - Persistence resources
- Error Handling Pattern 2: Propagation - Error safety
- Concurrency Pattern 3: Latch Coordination - Synchronized cleanup