| title | Sink Pattern 6: Retry Failed Stream Operations |
|---|---|
| id | sink-pattern-retry-failed-stream-operations |
| skillLevel | intermediate |
| applicationPatternId | streams-sinks |
| summary | Use Sink with configurable retry policies to automatically retry failed operations with exponential backoff, enabling recovery from transient failures without losing data. |
| tags | |
| rule | |
| related | |
| author | effect_website |
| lessonOrder | 6 |

When consuming a stream to a destination that may experience transient failures (network timeouts, rate limiting, temporary unavailability), wrap the sink operation with a retry policy. Use exponential backoff to avoid overwhelming a recovering system while still recovering quickly.
Transient failures are common in distributed systems:
- Network timeouts: Temporary connectivity issues resolve themselves
- Rate limiting: Service recovers once rate limit window resets
- Temporary unavailability: Services restart or scale up
- Circuit breaker trips: Service recovers after backoff period
Without retry logic:
- Every transient failure causes data loss or stream interruption
- Manual intervention required to restart
- System appears less reliable than it actually is
With intelligent retry logic:
- Automatic recovery from transient failures
- Exponential backoff with jitter spreads retries out and avoids a thundering herd
- Clear visibility into which operations failed permanently
- Data flows continuously despite temporary issues
This example demonstrates retrying database writes with exponential backoff, tracking attempts per record, and recording permanent failures without retrying them.
import { Effect, Stream, Sink, Chunk, Duration, Schedule } from "effect";
interface UserRecord {
readonly userId: string;
readonly name: string;
readonly email: string;
}
class WriteError extends Error {
readonly isTransient: boolean;
constructor(message: string, isTransient: boolean = true) {
super(message);
this.name = "WriteError";
this.isTransient = isTransient;
}
}
// Mock database that occasionally fails
const database = {
failureRate: 0.3, // 30% transient failure rate
permanentFailureRate: 0.05, // 5% permanent failure rate
  insertUser: (user: UserRecord): Effect.Effect<void, WriteError> =>
    Effect.gen(function* () {
      const rand = Math.random();
      // Failures go through the typed error channel with Effect.fail.
      // A `throw` inside Effect.gen would surface as an untyped defect instead.
      // Permanent failure (e.g., constraint violation)
      if (rand < database.permanentFailureRate) {
        return yield* Effect.fail(
          new WriteError(`Permanent: User ${user.userId} already exists`, false)
        );
      }
      // Transient failure (e.g., connection timeout)
      if (rand < database.permanentFailureRate + database.failureRate) {
        return yield* Effect.fail(
          new WriteError(`Transient: Connection timeout writing ${user.userId}`, true)
        );
      }
      // Success
      console.log(`✓ Wrote user ${user.userId}`);
    }),
};
// Retry configuration
interface RetryConfig {
readonly maxAttempts: number;
readonly initialDelayMs: number;
readonly maxDelayMs: number;
readonly backoffFactor: number;
}
const defaultRetryConfig: RetryConfig = {
maxAttempts: 5,
initialDelayMs: 100, // Start with 100ms
maxDelayMs: 5000, // Cap at 5 seconds
backoffFactor: 2, // Double each time
};
// Result tracking
interface OperationResult {
readonly succeeded: number;
readonly transientFailures: number;
readonly permanentFailures: number;
readonly detailedStats: Array<{
readonly userId: string;
readonly attempts: number;
readonly status: "success" | "transient-failed" | "permanent-failed";
}>;
}
// Create a sink with retry logic
const createRetrySink = (config: RetryConfig): Sink.Sink<OperationResult, UserRecord> => {
  // Append one per-user entry and bump the matching counter
  const record = (
    state: OperationResult,
    userId: string,
    attempts: number,
    status: "success" | "transient-failed" | "permanent-failed"
  ): OperationResult => ({
    succeeded: state.succeeded + (status === "success" ? 1 : 0),
    transientFailures: state.transientFailures + (status === "transient-failed" ? 1 : 0),
    permanentFailures: state.permanentFailures + (status === "permanent-failed" ? 1 : 0),
    detailedStats: [...state.detailedStats, { userId, attempts, status }],
  });

  const initial: OperationResult = {
    succeeded: 0,
    transientFailures: 0,
    permanentFailures: 0,
    detailedStats: [],
  };

  // Sink.foldLeftEffect folds every stream element with an effectful step function
  return Sink.foldLeftEffect(initial, (state, user: UserRecord) =>
    Effect.gen(function* () {
      // Retry loop
      for (let attempts = 1; attempts <= config.maxAttempts; attempts++) {
        // Effect failures are values, not exceptions, so try/catch cannot see them.
        // Effect.either exposes the outcome as Left (error) or Right (success).
        const outcome = yield* Effect.either(database.insertUser(user));
        if (outcome._tag === "Right") {
          console.log(
            `[${user.userId}] Success on attempt ${attempts}/${config.maxAttempts}`
          );
          return record(state, user.userId, attempts, "success");
        }
        const error = outcome.left;
        if (!error.isTransient) {
          // Permanent failure, don't retry
          console.log(`[${user.userId}] Permanent failure: ${error.message}`);
          return record(state, user.userId, attempts, "permanent-failed");
        }
        // Transient failure, retry if attempts remain
        if (attempts < config.maxAttempts) {
          // Calculate delay with exponential backoff, capped at maxDelayMs
          let delayMs = config.initialDelayMs * Math.pow(config.backoffFactor, attempts - 1);
          delayMs = Math.min(delayMs, config.maxDelayMs);
          // Add jitter (±10%)
          const jitter = delayMs * 0.1;
          delayMs = delayMs + (Math.random() - 0.5) * 2 * jitter;
          console.log(
            `[${user.userId}] Transient failure (attempt ${attempts}/${config.maxAttempts}): ${error.message}`
          );
          console.log(`  Retrying in ${Math.round(delayMs)}ms...`);
          yield* Effect.sleep(Duration.millis(Math.round(delayMs)));
        }
      }
      // All retries exhausted
      console.log(`[${user.userId}] Failed after ${config.maxAttempts} attempts`);
      return record(state, user.userId, config.maxAttempts, "transient-failed");
    })
  ).pipe(
    // Print a summary once the stream has been fully consumed
    Sink.mapEffect((state: OperationResult) =>
      Effect.sync(() => {
        console.log(`\n[SUMMARY]`);
        console.log(`  Succeeded: ${state.succeeded}`);
        console.log(`  Transient Failures: ${state.transientFailures}`);
        console.log(`  Permanent Failures: ${state.permanentFailures}`);
        console.log(`  Total: ${state.detailedStats.length}`);
        // Show detailed stats
        const failed = state.detailedStats.filter((s) => s.status !== "success");
        if (failed.length > 0) {
          console.log(`\n[FAILURES]`);
          failed.forEach((stat) => {
            console.log(`  ${stat.userId}: ${stat.attempts} attempts (${stat.status})`);
          });
        }
        return state;
      })
    )
  );
};
// Simulate a stream of users to insert
const userStream: Stream.Stream<UserRecord> = Stream.fromIterable([
{ userId: "user-1", name: "Alice", email: "alice@example.com" },
{ userId: "user-2", name: "Bob", email: "bob@example.com" },
{ userId: "user-3", name: "Charlie", email: "charlie@example.com" },
{ userId: "user-4", name: "Diana", email: "diana@example.com" },
{ userId: "user-5", name: "Eve", email: "eve@example.com" },
]);
// Run the stream with retry sink
const program = Effect.gen(function* () {
const result = yield* userStream.pipe(Stream.run(createRetrySink(defaultRetryConfig)));
console.log(`\nProcessing complete.`);
});
Effect.runPromise(program);

This pattern:
- Attempts operation up to max retries
- Distinguishes transient vs. permanent failures
- Uses exponential backoff to space retries
- Adds jitter to prevent thundering herd
- Tracks detailed stats for monitoring
- Reports summary of outcomes
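
The backoff arithmetic in the retry loop can be pulled out into a pure function, which makes the schedule easy to inspect and test. The following is a minimal sketch in plain TypeScript; `BackoffConfig`, `backoffDelayMs`, and the injectable `random` parameter are hypothetical names introduced here for illustration, with values mirroring `defaultRetryConfig`.

```typescript
// Hypothetical helper: names are illustrative and not part of the pattern's code.
interface BackoffConfig {
  readonly initialDelayMs: number;
  readonly maxDelayMs: number;
  readonly backoffFactor: number;
  readonly jitterRatio: number; // 0.1 means ±10%
}

// Delay to wait after failed attempt number `attempt` (1-based).
// `random` is injectable so the jitter can be fixed in tests.
const backoffDelayMs = (
  attempt: number,
  config: BackoffConfig,
  random: () => number = Math.random
): number => {
  const exponential = config.initialDelayMs * Math.pow(config.backoffFactor, attempt - 1);
  const capped = Math.min(exponential, config.maxDelayMs);
  // random() in [0, 1) maps to a jitter factor in [-1, 1)
  const jitter = capped * config.jitterRatio * (random() * 2 - 1);
  return Math.round(capped + jitter);
};

const backoffConfig: BackoffConfig = {
  initialDelayMs: 100,
  maxDelayMs: 5000,
  backoffFactor: 2,
  jitterRatio: 0.1,
};

// With random() fixed at 0.5 the jitter term is zero, exposing the base schedule.
const baseDelays = [1, 2, 3, 4, 5, 6, 7].map((n) =>
  backoffDelayMs(n, backoffConfig, () => 0.5)
);
console.log(baseDelays); // 100, 200, 400, 800, 1600, 3200, 5000
```

Because jitter is applied after the cap, as in the example above, an individual delay can exceed `maxDelayMs` by up to the jitter ratio.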
Use Effect's built-in retry and schedule combinators:
const createEffectRetrySink = (
  maxRetries: number = 5
): Sink.Sink<OperationResult, UserRecord> =>
  Sink.foldLeftEffect(
    {
      succeeded: 0,
      transientFailures: 0,
      permanentFailures: 0,
      detailedStats: [],
    } as OperationResult,
    (state, user: UserRecord) =>
      database.insertUser(user).pipe(
        Effect.retry({
          // Exponential backoff starting at 100ms, randomized by Schedule.jittered,
          // and limited to maxRetries retries after the first attempt
          schedule: Schedule.exponential("100 millis").pipe(
            Schedule.jittered,
            Schedule.intersect(Schedule.recurs(maxRetries))
          ),
          // Retry only transient errors; permanent errors fail immediately
          while: (error: WriteError) => error.isTransient,
        }),
        // Fold the final outcome into the running totals
        Effect.match({
          onSuccess: (): OperationResult => ({
            ...state,
            succeeded: state.succeeded + 1,
          }),
          onFailure: (error: WriteError): OperationResult =>
            error.isTransient
              ? { ...state, transientFailures: state.transientFailures + 1 }
              : { ...state, permanentFailures: state.permanentFailures + 1 },
        })
      )
  );

Adjust retry strategy based on success rate:
interface AdaptiveRetryConfig {
  readonly initialMaxRetries: number;
  readonly minMaxRetries: number;
  readonly maxMaxRetries: number;
  readonly successThreshold: number; // Increase retries if success rate below this
}
// Fold state: the running result plus the adaptive parameters
interface AdaptiveState extends OperationResult {
  readonly currentMaxRetries: number;
  readonly recentSuccessRate: number;
}
const createAdaptiveRetrySink = (
  initialConfig: AdaptiveRetryConfig
): Sink.Sink<OperationResult, UserRecord> => {
  const initial: AdaptiveState = {
    succeeded: 0,
    transientFailures: 0,
    permanentFailures: 0,
    detailedStats: [],
    currentMaxRetries: initialConfig.initialMaxRetries,
    recentSuccessRate: 1.0,
  };
  return Sink.foldLeftEffect(initial, (state, user: UserRecord) =>
    Effect.gen(function* () {
      // Adjust max retries based on recent success rate
      let maxRetries = state.currentMaxRetries;
      if (state.recentSuccessRate < initialConfig.successThreshold) {
        // Low success rate, increase retries
        maxRetries = Math.min(maxRetries + 1, initialConfig.maxMaxRetries);
        console.log(
          `[ADAPTIVE] Success rate ${(state.recentSuccessRate * 100).toFixed(1)}% < ${(initialConfig.successThreshold * 100).toFixed(1)}%, increasing retries to ${maxRetries}`
        );
      } else if (state.recentSuccessRate > initialConfig.successThreshold * 1.2) {
        // High success rate, decrease retries
        maxRetries = Math.max(maxRetries - 1, initialConfig.minMaxRetries);
        console.log(
          `[ADAPTIVE] Success rate ${(state.recentSuccessRate * 100).toFixed(1)}%, decreasing retries to ${maxRetries}`
        );
      }
      // Attempt write with current retry config
      let attempts = 0;
      let status: "success" | "transient-failed" | "permanent-failed" = "transient-failed";
      while (attempts < maxRetries) {
        attempts++;
        // Effect.either turns a failure into a value; try/catch cannot observe it
        const outcome = yield* Effect.either(database.insertUser(user));
        if (outcome._tag === "Right") {
          status = "success";
          break;
        }
        if (!outcome.left.isTransient) {
          // Permanent failure: stop retrying and count it separately
          status = "permanent-failed";
          break;
        }
        if (attempts < maxRetries) {
          yield* Effect.sleep(Duration.millis(100 * Math.pow(2, attempts - 1)));
        }
      }
      // Update success rate (exponential moving average)
      const newSuccessRate =
        state.recentSuccessRate * 0.7 + (status === "success" ? 1 : 0) * 0.3;
      const next: AdaptiveState = {
        succeeded: state.succeeded + (status === "success" ? 1 : 0),
        transientFailures:
          state.transientFailures + (status === "transient-failed" ? 1 : 0),
        permanentFailures:
          state.permanentFailures + (status === "permanent-failed" ? 1 : 0),
        detailedStats: [
          ...state.detailedStats,
          { userId: user.userId, attempts, status },
        ],
        currentMaxRetries: maxRetries,
        recentSuccessRate: newSuccessRate,
      };
      return next;
    })
  );
};

✅ Use retry logic when:
- Failures are often transient (network, temporary unavailability)
- Can distinguish between transient and permanent failures
- Want automatic recovery without manual intervention
- Need to handle rate limiting gracefully
- Building fault-tolerant systems
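
Distinguishing transient from permanent failures usually comes down to a small classification step at the boundary where the error is created. A minimal sketch, assuming the destination is an HTTP service: the `RETRYABLE_STATUS` set and the `classifyFailure` helper are hypothetical, and the choice of status codes is an assumption that should be checked against the actual service's behavior.

```typescript
// Hypothetical classifier; this status-code list is an assumption for illustration.
// 408 Request Timeout, 429 Too Many Requests, 502 Bad Gateway,
// 503 Service Unavailable, 504 Gateway Timeout
const RETRYABLE_STATUS: ReadonlySet<number> = new Set([408, 429, 502, 503, 504]);

interface ClassifiedFailure {
  readonly message: string;
  readonly isTransient: boolean;
}

// Map a raw HTTP status to the isTransient flag that retry logic inspects.
const classifyFailure = (status: number, message: string): ClassifiedFailure => ({
  message,
  isTransient: RETRYABLE_STATUS.has(status),
});

console.log(classifyFailure(503, "Service Unavailable").isTransient); // true
console.log(classifyFailure(409, "Conflict").isTransient); // false
```

The resulting flag plays the same role as `WriteError.isTransient` in the examples above: only failures marked transient are worth retrying.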
⚠️ Trade-offs:
- Retries increase latency (each failed attempt adds delay)
- May exacerbate problems if overused (retry storm)
- Requires careful tuning of backoff parameters
- Some failures can't be recovered by retry
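
The latency cost can be estimated before tuning the parameters. This sketch sums the worst-case waiting time for a single record when every attempt fails, ignoring jitter and the time the operation itself takes; `worstCaseBackoffMs` is a hypothetical helper, and the arguments mirror `defaultRetryConfig`.

```typescript
// Hypothetical helper: total time spent sleeping if every attempt fails (jitter ignored).
const worstCaseBackoffMs = (
  maxAttempts: number,
  initialDelayMs: number,
  backoffFactor: number,
  maxDelayMs: number
): number => {
  let total = 0;
  // maxAttempts attempts means at most maxAttempts - 1 waits between them
  for (let retry = 0; retry < maxAttempts - 1; retry++) {
    total += Math.min(initialDelayMs * Math.pow(backoffFactor, retry), maxDelayMs);
  }
  return total;
};

// 5 attempts, 100ms initial delay, factor 2, 5s cap: 100 + 200 + 400 + 800
console.log(worstCaseBackoffMs(5, 100, 2, 5000)); // 1500
```

Since the sink processes records one at a time, this per-record figure adds up across consecutive failing records, which is worth weighing when choosing `maxAttempts` and `maxDelayMs`.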
See also:
- Handle Flaky Operations with Retry & Timeout - Retry fundamentals
- Sink Pattern 5: Fall Back - Fallback patterns
- Process Streaming Data with Stream - Stream fundamentals
- Control Repetition with Schedule - Schedule patterns