---
title: "Sink Pattern 6: Retry Failed Stream Operations"
id: sink-pattern-retry-failed-stream-operations
skillLevel: intermediate
applicationPatternId: streams-sinks
summary: Use Sink with configurable retry policies to automatically retry failed operations with exponential backoff, enabling recovery from transient failures without losing data.
tags:
  - sink
  - stream
  - error-handling
  - retry
  - exponential-backoff
  - resilience
rule:
  description: Implement retry strategies in sinks to handle transient failures and improve resilience without manual intervention.
related:
  - process-streaming-data-with-stream
  - handle-flaky-operations-with-retry-timeout
  - sink-pattern-fall-back-to-alternative-sink-on-failure
author: effect_website
lessonOrder: 6
---

## Guideline

When consuming a stream to a destination that may experience transient failures (network timeouts, rate limiting, temporary unavailability), wrap the sink operation with a retry policy. Use exponential backoff to avoid overwhelming a recovering system while still recovering quickly.


## Rationale

Transient failures are common in distributed systems:

- Network timeouts: temporary connectivity issues resolve themselves
- Rate limiting: the service recovers once the rate-limit window resets
- Temporary unavailability: services restart or scale back up
- Circuit breaker trips: the service recovers after the backoff period

Without retry logic:

- Every transient failure causes data loss or stream interruption
- Manual intervention is required to restart processing
- The system appears less reliable than it actually is

With intelligent retry logic:

- Automatic recovery from transient failures
- Exponential backoff prevents a thundering herd
- Clear visibility into which operations failed permanently
- Data flows continuously despite temporary issues
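To make the backoff arithmetic concrete, here is a minimal, dependency-free sketch of how capped exponential delays grow across attempts (the `backoffDelays` helper is illustrative, not part of any library):

```typescript
// Compute the delay (in ms) before each retry, growing geometrically
// from a base delay and capping at a maximum. Attempts are numbered from 1.
const backoffDelays = (
  attempts: number,
  baseMs: number,
  factor: number,
  capMs: number
): number[] =>
  Array.from({ length: attempts }, (_, i) =>
    Math.min(baseMs * Math.pow(factor, i), capMs)
  );

// With a 100ms base and factor 2, five attempts wait
// 100, 200, 400, 800, 1600ms; a 1s cap would flatten the tail
console.log(backoffDelays(5, 100, 2, 5000)); // → [100, 200, 400, 800, 1600]
console.log(backoffDelays(5, 100, 2, 1000)); // → [100, 200, 400, 800, 1000]
```

The cap matters: without it, a handful of retries against a long outage quickly reaches minutes-long sleeps.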

## Good Example

This example demonstrates retrying database writes with exponential backoff, tracking attempts per record, and giving up immediately on permanent failures.

```typescript
import { Duration, Effect, Either, Schedule, Sink, Stream } from "effect";

interface UserRecord {
  readonly userId: string;
  readonly name: string;
  readonly email: string;
}

class WriteError extends Error {
  readonly isTransient: boolean;

  constructor(message: string, isTransient: boolean = true) {
    super(message);
    this.name = "WriteError";
    this.isTransient = isTransient;
  }
}

// Mock database that occasionally fails
const database = {
  failureRate: 0.3, // 30% transient failure rate
  permanentFailureRate: 0.05, // 5% permanent failure rate

  insertUser: (user: UserRecord): Effect.Effect<void, WriteError> =>
    Effect.gen(function* () {
      const rand = Math.random();

      // Permanent failure (e.g., constraint violation).
      // Fail with a typed error rather than throwing, so the error lands
      // in the Effect error channel instead of becoming a defect.
      if (rand < database.permanentFailureRate) {
        return yield* Effect.fail(
          new WriteError(`Permanent: User ${user.userId} already exists`, false)
        );
      }

      // Transient failure (e.g., connection timeout)
      if (rand < database.permanentFailureRate + database.failureRate) {
        return yield* Effect.fail(
          new WriteError(`Transient: Connection timeout writing ${user.userId}`, true)
        );
      }

      // Success
      console.log(`✓ Wrote user ${user.userId}`);
    }),
};

// Retry configuration
interface RetryConfig {
  readonly maxAttempts: number;
  readonly initialDelayMs: number;
  readonly maxDelayMs: number;
  readonly backoffFactor: number;
}

const defaultRetryConfig: RetryConfig = {
  maxAttempts: 5,
  initialDelayMs: 100, // Start with 100ms
  maxDelayMs: 5000, // Cap at 5 seconds
  backoffFactor: 2, // Double each time
};

// Result tracking
interface OperationResult {
  readonly succeeded: number;
  readonly transientFailures: number;
  readonly permanentFailures: number;
  readonly detailedStats: Array<{
    readonly userId: string;
    readonly attempts: number;
    readonly status: "success" | "transient-failed" | "permanent-failed";
  }>;
}

// Create a sink with retry logic. Sink.foldLeftEffect threads the
// accumulated state through an effectful step function for every element.
const createRetrySink = (config: RetryConfig) =>
  Sink.foldLeftEffect(
    {
      succeeded: 0,
      transientFailures: 0,
      permanentFailures: 0,
      detailedStats: [],
    } as OperationResult,
    (state, user: UserRecord) =>
      Effect.gen(function* () {
        // Retry loop
        for (let attempts = 1; attempts <= config.maxAttempts; attempts++) {
          // Reify failure as an Either; a failed Effect cannot be
          // intercepted with try/catch inside a generator
          const result = yield* Effect.either(database.insertUser(user));

          if (Either.isRight(result)) {
            console.log(
              `[${user.userId}] Success on attempt ${attempts}/${config.maxAttempts}`
            );
            return {
              ...state,
              succeeded: state.succeeded + 1,
              detailedStats: [
                ...state.detailedStats,
                { userId: user.userId, attempts, status: "success" as const },
              ],
            };
          }

          const error = result.left;

          if (!error.isTransient) {
            // Permanent failure, don't retry
            console.log(`[${user.userId}] Permanent failure: ${error.message}`);
            return {
              ...state,
              permanentFailures: state.permanentFailures + 1,
              detailedStats: [
                ...state.detailedStats,
                {
                  userId: user.userId,
                  attempts,
                  status: "permanent-failed" as const,
                },
              ],
            };
          }

          // Transient failure, retry if attempts remain
          if (attempts < config.maxAttempts) {
            // Exponential backoff, capped at maxDelayMs
            let delayMs = Math.min(
              config.initialDelayMs * Math.pow(config.backoffFactor, attempts - 1),
              config.maxDelayMs
            );

            // Add jitter (±10%) to desynchronize concurrent retries
            delayMs += (Math.random() - 0.5) * 2 * (delayMs * 0.1);

            console.log(
              `[${user.userId}] Transient failure (attempt ${attempts}/${config.maxAttempts}): ${error.message}`
            );
            console.log(`  Retrying in ${Math.round(delayMs)}ms...`);

            yield* Effect.sleep(Duration.millis(Math.round(delayMs)));
          }
        }

        // All retries exhausted
        console.log(`[${user.userId}] Failed after ${config.maxAttempts} attempts`);

        return {
          ...state,
          transientFailures: state.transientFailures + 1,
          detailedStats: [
            ...state.detailedStats,
            {
              userId: user.userId,
              attempts: config.maxAttempts,
              status: "transient-failed" as const,
            },
          ],
        };
      })
  );

// Simulate a stream of users to insert
const userStream: Stream.Stream<UserRecord> = Stream.fromIterable([
  { userId: "user-1", name: "Alice", email: "alice@example.com" },
  { userId: "user-2", name: "Bob", email: "bob@example.com" },
  { userId: "user-3", name: "Charlie", email: "charlie@example.com" },
  { userId: "user-4", name: "Diana", email: "diana@example.com" },
  { userId: "user-5", name: "Eve", email: "eve@example.com" },
]);

// Run the stream with the retry sink, then report a summary
const program = Effect.gen(function* () {
  const state = yield* userStream.pipe(
    Stream.run(createRetrySink(defaultRetryConfig))
  );

  console.log(`\n[SUMMARY]`);
  console.log(`  Succeeded:           ${state.succeeded}`);
  console.log(`  Transient Failures:  ${state.transientFailures}`);
  console.log(`  Permanent Failures:  ${state.permanentFailures}`);
  console.log(`  Total:               ${state.detailedStats.length}`);

  const failed = state.detailedStats.filter((s) => s.status !== "success");
  if (failed.length > 0) {
    console.log(`\n[FAILURES]`);
    for (const stat of failed) {
      console.log(`  ${stat.userId}: ${stat.attempts} attempts (${stat.status})`);
    }
  }

  console.log(`\nProcessing complete.`);
});

Effect.runPromise(program);
```

This pattern:

  1. Attempts the operation up to a configured maximum number of times
  2. Distinguishes transient from permanent failures
  3. Uses exponential backoff to space out retries
  4. Adds jitter to prevent a thundering herd
  5. Tracks detailed per-item stats for monitoring
  6. Reports a summary of outcomes
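The jitter in step 4 can be illustrated on its own. This dependency-free sketch (the `withJitter` name is purely illustrative) spreads each delay uniformly within ±10% of its nominal value, matching the calculation used in the example:

```typescript
// Spread a nominal delay uniformly within ±ratio of its value, so that
// many clients retrying at once do not all wake up at the same instant
const withJitter = (delayMs: number, ratio: number = 0.1): number =>
  delayMs + (Math.random() - 0.5) * 2 * (delayMs * ratio);

// A 1000ms delay with 10% jitter lands somewhere in [900, 1100]
const jittered = withJitter(1000);
console.log(jittered >= 900 && jittered <= 1100); // → true
```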

## Advanced: Effect-Based Retry Schedules

Instead of a hand-rolled loop, use Effect's built-in `Effect.retry` and `Schedule` combinators:

```typescript
const createEffectRetrySink = (maxRetries: number = 5) =>
  Sink.foldLeftEffect(
    {
      succeeded: 0,
      transientFailures: 0,
      permanentFailures: 0,
      detailedStats: [],
    } as OperationResult,
    (state, user: UserRecord) =>
      Effect.gen(function* () {
        const result = yield* database.insertUser(user).pipe(
          Effect.as("success" as const),
          // Retry only transient errors, with jittered exponential
          // backoff capped at maxRetries recurrences
          Effect.retry({
            schedule: Schedule.exponential("100 millis").pipe(
              Schedule.jittered,
              Schedule.intersect(Schedule.recurs(maxRetries))
            ),
            while: (error) => error.isTransient,
          }),
          // Convert any remaining failure into an outcome tag
          Effect.catchAll((error: WriteError) =>
            Effect.succeed(
              error.isTransient
                ? ("transient-failed" as const)
                : ("permanent-failed" as const)
            )
          )
        );

        if (result === "success") {
          return { ...state, succeeded: state.succeeded + 1 };
        } else if (result === "permanent-failed") {
          return { ...state, permanentFailures: state.permanentFailures + 1 };
        } else {
          return { ...state, transientFailures: state.transientFailures + 1 };
        }
      })
  );
```

## Advanced: Adaptive Retry with Metrics

Adjust retry strategy based on success rate:

```typescript
interface AdaptiveRetryConfig {
  readonly initialMaxRetries: number;
  readonly minMaxRetries: number;
  readonly maxMaxRetries: number;
  readonly successThreshold: number; // Increase retries if success rate below this
}

interface AdaptiveState extends OperationResult {
  readonly currentMaxRetries: number;
  readonly recentSuccessRate: number;
}

const createAdaptiveRetrySink = (initialConfig: AdaptiveRetryConfig) =>
  Sink.foldLeftEffect(
    {
      succeeded: 0,
      transientFailures: 0,
      permanentFailures: 0,
      detailedStats: [],
      currentMaxRetries: initialConfig.initialMaxRetries,
      recentSuccessRate: 1.0,
    } as AdaptiveState,
    (state, user: UserRecord) =>
      Effect.gen(function* () {
        // Adjust max retries based on the recent success rate
        let maxRetries = state.currentMaxRetries;

        if (state.recentSuccessRate < initialConfig.successThreshold) {
          // Low success rate, increase retries
          maxRetries = Math.min(maxRetries + 1, initialConfig.maxMaxRetries);
          console.log(
            `[ADAPTIVE] Success rate ${(state.recentSuccessRate * 100).toFixed(1)}% < ${(initialConfig.successThreshold * 100).toFixed(1)}%, increasing retries to ${maxRetries}`
          );
        } else if (state.recentSuccessRate > initialConfig.successThreshold * 1.2) {
          // High success rate, decrease retries
          maxRetries = Math.max(maxRetries - 1, initialConfig.minMaxRetries);
          console.log(
            `[ADAPTIVE] Success rate ${(state.recentSuccessRate * 100).toFixed(1)}%, decreasing retries to ${maxRetries}`
          );
        }

        // Attempt the write with the current retry budget
        let attempts = 0;
        let succeeded = false;
        let permanent = false;

        for (attempts = 1; attempts <= maxRetries; attempts++) {
          const result = yield* Effect.either(database.insertUser(user));
          if (Either.isRight(result)) {
            succeeded = true;
            break;
          }
          if (!result.left.isTransient) {
            permanent = true;
            break;
          }
          if (attempts < maxRetries) {
            yield* Effect.sleep(Duration.millis(100 * Math.pow(2, attempts - 1)));
          }
        }

        // Update the success rate (exponential moving average)
        const newSuccessRate =
          state.recentSuccessRate * 0.7 + (succeeded ? 1 : 0) * 0.3;

        return {
          succeeded: state.succeeded + (succeeded ? 1 : 0),
          transientFailures:
            state.transientFailures + (!succeeded && !permanent ? 1 : 0),
          permanentFailures: state.permanentFailures + (permanent ? 1 : 0),
          detailedStats: [
            ...state.detailedStats,
            {
              userId: user.userId,
              attempts: Math.min(attempts, maxRetries),
              status: succeeded
                ? ("success" as const)
                : permanent
                  ? ("permanent-failed" as const)
                  : ("transient-failed" as const),
            },
          ],
          currentMaxRetries: maxRetries,
          recentSuccessRate: newSuccessRate,
        };
      })
  );
```
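The adaptive sink keys off an exponential moving average of recent outcomes. A standalone, dependency-free sketch of that update rule, with the same 0.7/0.3 weights as above (the `updateSuccessRate` helper is illustrative):

```typescript
// Exponential moving average of success: the latest outcome weighs 30%,
// history weighs 70%, so the rate reacts quickly without thrashing
const updateSuccessRate = (previous: number, succeeded: boolean): number =>
  previous * 0.7 + (succeeded ? 1 : 0) * 0.3;

// Starting from a perfect rate, three consecutive failures drop it fast:
// 1.0 → 0.7 → 0.49 → 0.343, crossing a typical 0.5 threshold by the third
let rate = 1.0;
for (const outcome of [false, false, false]) {
  rate = updateSuccessRate(rate, outcome);
}
console.log(rate.toFixed(3)); // → "0.343"
```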

## When to Use This Pattern

Use retry logic when:

- Failures are often transient (network issues, temporary unavailability)
- You can distinguish between transient and permanent failures
- You want automatic recovery without manual intervention
- You need to handle rate limiting gracefully
- You are building fault-tolerant systems

⚠️ Trade-offs:

- Retries increase latency (each failed attempt adds delay)
- Retries may exacerbate problems if overused (retry storms)
- Backoff parameters require careful tuning
- Some failures cannot be recovered by retrying
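The latency trade-off can be quantified: the worst-case added latency is the sum of all backoff sleeps before the final attempt. A dependency-free sketch (helper name illustrative) using the default config values from the example above:

```typescript
// Worst-case time spent sleeping between retries: the sum of the capped
// exponential delays before attempts 2..maxAttempts (no sleep after the last)
const worstCaseBackoffMs = (
  maxAttempts: number,
  baseMs: number,
  factor: number,
  capMs: number
): number =>
  Array.from({ length: maxAttempts - 1 }, (_, i) =>
    Math.min(baseMs * Math.pow(factor, i), capMs)
  ).reduce((sum, d) => sum + d, 0);

// Defaults from the example: 5 attempts, 100ms base, factor 2, 5s cap
// → 100 + 200 + 400 + 800 = 1500ms of added latency in the worst case
console.log(worstCaseBackoffMs(5, 100, 2, 5000)); // → 1500
```

Jitter shifts this figure by at most ±10%, and the cap bounds its growth when you raise `maxAttempts`.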

## See Also