title: Manage Resources Safely in a Pipeline
id: stream-manage-resources
skillLevel: advanced
applicationPatternId: building-data-pipelines
summary: Ensure resources like file handles or connections are safely acquired at the start of a pipeline and always released at the end, even on failure.
tags:
  - stream
  - resource
  - scope
  - acquireRelease
  - bracket
  - safety
  - file
rule:
  description: Use Stream.acquireRelease to safely manage the lifecycle of a resource within a pipeline.
author: PaulJPhilp
related:
  - stream-from-file
  - stream-run-for-effects
lessonOrder: 4

Guideline

To safely manage a resource that has an open/close lifecycle (like a file handle or database connection) for the duration of a stream, use the Stream.acquireRelease constructor.
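As a minimal sketch of the guideline, the snippet below wraps a hypothetical in-memory `Connection` in `Stream.acquireRelease`. The names `Connection`, `open`, `close`, and `events` are illustrative assumptions, not part of any library. The `events` array only exists to make the lifecycle observable.

```typescript
import { Effect, Stream } from "effect";

// Hypothetical in-memory "connection", used only for illustration.
interface Connection {
  readonly rows: ReadonlyArray<string>;
}

// Records lifecycle events so their ordering can be inspected afterwards.
export const events: Array<string> = [];

// Acquire: "open" the connection.
const open = Effect.sync((): Connection => {
  events.push("open");
  return { rows: ["row 1", "row 2"] };
});

// Release: "close" the connection.
const close = (_conn: Connection) =>
  Effect.sync(() => {
    events.push("close");
  });

// The connection is opened when the stream starts and closed when it ends.
export const rows = Stream.acquireRelease(open, close).pipe(
  Stream.flatMap((conn) => Stream.fromIterable(conn.rows)),
  Stream.tap((row) =>
    Effect.sync(() => {
      events.push(`use ${row}`);
    })
  )
);

// Everything here is synchronous, so the stream can be run with runSync.
export const collected = Effect.runSync(Stream.runCollect(rows));
```

After the run, `events` should start with `"open"` and end with `"close"`, with the row usage in between.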


Rationale

What happens if a pipeline processing a file fails halfway through? In a naive implementation, the file handle might be left open, leading to a resource leak. Over time, these leaks can exhaust system resources and crash your application.

Stream.acquireRelease is Effect's robust solution to this problem. It's built on Scope, Effect's fundamental resource-management tool.

  1. Guaranteed Cleanup: You provide an acquire effect to open the resource and a release effect to close it. Effect guarantees that the release effect will be called when the stream terminates, for any reason: successful completion, a processing failure, or even external interruption.
  2. Declarative and Co-located: The logic for a resource's entire lifecycle—acquisition, usage (the stream itself), and release—is defined in one place. This makes the code easier to understand and reason about compared to manual try/finally blocks.
  3. Prevents Resource Leaks: It is the idiomatic way to build truly resilient pipelines that do not leak resources, which is essential for long-running, production-grade applications.
  4. Composability: The resulting stream is just a normal Stream, which can be composed with any other stream operators.
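The points above can be sketched with a stream that fails partway through. The resource here is a hypothetical array of lines, and `log` is an illustrative array that records what happened. The `release` step should still run even though `Stream.mapEffect` fails on the third line.

```typescript
import { Effect, Exit, Stream } from "effect";

// Illustrative event log; not part of the Effect API.
export const log: Array<string> = [];

// Hypothetical resource: "acquiring" yields the lines of a file.
const acquire = Effect.sync(() => {
  log.push("acquired");
  return ["data 1", "data 2", "FAIL", "data 4"];
});

const release = () =>
  Effect.sync(() => {
    log.push("released");
  });

const pipeline = Stream.acquireRelease(acquire, release).pipe(
  Stream.flatMap((lines) => Stream.fromIterable(lines)),
  // Ordinary stream operators compose with the resourceful stream.
  Stream.mapEffect((line) =>
    line === "FAIL"
      ? Effect.fail(new Error("Failed to process line"))
      : Effect.sync(() => {
          log.push(`processed ${line}`);
        })
  )
);

// runSyncExit returns the Exit instead of throwing on failure.
export const exit = Effect.runSyncExit(Stream.runDrain(pipeline));
export const failed = Exit.isFailure(exit);
```

In this sketch `failed` is expected to be `true`, and `log` should contain `"released"` but no entry for `"data 4"`.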

Good Example

This example uses a mock FileService that hands out a file path, returns fixed file contents, and logs a message when cleanup runs. Inside Effect.scoped, Effect.addFinalizer registers the cleanup. This is the same Scope mechanism that Stream.acquireRelease builds on. The third line of the file ("FAIL") produces a ProcessError, which the loop catches and logs as a warning before moving on. When the scope closes, the finalizer runs and logs that the resource was cleaned up.

import { Effect } from "effect";
import * as path from "node:path";

interface ProcessError {
  readonly _tag: "ProcessError";
  readonly message: string;
}

const ProcessError = (message: string): ProcessError => ({
  _tag: "ProcessError",
  message,
});

interface FileServiceType {
  readonly createTempFile: () => Effect.Effect<{ filePath: string }, never>;
  readonly cleanup: (filePath: string) => Effect.Effect<void, never>;
  readonly readFile: (filePath: string) => Effect.Effect<string, never>;
}

export class FileService extends Effect.Service<FileService>()("FileService", {
  sync: () => {
    const filePath = path.join(__dirname, "temp-resource.txt");
    return {
      createTempFile: () => Effect.succeed({ filePath }),
      cleanup: (filePath: string) =>
        Effect.log("✅ Resource cleaned up successfully"),
      readFile: (filePath: string) =>
        Effect.succeed("data 1\ndata 2\nFAIL\ndata 4"),
    };
  },
}) {}

// Process a single line
const processLine = (line: string): Effect.Effect<void, ProcessError> =>
  line === "FAIL"
    ? Effect.fail(ProcessError("Failed to process line"))
    : Effect.log(`Processed: ${line}`);

// Create and process the file with proper resource management
const program = Effect.gen(function* () {
  yield* Effect.log("=== Stream Resource Management Demo ===");
  yield* Effect.log(
    "This demonstrates proper resource cleanup even when errors occur"
  );

  const fileService = yield* FileService;
  const { filePath } = yield* fileService.createTempFile();

  // Use scoped to ensure cleanup happens even on failure
  yield* Effect.scoped(
    Effect.gen(function* () {
      yield* Effect.addFinalizer(() => fileService.cleanup(filePath));

      const content = yield* fileService.readFile(filePath);
      const lines = content.split("\n");

      // Process each line, continuing even if some fail
      for (const line of lines) {
        yield* processLine(line).pipe(
          Effect.catchAll((error) =>
            Effect.log(`⚠️  Skipped line due to error: ${error.message}`)
          )
        );
      }

      yield* Effect.log(
        "✅ Processing completed with proper resource management"
      );
    })
  );
});

// Run the program with FileService layer
Effect.runPromise(Effect.provide(program, FileService.Default)).catch(
  (error) => {
    Effect.runSync(Effect.logError("Unexpected error: " + error));
  }
);
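The example above catches each line's error, so its scope closes after a successful run. As a hedged sketch of the failing case, the snippet below lets the error escape the scope and checks that the release step still runs. The names `trace`, `resource`, and `failingProgram` are illustrative assumptions.

```typescript
import { Effect, Exit } from "effect";

// Illustrative event log; not part of the Effect API.
export const trace: Array<string> = [];

// Hypothetical resource: acquiring returns a file name, releasing records the close.
const resource = Effect.acquireRelease(
  Effect.sync(() => {
    trace.push("acquired");
    return "temp-resource.txt";
  }),
  (name) =>
    Effect.sync(() => {
      trace.push(`released ${name}`);
    })
);

const failingProgram = Effect.scoped(
  Effect.gen(function* () {
    const name = yield* resource;
    trace.push(`using ${name}`);
    // No catchAll here: the failure propagates out of the scope.
    yield* Effect.fail(new Error("Failed to process line"));
  })
);

export const result = Effect.runSyncExit(failingProgram);
export const failedButReleased =
  Exit.isFailure(result) && trace.includes("released temp-resource.txt");
```

The finalizer registered by `Effect.acquireRelease` runs when `Effect.scoped` closes the scope, whether the body succeeded or failed.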

Anti-Pattern

The anti-pattern is to manage resources manually outside the stream's context. This is brittle and almost always leads to resource leaks when errors occur.

import { FileSystem } from "@effect/platform";
import { NodeFileSystem } from "@effect/platform-node";
import { Effect, Stream } from "effect";
import * as path from "node:path";

const program = Effect.gen(function* () {
  // The FileSystem service is provided by NodeFileSystem.layer below.
  const fs = yield* FileSystem.FileSystem;
  const filePath = path.join(__dirname, "temp-resource-bad.txt");

  // 1. Resource acquired manually before the stream
  yield* fs.writeFileString(filePath, "data 1\ndata 2");
  yield* Effect.log("Resource acquired manually.");

  const stream = fs.stream(filePath).pipe(
    Stream.decodeText("utf-8"),
    Stream.splitLines,
    // This stream fails on the first line, which fails the whole run.
    Stream.mapEffect(() => Effect.fail(new Error("Something went wrong!")))
  );

  // 2. Stream is executed
  yield* Stream.runDrain(stream);

  // 3. This release logic is NEVER reached if the stream fails.
  yield* fs.remove(filePath);
  yield* Effect.log("Resource released manually. (This will not be logged)");
});

Effect.runPromiseExit(
  program.pipe(Effect.provide(NodeFileSystem.layer))
).then((exit) => {
  if (exit._tag === "Failure") {
    console.log("\nPipeline failed. The temp file was NOT deleted.");
  }
});

In this anti-pattern, the fs.remove call is never reached when Stream.runDrain fails, because the failure ends the gen block immediately and the temporary file stays on disk. Wrapping the file's lifecycle in Stream.acquireRelease ties the cleanup to the stream's termination, so the file is removed whether the stream succeeds, fails, or is interrupted.
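A minimal sketch of how the same temp-file scenario could be tied to the stream's lifetime follows. It assumes synchronous `node:fs` calls stand in for a real file service, and the file name and variable names are illustrative. The file is created in the acquire step and deleted in the release step, so the failing stream should not leave it behind.

```typescript
import { Effect, Exit, Stream } from "effect";
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";

// Assumption: a temp file in the OS temp directory stands in for the resource.
const filePath = path.join(os.tmpdir(), `temp-resource-${process.pid}.txt`);

// Acquire: create the file and hand its path to the stream.
const acquire = Effect.sync(() => {
  fs.writeFileSync(filePath, "data 1\ndata 2");
  return filePath;
});

// Release: delete the file; this runs however the stream ends.
const release = (file: string) =>
  Effect.sync(() => {
    fs.rmSync(file, { force: true });
  });

const stream = Stream.acquireRelease(acquire, release).pipe(
  Stream.flatMap((file) =>
    Stream.fromIterableEffect(
      Effect.sync(() => fs.readFileSync(file, "utf-8").split("\n"))
    )
  ),
  // Fail on the first line, mirroring the anti-pattern above.
  Stream.mapEffect(() => Effect.fail(new Error("Something went wrong!")))
);

export const exit = Effect.runSyncExit(Stream.runDrain(stream));
export const pipelineFailed = Exit.isFailure(exit);
export const fileStillExists = fs.existsSync(filePath);
```

Here the run still fails, but `fileStillExists` is expected to be `false` because the release step deletes the file when the stream terminates.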