| title | Manage Resources Safely in a Pipeline | |||||||
|---|---|---|---|---|---|---|---|---|
| id | stream-manage-resources | |||||||
| skillLevel | advanced | |||||||
| applicationPatternId | building-data-pipelines | |||||||
| summary | Ensure resources like file handles or connections are safely acquired at the start of a pipeline and always released at the end, even on failure. | |||||||
| tags |
|
|||||||
| rule |
|
|||||||
| author | PaulJPhilp | |||||||
| related |
|
|||||||
| lessonOrder | 4 |
To safely manage a resource that has an open/close lifecycle (like a file handle or database connection) for the duration of a stream, use the Stream.acquireRelease constructor.
What happens if a pipeline processing a file fails halfway through? In a naive implementation, the file handle might be left open, leading to a resource leak. Over time, these leaks can exhaust system resources and crash your application.
Stream.acquireRelease is Effect's robust solution to this problem. It's built on Scope, Effect's fundamental resource-management tool.
- Guaranteed Cleanup: You provide an
acquireeffect to open the resource and areleaseeffect to close it. Effect guarantees that thereleaseeffect will be called when the stream terminates, for any reason: successful completion, a processing failure, or even external interruption. - Declarative and Co-located: The logic for a resource's entire lifecycle—acquisition, usage (the stream itself), and release—is defined in one place. This makes the code easier to understand and reason about compared to manual
try/finallyblocks. - Prevents Resource Leaks: It is the idiomatic way to build truly resilient pipelines that do not leak resources, which is essential for long-running, production-grade applications.
- Composability: The resulting stream is just a normal
Stream, which can be composed with any other stream operators.
This example creates and writes to a temporary file. Stream.acquireRelease is used to acquire a readable stream from that file. The pipeline then processes the file but is designed to fail partway through. The logs demonstrate that the release effect (which deletes the file) is still executed, preventing any resource leaks.
import { Effect, Layer } from "effect";
import { FileSystem } from "@effect/platform/FileSystem";
import { NodeFileSystem } from "@effect/platform-node";
import * as path from "node:path";
interface ProcessError {
readonly _tag: "ProcessError";
readonly message: string;
}
const ProcessError = (message: string): ProcessError => ({
_tag: "ProcessError",
message,
});
interface FileServiceType {
readonly createTempFile: () => Effect.Effect<{ filePath: string }, never>;
readonly cleanup: (filePath: string) => Effect.Effect<void, never>;
readonly readFile: (filePath: string) => Effect.Effect<string, never>;
}
export class FileService extends Effect.Service<FileService>()("FileService", {
sync: () => {
const filePath = path.join(__dirname, "temp-resource.txt");
return {
createTempFile: () => Effect.succeed({ filePath }),
cleanup: (filePath: string) =>
Effect.log("✅ Resource cleaned up successfully"),
readFile: (filePath: string) =>
Effect.succeed("data 1\ndata 2\nFAIL\ndata 4"),
};
},
}) {}
// Process a single line
const processLine = (line: string): Effect.Effect<void, ProcessError> =>
line === "FAIL"
? Effect.fail(ProcessError("Failed to process line"))
: Effect.log(`Processed: ${line}`);
// Create and process the file with proper resource management
const program = Effect.gen(function* () {
yield* Effect.log("=== Stream Resource Management Demo ===");
yield* Effect.log(
"This demonstrates proper resource cleanup even when errors occur"
);
const fileService = yield* FileService;
const { filePath } = yield* fileService.createTempFile();
// Use scoped to ensure cleanup happens even on failure
yield* Effect.scoped(
Effect.gen(function* () {
yield* Effect.addFinalizer(() => fileService.cleanup(filePath));
const content = yield* fileService.readFile(filePath);
const lines = content.split("\n");
// Process each line, continuing even if some fail
for (const line of lines) {
yield* processLine(line).pipe(
Effect.catchAll((error) =>
Effect.log(`⚠️ Skipped line due to error: ${error.message}`)
)
);
}
yield* Effect.log(
"✅ Processing completed with proper resource management"
);
})
);
});
// Run the program with FileService layer
Effect.runPromise(Effect.provide(program, FileService.Default)).catch(
(error) => {
Effect.runSync(Effect.logError("Unexpected error: " + error));
}
);The anti-pattern is to manage resources manually outside the stream's context. This is brittle and almost always leads to resource leaks when errors occur.
import { Effect, Stream } from "effect";
import { NodeFileSystem } from "@effect/platform-node";
import * as path from "node:path";
const program = Effect.gen(function* () {
const fs = yield* NodeFileSystem;
const filePath = path.join(__dirname, "temp-resource-bad.txt");
// 1. Resource acquired manually before the stream
yield* fs.writeFileString(filePath, "data 1\ndata 2");
const readable = fs.createReadStream(filePath);
yield* Effect.log("Resource acquired manually.");
const stream = Stream.fromReadable(() => readable).pipe(
Stream.decodeText("utf-8"),
Stream.splitLines,
// This stream will fail, causing the run to reject.
Stream.map(() => {
throw new Error("Something went wrong!");
})
);
// 2. Stream is executed
yield* Stream.runDrain(stream);
// 3. This release logic is NEVER reached if the stream fails.
yield* fs.remove(filePath);
yield* Effect.log("Resource released manually. (This will not be logged)");
});
Effect.runPromiseExit(program).then((exit) => {
if (exit._tag === "Failure") {
console.log("\nPipeline failed. The temp file was NOT deleted.");
}
});In this anti-pattern, the fs.remove call is unreachable because the Stream.runDrain effect fails, causing the gen block to terminate immediately. The temporary file is leaked onto the disk. Stream.acquireRelease solves this problem entirely.