Skip to content

Latest commit

 

History

History
577 lines (447 loc) · 14.6 KB

File metadata and controls

577 lines (447 loc) · 14.6 KB
title Stream Pattern 6: Resource Management in Streams
id stream-pattern-resource-management
skillLevel advanced
applicationPatternId streams
summary Properly manage resources (connections, files, memory) in streams using acquire/release patterns and ensuring cleanup on error or completion.
tags
streams
resource-management
cleanup
error-safety
file-handling
connection-pooling
rule
description
Use Stream.bracket or effect scoping to guarantee resource cleanup, preventing leaks even when streams fail or are interrupted.
related
stream-pattern-backpressure-control
platform-pattern-filesystem-operations
error-handling-pattern-custom-strategies
author effect_website
lessonOrder 2

Guideline

Streams must clean up resources deterministically:

  • Acquire/Release: Get resource, use, return resource
  • Bracket pattern: Ensure cleanup on success or failure
  • Scope safety: Guarantee cleanup even on exceptions
  • Connection pooling: Reuse connections, prevent exhaustion
  • Concurrent cleanup: Handle cleanup under concurrency

Pattern: Stream.bracket(), Resource.make(), Scope for resource safety


Rationale

Streams without resource management cause problems:

Problem 1: Resource exhaustion

  • Open file streams without closing → file descriptor limit exceeded
  • Get connections from pool, never return → connection starvation
  • System becomes unresponsive

Problem 2: Memory leaks

  • Stream emits large objects → memory grows
  • Without cleanup → garbage persists
  • GC can't reclaim

Problem 3: Data corruption

  • Write to file without flush → partial writes on crash
  • Read from connection while another thread writes → data race
  • Results are unpredictable

Problem 4: Silent failures

  • Resource cleanup fails → error lost
  • Application proceeds as if successful
  • Hidden bug becomes hard-to-trace crash later

Solutions:

Bracket pattern:

  • Acquire resource
  • Use resource (even if error)
  • Always release resource
  • Track errors separately

Resource scopes:

  • Nested resource management
  • Parent cleanup waits for children
  • Hierarchical resource graphs
  • Type-safe guarantees

Connection pooling:

  • Reuse connections
  • Track available/in-use
  • Prevent exhaustion
  • Support graceful shutdown

Good Example

This example demonstrates resource acquisition, use, and guaranteed cleanup.

import { Effect, Stream, Resource, Scope, Ref } from "effect";

interface FileHandle {
  readonly path: string;
  readonly fd: number;
}

interface Connection {
  readonly id: string;
  readonly isOpen: boolean;
}

// Simulate resource management
const program = Effect.gen(function* () {
  console.log(`\n[RESOURCE MANAGEMENT] Stream resource lifecycle\n`);

  // Example 1: Bracket pattern for file streams
  console.log(`[1] Bracket pattern (acquire → use → release):\n`);

  let openHandles = 0;
  let closedHandles = 0;

  const openFile = (path: string) =>
    Effect.gen(function* () {
      openHandles++;
      yield* Effect.log(`[OPEN] File "${path}" (total open: ${openHandles})`);

      return { path, fd: 1000 + openHandles };
    });

  const closeFile = (handle: FileHandle) =>
    Effect.gen(function* () {
      closedHandles++;
      yield* Effect.log(`[CLOSE] File "${handle.path}" (total closed: ${closedHandles})`);
    });

  const readFileWithBracket = (path: string) =>
    Effect.gen(function* () {
      let handle: FileHandle | null = null;

      try {
        handle = yield* openFile(path);

        yield* Effect.log(
          `[USE] Reading from fd ${handle.fd} ("${handle.path}")`
        );

        // Simulate reading
        return "file contents";
      } finally {
        // Guaranteed to run even if error occurs above
        if (handle) {
          yield* closeFile(handle);
        }
      }
    });

  // Test with success
  yield* Effect.log(`[TEST] Success case:`);

  const content = yield* readFileWithBracket("/data/file.txt");

  yield* Effect.log(`[RESULT] Got: "${content}"\n`);

  // Test with failure (simulated)
  yield* Effect.log(`[TEST] Error case:`);

  const failCase = Effect.gen(function* () {
    let handle: FileHandle | null = null;

    try {
      handle = yield* openFile("/data/missing.txt");

      // Simulate error mid-operation
      yield* Effect.fail(new Error("Read failed"));
    } finally {
      if (handle) {
        yield* closeFile(handle);
      }
    }
  }).pipe(
    Effect.catchAll((error) =>
      Effect.gen(function* () {
        yield* Effect.log(`[ERROR] Caught: ${error.message}`);
        yield* Effect.log(`[CHECK] Closed handles: ${closedHandles} (verifying cleanup)\n`);
      })
    )
  );

  yield* failCase;

  // Example 2: Connection pool management
  console.log(`[2] Connection pooling:\n`);

  interface ConnectionPool {
    acquire: () => Effect.Effect<Connection>;
    release: (conn: Connection) => Effect.Effect<void>;
  }

  const createConnectionPool = (maxSize: number): Effect.Effect<ConnectionPool> =>
    Effect.gen(function* () {
      const available = yield* Ref.make<Connection[]>([]);
      const inUse = yield* Ref.make<Set<string>>(new Set());
      let idCounter = 0;

      return {
        acquire: Effect.gen(function* () {
          const avail = yield* Ref.get(available);

          if (avail.length > 0) {
            yield* Effect.log(`[POOL] Reusing connection from pool`);

            const conn = avail.pop()!;

            yield* Ref.modify(inUse, (set) => [
              undefined,
              new Set(set).add(conn.id),
            ]);

            return conn;
          }

          const inUseCount = (yield* Ref.get(inUse)).size;

          if (inUseCount >= maxSize) {
            yield* Effect.fail(new Error("Pool exhausted"));
          }

          const connId = `conn-${++idCounter}`;

          yield* Effect.log(`[POOL] Creating new connection: ${connId}`);

          const conn = { id: connId, isOpen: true };

          yield* Ref.modify(inUse, (set) => [
            undefined,
            new Set(set).add(connId),
          ]);

          return conn;
        }),

        release: (conn: Connection) =>
          Effect.gen(function* () {
            yield* Ref.modify(inUse, (set) => {
              const updated = new Set(set);
              updated.delete(conn.id);
              return [undefined, updated];
            });

            yield* Ref.modify(available, (avail) => [
              undefined,
              [...avail, conn],
            ]);

            yield* Effect.log(`[POOL] Returned connection: ${conn.id}`);
          }),
      };
    });

  const pool = yield* createConnectionPool(3);

  // Acquire and release connections
  const conn1 = yield* pool.acquire();
  const conn2 = yield* pool.acquire();

  yield* pool.release(conn1);

  const conn3 = yield* pool.acquire(); // Reuses conn1

  yield* Effect.log(`\n`);

  // Example 3: Scope-based resource safety
  console.log(`[3] Scoped resources (hierarchical cleanup):\n`);

  let scopedCount = 0;

  const withScoped = <R,>(create: () => Effect.Effect<R>) =>
    Effect.gen(function* () {
      scopedCount++;
      const id = scopedCount;

      yield* Effect.log(`[SCOPE] Enter scope ${id}`);

      const resource = yield* create();

      yield* Effect.log(`[SCOPE] Using resource in scope ${id}`);

      // Cleanup is guaranteed via ensuring
      yield* Effect.unit.pipe(
        Effect.ensuring(
          Effect.log(`[SCOPE] Cleanup guaranteed for scope ${id}`)
        )
      );

      return resource;
    });

  // Nested scopes
  const result = yield* withScoped(() =>
    Effect.gen(function* () {
      const innerData = yield* withScoped(() => Effect.succeed("inner data"));
      return { level: 1, data: innerData };
    })
  ).pipe(
    Effect.catchAll(() => Effect.succeed({ level: 0, data: null }))
  );

  yield* Effect.log(`[SCOPES] Cleanup order: inner → outer\n`);

  // Example 4: Stream resource management
  console.log(`[4] Stream with resource cleanup:\n`);

  let streamResourceCount = 0;

  // Simulate stream that acquires resources
  const streamWithResources = Stream.empty.pipe(
    Stream.tap(() =>
      Effect.gen(function* () {
        streamResourceCount++;
        yield* Effect.log(`[STREAM-RES] Acquired resource ${streamResourceCount}`);
      })
    ),
    // Cleanup when stream ends
    Stream.ensuring(
      Effect.log(`[STREAM-RES] Cleaning up all ${streamResourceCount} resources`)
    )
  );

  yield* Stream.runDrain(streamWithResources);

  // Example 5: Error propagation with cleanup
  console.log(`\n[5] Error safety with cleanup:\n`);

  const safeRead = (retryCount: number) =>
    Effect.gen(function* () {
      let handle: FileHandle | null = null;

      try {
        handle = yield* openFile(`/data/file-${retryCount}.txt`);

        if (retryCount < 2) {
          yield* Effect.log(`[READ] Attempt ${retryCount}: failing intentionally`);
          yield* Effect.fail(new Error(`Attempt ${retryCount} failed`));
        }

        yield* Effect.log(`[READ] Success on attempt ${retryCount}`);

        return "success";
      } finally {
        if (handle) {
          yield* closeFile(handle);
        }
      }
    });

  // Retry with guaranteed cleanup
  const result2 = yield* safeRead(1).pipe(
    Effect.retry(
      Schedule.recurs(2).pipe(
        Schedule.compose(Schedule.fixed("10 millis"))
      )
    ),
    Effect.catchAll((error) =>
      Effect.gen(function* () {
        yield* Effect.log(`[FINAL] All retries failed: ${error.message}`);
        return "fallback";
      })
    )
  );

  yield* Effect.log(`\n[FINAL] Result: ${result2}`);
});

Effect.runPromise(program);

Advanced: Resource Acquisition with Effect.acquire

Build safe resource APIs:

// Safe resource pattern using Effect.acquire
const withDatabaseConnection = <R,>(
  operation: (conn: Connection) => Effect.Effect<R>
): Effect.Effect<R> =>
  Effect.gen(function* () {
    const conn = yield* Effect.acquire(
      Effect.gen(function* () {
        const connection = { id: "db-1", isOpen: true };

        yield* Effect.log(`[DB] Opened connection`);

        // Return release effect
        return Effect.gen(function* () {
          yield* Effect.log(`[DB] Closed connection`);
        });
      })
    );

    return yield* operation(conn);
  });

// Usage - cleanup guaranteed
const dbOperation = withDatabaseConnection((conn) =>
  Effect.gen(function* () {
    yield* Effect.log(`[DB] Using connection: ${conn.id}`);
    return "query result";
  })
);

Advanced: Concurrent Resource Cleanup

Handle cleanup under concurrency:

const createConcurrentResourcePool = <R,>(config: {
  createResource: () => Effect.Effect<R>;
  destroyResource: (r: R) => Effect.Effect<void>;
  maxConcurrent: number;
}) =>
  Effect.gen(function* () {
    const resources = yield* Ref.make<R[]>([]);
    const inUse = yield* Ref.make<Set<unknown>>(new Set());

    const withResource = <A,>(
      use: (resource: R) => Effect.Effect<A>
    ): Effect.Effect<A> =>
      Effect.gen(function* () {
        let resource: R;

        // Acquire
        const existing = yield* Ref.get(resources);

        if (existing.length > 0) {
          resource = existing.pop()!;
        } else {
          resource = yield* config.createResource();
        }

        const resourceId = Math.random();

        yield* Ref.modify(inUse, (set) =>
          [undefined, new Set(set).add(resourceId)]
        );

        try {
          return yield* use(resource);
        } finally {
          // Release - guaranteed
          yield* Ref.modify(inUse, (set) => {
            const updated = new Set(set);
            updated.delete(resourceId);
            return [undefined, updated];
          });

          yield* Ref.modify(resources, (list) => [
            undefined,
            [...list, resource],
          ]);
        }
      });

    const shutdownAll = Effect.gen(function* () {
      const all = yield* Ref.get(resources);

      for (const resource of all) {
        yield* config.destroyResource(resource);
      }

      const stillInUse = yield* Ref.get(inUse);

      if (stillInUse.size > 0) {
        yield* Effect.log(
          `[WARNING] ${stillInUse.size} resources still in use during shutdown`
        );
      }
    });

    return { withResource, shutdownAll };
  });

Advanced: Graceful Stream Shutdown

Ensure all resources clean up on interruption:

const createGracefulStream = <A,>(
  source: Stream.Stream<A>,
  config: {
    shutdown: () => Effect.Effect<void>;
    timeout: Duration.Duration;
  }
) =>
  source.pipe(
    Stream.ensuring(
      Effect.gen(function* () {
        yield* Effect.log(`[STREAM] Starting graceful shutdown`);

        yield* config.shutdown.pipe(
          Effect.timeout(config.timeout),
          Effect.catchAll((error) =>
            Effect.gen(function* () {
              yield* Effect.log(
                `[STREAM] Shutdown timeout or failed: ${error.message}`
              );
            })
          )
        );

        yield* Effect.log(`[STREAM] Shutdown complete`);
      })
    )
  );

Common Resource Patterns

Pattern Use Case Cleanup
Bracket Simple acquire/release Finally block
Resource.make Complex setup Built-in cleanup
Scope Hierarchical resources Nested cleanup
Pool Reuse connections Return to pool
Stream.bracket Stream elements Per-element cleanup

When to Use This Pattern

Use brackets when:

  • File I/O operations
  • Database connections
  • Network sockets
  • Temporary resources

Use pools when:

  • Expensive resource creation
  • Connection reuse
  • Resource scarcity
  • Performance-critical

Use scopes when:

  • Hierarchical resources
  • Complex resource graphs
  • Nested allocation
  • Type-safe guarantees

⚠️ Trade-offs:

  • Complexity increases
  • More error cases to handle
  • Debugging harder
  • Performance impact

Resource Cleanup Checklist

  • ✅ Always use try/finally or bracket
  • ✅ Test error paths explicitly
  • ✅ Verify cleanup on timeout
  • ✅ Test concurrent access
  • ✅ Monitor for resource leaks
  • ✅ Set appropriate timeouts
  • ✅ Log resource lifecycle
  • ✅ Plan graceful shutdown

See Also