| title | Sink Pattern 3: Write Stream Lines to File | ||||||
|---|---|---|---|---|---|---|---|
| id | sink-pattern-write-stream-lines-to-file | ||||||
| skillLevel | intermediate | ||||||
| applicationPatternId | streams-sinks | ||||||
| summary | Use Sink to write stream data as lines to a file with buffering for efficiency, supporting log files and line-oriented formats. | ||||||
| tags |
|
||||||
| rule |
|
||||||
| related |
|
||||||
| author | effect_website | ||||||
| lessonOrder | 3 |
When consuming a stream of data to persist as lines in a file, use Sink with a file writer. Buffer the output for efficiency and ensure proper resource cleanup using Effect's scope management.
Writing stream data to files requires:
- Buffering: Writing one line at a time is slow. Buffer multiple lines before flushing to disk
- Efficiency: Reduce system calls and I/O overhead by batching writes
- Resource Management: Ensure file handles are properly closed even on errors
- Ordering: Maintain the order of lines as they appear in the stream
This pattern is essential for:
- Log files and audit trails
- CSV/JSON Line export
- Streaming data archival
- Data pipelines with file intermediates
This example demonstrates streaming log entries and writing them to a file with buffering.
import { Effect, Stream, Sink, Chunk, FileSystem } from "effect";
interface LogEntry {
readonly level: "debug" | "info" | "warn" | "error";
readonly message: string;
readonly timestamp: number;
}
// Format a log entry as a line
const formatLogLine = (entry: LogEntry): string => {
const iso = new Date(entry.timestamp).toISOString();
return `[${iso}] ${entry.level.toUpperCase()}: ${entry.message}`;
};
// Simulate a stream of log entries
const logStream: Stream.Stream<LogEntry> = Stream.fromIterable([
{ level: "info", message: "Server starting", timestamp: Date.now() },
{ level: "debug", message: "Loading config", timestamp: Date.now() + 100 },
{ level: "info", message: "Connected to database", timestamp: Date.now() + 200 },
{ level: "warn", message: "High memory usage detected", timestamp: Date.now() + 300 },
{ level: "info", message: "Processing request", timestamp: Date.now() + 400 },
{ level: "error", message: "Connection timeout", timestamp: Date.now() + 500 },
{ level: "info", message: "Retrying connection", timestamp: Date.now() + 600 },
{ level: "info", message: "Connection restored", timestamp: Date.now() + 700 },
]);
// Create a file writer sink with buffering
const createFileWriteSink = (
filePath: string,
bufferSize: number = 100
): Sink.Sink<number, Error, string> =>
Effect.scoped(
Effect.gen(function* () {
// Open file in append mode
const fs = yield* FileSystem.FileSystem;
const handle = yield* fs.open(filePath, "a");
let buffer: string[] = [];
let lineCount = 0;
// Flush buffered lines to disk
const flush = Effect.gen(function* () {
if (buffer.length === 0) return;
const content = buffer.join("\n") + "\n";
yield* fs.write(handle, content);
buffer = [];
});
// Return the sink
return Sink.fold(
0,
(count, line: string) =>
Effect.gen(function* () {
buffer.push(line);
const newCount = count + 1;
// Flush when buffer reaches size limit
if (buffer.length >= bufferSize) {
yield* flush;
}
return newCount;
}),
(count) =>
Effect.gen(function* () {
// Flush any remaining lines before closing
yield* flush;
yield* fs.close(handle);
return count;
})
);
})
).pipe(
Effect.flatten
);
// Process the log stream
const program = Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem;
const filePath = "/tmp/app.log";
// Clear the file first
yield* fs.writeFileString(filePath, "");
// Stream logs, format them, and write to file
const written = yield* logStream.pipe(
Stream.map(formatLogLine),
Stream.run(createFileWriteSink(filePath, 50)) // Buffer 50 lines before flush
);
console.log(`Wrote ${written} log lines to ${filePath}`);
// Read back the file to verify
const content = yield* fs.readFileString(filePath);
console.log("\nFile contents:");
console.log(content);
});
Effect.runPromise(program);This pattern:
- Opens a file for appending
- Buffers log lines in memory (50 lines before flush)
- Flushes periodically when buffer fills or stream ends
- Closes the file safely using scopes
- Tracks line count for confirmation
Handle log rotation when file size or time limit is reached:
interface RotationPolicy {
readonly maxLines?: number; // Rotate after N lines
readonly maxSizeBytes?: number; // Rotate after N bytes
readonly maxAgeMs?: number; // Rotate after N milliseconds
}
const createRotatingFileSink = (
baseFilePath: string,
policy: RotationPolicy,
bufferSize: number = 100
): Sink.Sink<number, Error, string> =>
Effect.scoped(
Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem;
let fileIndex = 0;
let currentLineCount = 0;
let currentByteCount = 0;
const getFilePath = () =>
fileIndex === 0
? baseFilePath
: baseFilePath.replace(/(\.[^.]*)?$/, `.${fileIndex}$1`);
let handle = yield* fs.open(getFilePath(), "a");
let buffer: string[] = [];
const rotateFile = Effect.gen(function* () {
// Flush current buffer
if (buffer.length > 0) {
const content = buffer.join("\n") + "\n";
yield* fs.write(handle, content);
buffer = [];
}
// Close current file
yield* fs.close(handle);
// Open new file
fileIndex++;
currentLineCount = 0;
currentByteCount = 0;
handle = yield* fs.open(getFilePath(), "a");
console.log(`Rotated to ${getFilePath()}`);
});
const shouldRotate = (line: string): boolean => {
if (policy.maxLines && currentLineCount >= policy.maxLines)
return true;
if (policy.maxSizeBytes && currentByteCount >= policy.maxSizeBytes)
return true;
return false;
};
return Sink.fold(
0,
(count, line: string) =>
Effect.gen(function* () {
if (shouldRotate(line)) {
yield* rotateFile;
}
buffer.push(line);
currentLineCount++;
currentByteCount += line.length + 1; // +1 for newline
if (buffer.length >= bufferSize) {
const content = buffer.join("\n") + "\n";
yield* fs.write(handle, content);
buffer = [];
}
return count + 1;
}),
(count) =>
Effect.gen(function* () {
if (buffer.length > 0) {
const content = buffer.join("\n") + "\n";
yield* fs.write(handle, content);
}
yield* fs.close(handle);
return count;
})
);
})
).pipe(Effect.flatten);Compress rotated files automatically:
import { execSync } from "child_process";
const compressRotatedFile = (filePath: string): Effect.Effect<void> =>
Effect.try(() => {
execSync(`gzip -f ${filePath}`);
}).pipe(
Effect.catchAll((error) =>
Effect.log(`Failed to compress ${filePath}: ${error}`)
)
);
// In the rotation logic:
const rotateFile = Effect.gen(function* () {
// ... existing rotation code ...
// Compress the old file
const oldFilePath = getFilePath();
yield* compressRotatedFile(oldFilePath);
});✅ Use file writing when:
- Streaming logs or audit trails
- Exporting large datasets to files
- Creating line-oriented formats (JSON Lines, CSV)
- Need persistent local storage
- Data volume too large to fit in memory
- Disk I/O can be slower than in-memory processing
- File system limitations on file size and path length
- Requires cleanup of old/rotated files
- Less efficient than databases for querying
- Sink Pattern 1: Batch Insert - Bulk database persistence
- Sink Pattern 2: Event Log - Event sourcing
- Process Streaming Data with Stream - Stream fundamentals
- Manage Resource Lifecycles with Scope - File handle cleanup