---
title: "Sink Pattern 3: Write Stream Lines to File"
id: sink-pattern-write-stream-lines-to-file
skillLevel: intermediate
applicationPatternId: streams-sinks
summary: Use Sink to write stream data as lines to a file with buffering for efficiency, supporting log files and line-oriented formats.
tags:
  - sink
  - stream
  - file-io
  - persistence
  - logging
  - buffering
rule:
  description: Write streaming lines to a file efficiently using buffered output and proper resource management.
related:
  - process-streaming-data-with-stream
  - sink-pattern-batch-insert-stream-records-into-database
  - manage-resource-lifecycles-with-scope
author: effect_website
lessonOrder: 3
---

Guideline

When a stream's output must be persisted as lines in a file, consume it with a Sink that writes to an open file handle. Buffer lines in memory and write them in batches. Rely on Effect's scope management to close the handle whether the stream succeeds or fails.


Rationale

Writing stream data to files requires:

  • Buffering: Issuing one write per line is slow. Accumulate lines in memory and write them in batches
  • Efficiency: Fewer, larger writes reduce system calls and I/O overhead
  • Resource Management: Ensure file handles are properly closed even on errors
  • Ordering: Maintain the order of lines as they appear in the stream
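The buffering, efficiency, and ordering requirements are independent of Effect. The sketch below is plain TypeScript. The BufferedLineWriter class and its injected write callback are hypothetical and not part of any library; the callback stands in for a real file write.

The writer collects lines in memory and hands them over in batches. With eight lines and a buffer size of three, it makes three write calls instead of eight, and the lines leave in the order they arrived.

```typescript
// Hypothetical helper: batches lines and forwards each batch to an injected
// writer. `write` stands in for one real file write per invocation.
class BufferedLineWriter {
  private buffer: string[] = [];
  private readonly write: (content: string) => void;
  private readonly bufferSize: number;

  constructor(write: (content: string) => void, bufferSize: number) {
    this.write = write;
    this.bufferSize = bufferSize;
  }

  // Append one line; write the whole batch once the buffer is full.
  push(line: string): void {
    this.buffer.push(line);
    if (this.buffer.length >= this.bufferSize) this.flush();
  }

  // Write buffered lines in arrival order, each terminated by "\n".
  flush(): void {
    if (this.buffer.length === 0) return;
    this.write(this.buffer.join("\n") + "\n");
    this.buffer = [];
  }
}

// Usage: eight lines with a buffer of three.
const writes: string[] = [];
const writer = new BufferedLineWriter((content) => writes.push(content), 3);
for (let i = 1; i <= 8; i++) writer.push(`line ${i}`);
writer.flush(); // final flush for the partial batch

console.log(writes.length); // 3
```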

This pattern is essential for:

  • Log files and audit trails
  • CSV and JSON Lines export
  • Streaming data archival
  • Data pipelines with file intermediates
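JSON Lines export fits this pattern because JSON.stringify escapes newline characters inside strings. Each record therefore serializes to exactly one physical line. A minimal sketch, where toJsonLine is an illustrative name:

```typescript
// Illustrative helper: serialize one record as a single JSON Lines entry.
// JSON.stringify writes a newline inside a string as the two characters \ and n,
// so the result never contains a raw line break.
const toJsonLine = (record: Record<string, unknown>): string =>
  JSON.stringify(record);

const line = toJsonLine({ level: "error", message: "first\nsecond" });
console.log(line); // {"level":"error","message":"first\nsecond"}
console.log(line.includes("\n")); // false

// Reading back: split the file content on "\n" and parse each non-empty line.
const parsed = (line + "\n")
  .split("\n")
  .filter((l) => l.length > 0)
  .map((l) => JSON.parse(l) as { level: string; message: string });
console.log(parsed[0].message === "first\nsecond"); // true
```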

Good Example

This example demonstrates streaming log entries and writing them to a file with buffering.

import { Effect, Stream, Sink } from "effect";
// FileSystem lives in @effect/platform, not in the core "effect" package
import { FileSystem } from "@effect/platform";
import type { PlatformError } from "@effect/platform/Error";
import { NodeFileSystem } from "@effect/platform-node";

interface LogEntry {
  readonly level: "debug" | "info" | "warn" | "error";
  readonly message: string;
  readonly timestamp: number;
}

// Format a log entry as a line
const formatLogLine = (entry: LogEntry): string => {
  const iso = new Date(entry.timestamp).toISOString();
  return `[${iso}] ${entry.level.toUpperCase()}: ${entry.message}`;
};

// Simulate a stream of log entries
const logStream: Stream.Stream<LogEntry> = Stream.fromIterable([
  { level: "info", message: "Server starting", timestamp: Date.now() },
  { level: "debug", message: "Loading config", timestamp: Date.now() + 100 },
  { level: "info", message: "Connected to database", timestamp: Date.now() + 200 },
  { level: "warn", message: "High memory usage detected", timestamp: Date.now() + 300 },
  { level: "info", message: "Processing request", timestamp: Date.now() + 400 },
  { level: "error", message: "Connection timeout", timestamp: Date.now() + 500 },
  { level: "info", message: "Retrying connection", timestamp: Date.now() + 600 },
  { level: "info", message: "Connection restored", timestamp: Date.now() + 700 },
]);

// Create a file writer sink with buffering.
// Sink.unwrapScoped keeps the scope (and the file handle) open until the
// sink completes, then closes it on success or failure.
const createFileWriteSink = (
  filePath: string,
  bufferSize: number = 100
): Sink.Sink<number, string, string, PlatformError, FileSystem.FileSystem> =>
  Sink.unwrapScoped(
    Effect.gen(function* () {
      // Open file in append mode; the handle is released with the scope
      const fs = yield* FileSystem.FileSystem;
      const file = yield* fs.open(filePath, { flag: "a" });
      const encoder = new TextEncoder();

      let buffer: string[] = [];

      // Write all buffered lines to the file in a single call
      const flush = Effect.gen(function* () {
        if (buffer.length === 0) return;

        const content = buffer.join("\n") + "\n";
        yield* file.writeAll(encoder.encode(content));
        buffer = [];
      });

      // Count lines, flushing whenever the buffer reaches its size limit
      return Sink.foldLeftEffect(0, (count: number, line: string) =>
        Effect.gen(function* () {
          buffer.push(line);

          if (buffer.length >= bufferSize) {
            yield* flush;
          }

          return count + 1;
        })
      ).pipe(
        // When the stream ends, write the remaining lines before the scope
        // closes the file handle
        Sink.mapEffect((count) => Effect.as(flush, count))
      );
    })
  );

// Process the log stream
const program = Effect.gen(function* () {
  const fs = yield* FileSystem.FileSystem;
  const filePath = "/tmp/app.log";

  // Clear the file first (the sink appends)
  yield* fs.writeFileString(filePath, "");

  // Stream logs, format them, and write to file
  const written = yield* logStream.pipe(
    Stream.map(formatLogLine),
    Stream.run(createFileWriteSink(filePath, 50)) // Buffer 50 lines before each write
  );

  console.log(`Wrote ${written} log lines to ${filePath}`);

  // Read back the file to verify
  const content = yield* fs.readFileString(filePath);
  console.log("\nFile contents:");
  console.log(content);
});

// Provide the Node.js FileSystem implementation, then run
Effect.runPromise(program.pipe(Effect.provide(NodeFileSystem.layer)));

This pattern:

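  1. Opens a file for appending
  2. Buffers up to 50 log lines in memory before each write
  3. Writes the buffer whenever it fills and once more when the stream ends (with only eight entries, this example does all of its writing in that final flush)
  4. Closes the file handle through its scope, even if the stream fails
  5. Returns the number of lines written for confirmation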
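The guarantees in steps 3 and 4 (write what is buffered, then always release the handle) have the same shape as a try/finally block. The sketch below is a plain TypeScript analogue, not Effect code. The events array stands in for real open, write, and close calls so the ordering can be checked, and failAt is a hypothetical parameter for simulating a mid-stream error.

```typescript
// Plain TypeScript analogue of the sink's lifecycle. `events` records what a
// real implementation would do to the file handle.
function writeLines(
  lines: string[],
  bufferSize: number,
  events: string[],
  failAt?: number // hypothetical: index of a line that triggers an error
): number {
  events.push("open");
  let buffer: string[] = [];
  const flush = () => {
    if (buffer.length === 0) return;
    events.push(`write ${buffer.length}`);
    buffer = [];
  };
  try {
    lines.forEach((line, i) => {
      if (i === failAt) throw new Error(`failed at line ${i}`);
      buffer.push(line);
      if (buffer.length >= bufferSize) flush();
    });
    flush(); // stream ended normally: write the partial batch
    return lines.length;
  } finally {
    events.push("close"); // runs on success and on failure
  }
}

const ok: string[] = [];
writeLines(["a", "b", "c", "d", "e"], 2, ok);
console.log(ok.join(", ")); // open, write 2, write 2, write 1, close

const failed: string[] = [];
try {
  writeLines(["a", "b", "c", "d", "e"], 2, failed, 3);
} catch {
  // the simulated error propagates after the handle has been closed
}
console.log(failed.join(", ")); // open, write 2, close
```

In the failing run, line "c" was buffered but never written. Lines still in the buffer when an error occurs are dropped unless the cleanup path also flushes them.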

Advanced: Rotating Log Files

Rotate to a new file when a line-count, size, or age limit is reached:

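import { Effect, Exit, Scope, Sink } from "effect";
import { FileSystem } from "@effect/platform";
import type { PlatformError } from "@effect/platform/Error";

interface RotationPolicy {
  readonly maxLines?: number; // Rotate after N lines
  readonly maxSizeBytes?: number; // Rotate after N bytes (UTF-8 encoded)
  readonly maxAgeMs?: number; // Rotate after N milliseconds
}

const createRotatingFileSink = (
  baseFilePath: string,
  policy: RotationPolicy,
  bufferSize: number = 100
): Sink.Sink<number, string, string, PlatformError, FileSystem.FileSystem> =>
  Sink.unwrapScoped(
    Effect.gen(function* () {
      const fs = yield* FileSystem.FileSystem;
      const encoder = new TextEncoder();
      let fileIndex = 0;
      let currentLineCount = 0;
      let currentByteCount = 0;
      let openedAt = Date.now();
      let buffer: string[] = [];

      // app.log -> app.1.log, app.2.log, ...
      // "[^./]" stops the extension match at "/" so directory names are untouched
      const getFilePath = () =>
        fileIndex === 0
          ? baseFilePath
          : baseFilePath.replace(/(\.[^./]*)?$/, `.${fileIndex}$1`);

      // Give each file its own scope so it can be closed on rotation
      const openFile = (path: string) =>
        Effect.gen(function* () {
          const scope = yield* Scope.make();
          const file = yield* fs.open(path, { flag: "a" }).pipe(Scope.extend(scope));
          return { file, scope };
        });

      let current = yield* openFile(getFilePath());
      // Close whichever file is open when the sink finishes or fails
      yield* Effect.addFinalizer((exit) => Scope.close(current.scope, exit));

      const flush = Effect.gen(function* () {
        if (buffer.length === 0) return;
        const content = buffer.join("\n") + "\n";
        yield* current.file.writeAll(encoder.encode(content));
        buffer = [];
      });

      const rotateFile = Effect.gen(function* () {
        // Flush the current buffer, then close the current file
        yield* flush;
        yield* Scope.close(current.scope, Exit.void);

        // Open new file
        fileIndex++;
        currentLineCount = 0;
        currentByteCount = 0;
        openedAt = Date.now();
        current = yield* openFile(getFilePath());
        yield* Effect.log(`Rotated to ${getFilePath()}`);
      });

      const shouldRotate = (): boolean => {
        if (currentLineCount === 0) return false; // never rotate an empty file
        if (policy.maxLines && currentLineCount >= policy.maxLines) return true;
        if (policy.maxSizeBytes && currentByteCount >= policy.maxSizeBytes) return true;
        if (policy.maxAgeMs && Date.now() - openedAt >= policy.maxAgeMs) return true;
        return false;
      };

      return Sink.foldLeftEffect(0, (count: number, line: string) =>
        Effect.gen(function* () {
          if (shouldRotate()) {
            yield* rotateFile;
          }

          buffer.push(line);
          currentLineCount++;
          currentByteCount += encoder.encode(line).length + 1; // UTF-8 bytes + newline

          if (buffer.length >= bufferSize) {
            yield* flush;
          }

          return count + 1;
        })
      ).pipe(
        // Write the remaining lines when the stream ends
        Sink.mapEffect((count) => Effect.as(flush, count))
      );
    })
  );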
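Two details of rotation are easy to get subtly wrong, so it helps to check them in isolation:

  • The size check should count encoded bytes. string.length counts UTF-16 code units, not bytes.
  • The path rewrite should only touch the extension of the last path segment.

A standalone sketch of both helpers, assuming UTF-8 output. rotatedPath and lineBytes are hypothetical names.

```typescript
// Hypothetical helper: app.log -> app.1.log. "[^./]" keeps the extension match
// from running across a "/" into a dotted directory name.
const rotatedPath = (base: string, index: number): string =>
  index === 0 ? base : base.replace(/(\.[^./]*)?$/, `.${index}$1`);

// Hypothetical helper: bytes one line occupies in a UTF-8 file, plus "\n".
const encoder = new TextEncoder();
const lineBytes = (line: string): number => encoder.encode(line).length + 1;

console.log(rotatedPath("/var/log/app.log", 2)); // /var/log/app.2.log
console.log(rotatedPath("/var/log.d/app", 1)); // /var/log.d/app.1
console.log("caf\u00e9".length, lineBytes("caf\u00e9")); // 4 6
```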

Advanced: Compression and Archival

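Compress each file after it has been rotated out. This version shells out to the gzip command-line tool, so gzip must be installed and on the PATH: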

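import { execFileSync } from "node:child_process";
import { Effect } from "effect";

// Pass the path as an argument array so it is never interpreted by a shell
const compressRotatedFile = (filePath: string): Effect.Effect<void> =>
  Effect.try(() => {
    execFileSync("gzip", ["-f", filePath]);
  }).pipe(
    Effect.catchAll((error) =>
      Effect.logWarning(`Failed to compress ${filePath}: ${error}`)
    )
  );

// In the rotation logic:
const rotateFile = Effect.gen(function* () {
  // Capture the path before fileIndex++ changes what getFilePath() returns
  const oldFilePath = getFilePath();

  // ... existing rotation code (flush, close the old file, open the next one) ...

  // Compress the old file only after its handle has been closed
  yield* compressRotatedFile(oldFilePath);
});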
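If shelling out to gzip is undesirable (no gzip binary on the host, or child processes are not allowed), Node's built-in zlib module can produce an equivalent .gz file. The synchronous sketch below mirrors gzip -f by writing the .gz file and removing the original.

gzipFileSync is a hypothetical name. The sketch reads the whole file into memory, so it assumes rotated files are modest in size.

```typescript
import { gzipSync, gunzipSync } from "node:zlib";
import { existsSync, readFileSync, unlinkSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

// Hypothetical helper: compress filePath to filePath + ".gz", then delete the
// original, roughly what `gzip -f` does. Loads the whole file into memory.
const gzipFileSync = (filePath: string): string => {
  const target = `${filePath}.gz`;
  writeFileSync(target, gzipSync(readFileSync(filePath)));
  unlinkSync(filePath);
  return target;
};

// Usage with a throwaway file in the OS temp directory.
const source = join(tmpdir(), `rotated-${process.pid}.log`);
writeFileSync(source, "line 1\nline 2\n");
const archive = gzipFileSync(source);

console.log(existsSync(source)); // false
console.log(gunzipSync(readFileSync(archive)).toString("utf8") === "line 1\nline 2\n"); // true
```

In the Effect version, a call like this can replace the gzip call inside Effect.try. Like the original, it blocks the event loop while it runs.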

When to Use This Pattern

Use file writing when:

  • Streaming logs or audit trails
  • Exporting large datasets to files
  • Creating line-oriented formats (JSON Lines, CSV)
  • You need persistent local storage
  • The data volume is too large to fit in memory
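For CSV, a field must be quoted when it contains a comma, a double quote, or a line break, and embedded quotes are doubled (the RFC 4180 convention). A minimal sketch, where escapeCsvField and toCsvLine are illustrative names.

A quoted field may legitimately contain a line break, so a CSV record is not always one physical line. If downstream tools split on "\n", replace line breaks before writing.

```typescript
// Illustrative helper: quote a field only when it contains a comma, a double
// quote, or a line break; embedded double quotes are doubled.
const escapeCsvField = (value: string): string =>
  /[",\r\n]/.test(value) ? `"${value.replace(/"/g, '""')}"` : value;

// Join escaped fields into one CSV record.
const toCsvLine = (fields: ReadonlyArray<string>): string =>
  fields.map(escapeCsvField).join(",");

console.log(toCsvLine(["2024-01-01", "warn", 'disk "sda" at 91%, rising']));
// 2024-01-01,warn,"disk ""sda"" at 91%, rising"
```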

⚠️ Trade-offs:

  • Disk I/O can be slower than in-memory processing
  • File systems impose limits on file size and path length
  • Old and rotated files need a cleanup or retention policy
  • Flat files are less efficient than databases for querying
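Cleaning up old files can be as simple as an age-based retention pass. The Node.js sketch below makes two assumptions: rotated files share a name prefix, and modification time is a good enough proxy for age. deleteOlderThan is a hypothetical helper.

A prefix match is coarse: "app" would also match "application.log". Because the file currently being written keeps a fresh modification time, it survives the pass.

```typescript
import { mkdtempSync, readdirSync, statSync, unlinkSync, utimesSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

// Hypothetical retention rule: delete files in `dir` whose names start with
// `prefix` and whose last modification is older than `maxAgeMs`.
const deleteOlderThan = (dir: string, prefix: string, maxAgeMs: number, now = Date.now()): string[] => {
  const deleted: string[] = [];
  for (const name of readdirSync(dir)) {
    if (!name.startsWith(prefix)) continue;
    const path = join(dir, name);
    if (now - statSync(path).mtimeMs > maxAgeMs) {
      unlinkSync(path);
      deleted.push(name);
    }
  }
  return deleted;
};

// Usage: one fresh file and one week-old archive in a throwaway directory.
const dir = mkdtempSync(join(tmpdir(), "logs-"));
writeFileSync(join(dir, "app.log"), "current\n");
writeFileSync(join(dir, "app.1.log.gz"), "old\n");
const weekAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
utimesSync(join(dir, "app.1.log.gz"), weekAgo, weekAgo);

const deleted = deleteOlderThan(dir, "app", 24 * 60 * 60 * 1000);
console.log(deleted.join(", ")); // app.1.log.gz
```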

See Also