From 488a1aabb060a1927f728a4aa2f9fd0fd42d7853 Mon Sep 17 00:00:00 2001 From: JY Tan Date: Fri, 10 Apr 2026 18:30:28 -0700 Subject: [PATCH] Commit --- docs/environment-variables.md | 15 +- docs/initialization.md | 88 ++++- docs/nextjs-initialization.md | 86 ++++- docs/quickstart.md | 2 +- docs/troubleshooting.md | 6 +- src/core/TuskDrift.test.ts | 68 ++++ src/core/TuskDrift.ts | 319 +++++++++++++++--- .../AdaptiveSamplingController.test.ts | 64 ++++ .../sampling/AdaptiveSamplingController.ts | 283 ++++++++++++++++ .../tracing/DriftBatchSpanProcessor.test.ts | 68 ++++ src/core/tracing/DriftBatchSpanProcessor.ts | 152 +++++++++ src/core/tracing/TdSpanExporter.ts | 44 ++- src/core/tracing/adapters/ApiSpanAdapter.ts | 193 +++++++---- src/core/tracing/adapters/resilience.ts | 100 ++++++ src/core/utils/configUtils.ts | 5 + .../http/HttpInstrumentation.test.ts | 45 +++ .../libraries/http/Instrumentation.ts | 51 +-- .../libraries/nextjs/Instrumentation.ts | 16 +- 18 files changed, 1424 insertions(+), 181 deletions(-) create mode 100644 src/core/TuskDrift.test.ts create mode 100644 src/core/sampling/AdaptiveSamplingController.test.ts create mode 100644 src/core/sampling/AdaptiveSamplingController.ts create mode 100644 src/core/tracing/DriftBatchSpanProcessor.test.ts create mode 100644 src/core/tracing/DriftBatchSpanProcessor.ts create mode 100644 src/core/tracing/adapters/resilience.ts diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 9dd1e58a..d93b3c58 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -110,24 +110,29 @@ Your Tusk Drift API key, required when using Tusk Cloud for storing and managing This will securely store your auth key for future replay sessions. -## TUSK_SAMPLING_RATE +## TUSK_RECORDING_SAMPLING_RATE Controls what percentage of requests are recorded during trace collection. - **Type:** Number between 0.0 and 1.0 -- **Default:** 1.0 (100% of requests) -- **Precedence:** This environment variable is overridden by the `samplingRate` parameter in `TuskDrift.initialize()`, but takes precedence over the `sampling_rate` setting in `.tusk/config.yaml` +- **If unset:** Falls back to `.tusk/config.yaml` and then the default base rate of `1.0` +- **Precedence:** This environment variable is overridden by the `samplingRate` parameter in `TuskDrift.initialize()`, but takes precedence over `recording.sampling.base_rate` and the legacy `recording.sampling_rate` setting in `.tusk/config.yaml` +- **Scope:** This only overrides the base rate. It does not change `recording.sampling.mode` or `recording.sampling.min_rate` **Examples:** ```bash # Record all requests (100%) -TUSK_SAMPLING_RATE=1.0 npm start +TUSK_RECORDING_SAMPLING_RATE=1.0 npm start # Record 10% of requests -TUSK_SAMPLING_RATE=0.1 npm start +TUSK_RECORDING_SAMPLING_RATE=0.1 npm start ``` +`TUSK_SAMPLING_RATE` is still accepted as a backward-compatible alias, but `TUSK_RECORDING_SAMPLING_RATE` is the canonical variable going forward. + +If `recording.sampling.mode: adaptive` is enabled in `.tusk/config.yaml`, this environment variable still only changes the base rate; adaptive load shedding remains active. + For more details on sampling rate configuration methods and precedence, see the [Initialization Guide](./initialization.md#3-configure-sampling-rate). ## TUSK_USE_RUST_CORE diff --git a/docs/initialization.md b/docs/initialization.md index 2f63e6be..160c81b2 100644 --- a/docs/initialization.md +++ b/docs/initialization.md @@ -71,7 +71,7 @@ export { TuskDrift }; samplingRate number 1.0 - Override sampling rate (0.0 - 1.0) for recording. Takes precedence over TUSK_SAMPLING_RATE env var and config file. + Override the base sampling rate (0.0 - 1.0) for recording. Takes precedence over TUSK_RECORDING_SAMPLING_RATE and config file base-rate settings. Does not change recording.sampling.mode. registerEsmLoaderHooks @@ -137,17 +137,31 @@ For ESM applications, you **cannot** control import order within your applicatio ### 3. Configure Sampling Rate -The sampling rate determines what percentage of requests are recorded during replay tests. Tusk Drift supports three ways to configure the sampling rate, with the following precedence (highest to lowest): +Sampling controls what percentage of inbound requests are recorded in `RECORD` mode. -1. **Init Parameter** -2. **Environment Variable** (`TUSK_SAMPLING_RATE`) -3. **Configuration File** (`.tusk/config.yaml`) +Tusk Drift supports two sampling modes in `.tusk/config.yaml`: -If not specified, the default sampling rate is `1.0` (100%). +- `fixed`: record requests at a constant base rate. +- `adaptive`: start from a base rate and automatically shed load when queue pressure, export failures, export timeouts, event loop lag, or memory pressure indicate the SDK should back off. In severe conditions the SDK can temporarily pause recording entirely. -#### Method 1: Init Parameter (Programmatic Override) +Sampling configuration is resolved in two layers: -Set the sampling rate directly in your initialization code: +1. **Base rate precedence** (highest to lowest): + - `TuskDrift.initialize({ samplingRate: ... })` + - `TUSK_RECORDING_SAMPLING_RATE` + - legacy alias `TUSK_SAMPLING_RATE` + - `.tusk/config.yaml` `recording.sampling.base_rate` + - `.tusk/config.yaml` legacy `recording.sampling_rate` + - default base rate `1.0` +2. **Mode and minimum rate**: + - `recording.sampling.mode` comes from `.tusk/config.yaml` and defaults to `fixed` + - `recording.sampling.min_rate` is only used in `adaptive` mode and defaults to `0.001` when omitted + +> **Note:** Requests before `TuskDrift.markAppAsReady()` are always recorded. Sampling applies to normal inbound traffic after startup. + +#### Method 1: Init Parameter (Programmatic Base-Rate Override) + +Set the base sampling rate directly in your initialization code: ```typescript TuskDrift.initialize({ @@ -159,29 +173,53 @@ TuskDrift.initialize({ #### Method 2: Environment Variable -Set the `TUSK_SAMPLING_RATE` environment variable: +Set the `TUSK_RECORDING_SAMPLING_RATE` environment variable to override the base sampling rate: ```bash # Development - record everything -TUSK_SAMPLING_RATE=1.0 npm run dev +TUSK_RECORDING_SAMPLING_RATE=1.0 npm run dev # Production - sample 10% of requests -TUSK_SAMPLING_RATE=0.1 npm start +TUSK_RECORDING_SAMPLING_RATE=0.1 npm start ``` +`TUSK_SAMPLING_RATE` is still supported as a backward-compatible alias, but new setups should prefer `TUSK_RECORDING_SAMPLING_RATE`. + #### Method 3: Configuration File -Update the configuration file `.tusk/config.yaml` to include a `recording` section. Example `recording` configuration: +Use the nested `recording.sampling` config to choose `fixed` vs `adaptive` mode and set the base/minimum rates. ```yaml # ... existing configuration ... recording: - sampling_rate: 0.1 + sampling: + mode: fixed + base_rate: 0.1 export_spans: true enable_env_var_recording: true ``` +**Adaptive sampling example:** + +```yaml +# ... existing configuration ... + +recording: + sampling: + mode: adaptive + base_rate: 0.25 + min_rate: 0.01 + export_spans: true +``` + +**Legacy config still supported:** + +```yaml +recording: + sampling_rate: 0.1 +``` + #### Additional Recording Configuration Options @@ -195,10 +233,28 @@ recording: - + + + + + + + - + + + + + + + + + + + + + @@ -249,7 +305,7 @@ In rare cases, the wrapping can cause issues with instrumented packages: If you encounter errors like: -``` +```text SyntaxError: The requested module '...' does not provide an export named '...' (node:1234) Error: 'import-in-the-middle' failed to wrap 'file://../../path/to/file.js' ``` diff --git a/docs/nextjs-initialization.md b/docs/nextjs-initialization.md index 02ddfb77..ab8d4c16 100644 --- a/docs/nextjs-initialization.md +++ b/docs/nextjs-initialization.md @@ -166,7 +166,7 @@ More context on setting up instrumentations for Next.js apps can be found [here] - +
sampling_ratesampling.mode"fixed" | "adaptive""fixed"Selects constant sampling or adaptive load shedding.
sampling.base_rate number 1.0The sampling rate (0.0 - 1.0). 1.0 means 100% of requests are recorded, 0.0 means 0% of requests are recorded.The base sampling rate (0.0 - 1.0). This is the preferred config key and can be overridden by TUSK_RECORDING_SAMPLING_RATE or the samplingRate init parameter.
sampling.min_ratenumber0.001 in adaptive modeThe minimum steady-state sampling rate for adaptive mode. In critical conditions the SDK can still temporarily pause recording.
sampling_ratenumberNoneLegacy fallback for the base sampling rate. Still supported for backward compatibility, but recording.sampling.base_rate is preferred.
export_spanssamplingRate number 1.0Override sampling rate (0.0 - 1.0) for recording. Takes precedence over TUSK_SAMPLING_RATE env var and config file.Override the base sampling rate (0.0 - 1.0) for recording. Takes precedence over TUSK_RECORDING_SAMPLING_RATE and config file base-rate settings. Does not change recording.sampling.mode.
@@ -184,17 +184,31 @@ More context on setting up instrumentations for Next.js apps can be found [here] ## Step 3: Configure Sampling Rate -The sampling rate determines what percentage of requests are recorded during replay tests. Tusk Drift supports three ways to configure the sampling rate, with the following precedence (highest to lowest): +Sampling controls what percentage of inbound requests are recorded in `RECORD` mode. -1. **Init Parameter** -2. **Environment Variable** (`TUSK_SAMPLING_RATE`) -3. **Configuration File** (`.tusk/config.yaml`) +Tusk Drift supports two sampling modes in `.tusk/config.yaml`: -If not specified, the default sampling rate is `1.0` (100%). +- `fixed`: record requests at a constant base rate. +- `adaptive`: start from a base rate and automatically shed load when queue pressure, export failures, export timeouts, event loop lag, or memory pressure indicate the SDK should back off. In severe conditions the SDK can temporarily pause recording entirely. + +Sampling configuration is resolved in two layers: + +1. **Base rate precedence** (highest to lowest): + - `TuskDrift.initialize({ samplingRate: ... })` + - `TUSK_RECORDING_SAMPLING_RATE` + - legacy alias `TUSK_SAMPLING_RATE` + - `.tusk/config.yaml` `recording.sampling.base_rate` + - `.tusk/config.yaml` legacy `recording.sampling_rate` + - default base rate `1.0` +2. **Mode and minimum rate**: + - `recording.sampling.mode` comes from `.tusk/config.yaml` and defaults to `fixed` + - `recording.sampling.min_rate` is only used in `adaptive` mode and defaults to `0.001` when omitted + +> **Note:** Requests before `TuskDrift.markAppAsReady()` are always recorded. Sampling applies to normal inbound traffic after startup. ### Method 1: Init Parameter -Set the sampling rate directly in your initialization code: +Set the base sampling rate directly in your initialization code: ```typescript // instrumentation.ts @@ -215,29 +229,53 @@ export async function register() { ### Method 2: Environment Variable -Set the `TUSK_SAMPLING_RATE` environment variable: +Set the `TUSK_RECORDING_SAMPLING_RATE` environment variable to override the base sampling rate: ```bash # Development - record everything -TUSK_SAMPLING_RATE=1.0 npm run dev +TUSK_RECORDING_SAMPLING_RATE=1.0 npm run dev # Production - sample 10% of requests -TUSK_SAMPLING_RATE=0.1 npm start +TUSK_RECORDING_SAMPLING_RATE=0.1 npm start ``` +`TUSK_SAMPLING_RATE` is still supported as a backward-compatible alias, but new setups should prefer `TUSK_RECORDING_SAMPLING_RATE`. + ### Method 3: Configuration File -Update the `.tusk/config.yaml` file in your project root to include recording configuration: +Use the nested `recording.sampling` config to choose `fixed` vs `adaptive` mode and set the base/minimum rates. ```yaml # ... existing configuration ... recording: - sampling_rate: 0.1 + sampling: + mode: fixed + base_rate: 0.1 export_spans: true enable_env_var_recording: true ``` +**Adaptive sampling example:** + +```yaml +# ... existing configuration ... + +recording: + sampling: + mode: adaptive + base_rate: 0.25 + min_rate: 0.01 + export_spans: true +``` + +**Legacy config still supported:** + +```yaml +recording: + sampling_rate: 0.1 +``` + ### Additional Recording Configuration Options @@ -251,10 +289,28 @@ recording: - + + + + + + + - + + + + + + + + + + + + + @@ -270,7 +326,7 @@ recording:
sampling_ratesampling.mode"fixed" | "adaptive""fixed"Selects constant sampling or adaptive load shedding.
sampling.base_rate number 1.0The sampling rate (0.0 - 1.0). 1.0 means 100% of requests are recorded, 0.0 means no requests are recorded.The base sampling rate (0.0 - 1.0). This is the preferred config key and can be overridden by TUSK_RECORDING_SAMPLING_RATE or the samplingRate init parameter.
sampling.min_ratenumber0.001 in adaptive modeThe minimum steady-state sampling rate for adaptive mode. In critical conditions the SDK can still temporarily pause recording.
sampling_ratenumberNoneLegacy fallback for the base sampling rate. Still supported for backward compatibility, but recording.sampling.base_rate is preferred.
export_spans
- + ## Troubleshooting ### Instrumentation not working diff --git a/docs/quickstart.md b/docs/quickstart.md index 305ec48f..c8aa01a4 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -4,7 +4,7 @@ Let's walk through recording and replaying your first trace: ## Step 1: Set sampling rate to 1.0 -Set the `sampling_rate` in `.tusk/config.yaml` to 1.0 to ensure that all requests are recorded. +Set `recording.sampling.mode: fixed` and `recording.sampling.base_rate: 1.0` in `.tusk/config.yaml` to ensure that all requests are recorded. ## Step 2: Start server in record mode diff --git a/docs/troubleshooting.md b/docs/troubleshooting.md index 4feb97cb..af571f37 100644 --- a/docs/troubleshooting.md +++ b/docs/troubleshooting.md @@ -6,8 +6,10 @@ 1. **Check sampling rate**: Verify your sampling rate configuration is set to 1.0 for 100% recording. The SDK checks in this order: - `samplingRate` in `TuskDrift.initialize()` (highest priority) - - `TUSK_SAMPLING_RATE` environment variable - - `sampling_rate` in `.tusk/config.yaml` + - `TUSK_RECORDING_SAMPLING_RATE` environment variable + - legacy alias `TUSK_SAMPLING_RATE` + - `recording.sampling.base_rate` in `.tusk/config.yaml` + - legacy `recording.sampling_rate` in `.tusk/config.yaml` - Default: 1.0 2. **Verify app readiness**: Make sure you're calling `TuskDrift.markAppAsReady()` 3. **Use debug mode in SDK**: Add `logLevel: 'debug'` to the initialization parameters diff --git a/src/core/TuskDrift.test.ts b/src/core/TuskDrift.test.ts new file mode 100644 index 00000000..96d473de --- /dev/null +++ b/src/core/TuskDrift.test.ts @@ -0,0 +1,68 @@ +import test from "ava"; +import type { ExecutionContext } from "ava"; + +import { TuskDriftCore } from "./TuskDrift"; +import { OriginalGlobalUtils } from "./utils"; + +type EnvVars = Record; +type SamplingConfigResult = { + baseRate: number; + minRate: number; + mode: "fixed" | "adaptive"; +}; +type TestableTuskDrift = { + config: unknown; + determineSamplingConfig(initParams: { samplingRate?: number }): SamplingConfigResult; +}; +type OriginalGlobalUtilsOverride = { + getOriginalProcessEnvVar(key: string): string | undefined; +}; + +function createTestDrift(t: ExecutionContext, envVars: EnvVars): TestableTuskDrift { + const patchedGlobalUtils = OriginalGlobalUtils as unknown as OriginalGlobalUtilsOverride; + const originalGetEnvVar = patchedGlobalUtils.getOriginalProcessEnvVar; + t.teardown(() => { + patchedGlobalUtils.getOriginalProcessEnvVar = originalGetEnvVar; + }); + + patchedGlobalUtils.getOriginalProcessEnvVar = (key: string) => envVars[key]; + + const drift = new TuskDriftCore() as unknown as TestableTuskDrift; + drift.config = {}; + return drift; +} + +test("prefers TUSK_RECORDING_SAMPLING_RATE over the legacy alias", (t) => { + const drift = createTestDrift(t, { + TUSK_DRIFT_MODE: "DISABLED", + TUSK_RECORDING_SAMPLING_RATE: "0.25", + TUSK_SAMPLING_RATE: "0.1", + }); + + const samplingConfig = drift.determineSamplingConfig({}); + + t.is(samplingConfig.baseRate, 0.25); +}); + +test("falls back to TUSK_SAMPLING_RATE when the canonical env var is unset", (t) => { + const drift = createTestDrift(t, { + TUSK_DRIFT_MODE: "DISABLED", + TUSK_SAMPLING_RATE: "0.2", + }); + + const samplingConfig = drift.determineSamplingConfig({}); + + t.is(samplingConfig.baseRate, 0.2); +}); + +test("falls back to the legacy alias when TUSK_RECORDING_SAMPLING_RATE is invalid", (t) => { + const drift = createTestDrift(t, { + TUSK_DRIFT_MODE: "DISABLED", + TUSK_RECORDING_SAMPLING_RATE: "invalid", + TUSK_SAMPLING_RATE: "0.4", + }); + + const samplingConfig = drift.determineSamplingConfig({}); + + t.is(samplingConfig.baseRate, 0.4); +}); diff --git a/src/core/TuskDrift.ts b/src/core/TuskDrift.ts index 16d6b12f..614e2b48 100644 --- a/src/core/TuskDrift.ts +++ b/src/core/TuskDrift.ts @@ -24,11 +24,10 @@ import { MongodbInstrumentation, } from "../instrumentation/libraries"; import { TdSpanExporter } from "./tracing/TdSpanExporter"; -import { trace, Tracer, SpanKind, SpanStatusCode } from "@opentelemetry/api"; +import { context, trace, Tracer, SpanKind, SpanStatusCode } from "@opentelemetry/api"; import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node"; import { ProtobufCommunicator, MockRequestInput, MockResponseOutput } from "./ProtobufCommunicator"; -import { BatchSpanProcessor } from "@opentelemetry/sdk-trace-node"; -import { CleanSpanData, TD_INSTRUMENTATION_LIBRARY_NAME } from "./types"; +import { CleanSpanData, TD_INSTRUMENTATION_LIBRARY_NAME, STOP_RECORDING_CHILD_SPANS_CONTEXT_KEY } from "./types"; import { TuskDriftInstrumentationModuleNames } from "./TuskDriftInstrumentationModuleNames"; import { SDK_VERSION } from "../version"; import { SpanUtils } from "./tracing/SpanUtils"; @@ -47,6 +46,14 @@ import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions"; import { Resource } from "@opentelemetry/resources"; import { getRustCoreStartupStatus } from "./rustCoreBinding"; import { initializeEsmLoader } from "./esmLoader"; +import { monitorEventLoopDelay } from "perf_hooks"; +import type { IntervalHistogram } from "perf_hooks"; +import { + AdaptiveSamplingController, + RootSamplingDecision, + SamplingMode, +} from "./sampling/AdaptiveSamplingController"; +import { DriftBatchSpanProcessor } from "./tracing/DriftBatchSpanProcessor"; export interface InitParams { apiKey?: string; @@ -73,14 +80,20 @@ export class TuskDriftCore { private initialized = false; private appReady = false; private mode: TuskDriftMode; - private initParams: InitParams; + private initParams: InitParams = {}; private config: TuskConfig; private communicator?: ProtobufCommunicator | undefined; - private samplingRate: number; - private cliConnectionPromise: Promise | null; - // Add a flag to track connection status + private samplingRate = 1; + private samplingMode: SamplingMode = "fixed"; + private minSamplingRate = 0; + private adaptiveSamplingController?: AdaptiveSamplingController; + private adaptiveSamplingInterval: NodeJS.Timeout | null = null; + private eventLoopDelayHistogram: IntervalHistogram | null = null; + private effectiveMemoryLimitBytes: number | null = null; + private cliConnectionPromise: Promise | null = null; private isConnectedWithCLI = false; spanExporter?: TdSpanExporter; + private driftBatchSpanProcessor?: DriftBatchSpanProcessor; constructor() { this.mode = this.detectMode(); @@ -190,7 +203,7 @@ export class TuskDriftCore { const exportSpans = this.config.recording?.export_spans || false; logger.info( - `SDK initialized successfully (version=${SDK_VERSION}, mode=${this.mode}, env=${environment}, service=${serviceName}, serviceId=${serviceId}, exportSpans=${exportSpans}, samplingRate=${this.samplingRate}, logLevel=${logger.getLogLevel()}, runtime=node ${process.version}, platform=${process.platform}/${process.arch}).`, + `SDK initialized successfully (version=${SDK_VERSION}, mode=${this.mode}, env=${environment}, service=${serviceName}, serviceId=${serviceId}, exportSpans=${exportSpans}, samplingMode=${this.samplingMode}, samplingBaseRate=${this.samplingRate}, samplingMinRate=${this.minSamplingRate}, logLevel=${logger.getLogLevel()}, runtime=node ${process.version}, platform=${process.platform}/${process.arch}).`, ); } @@ -208,40 +221,92 @@ export class TuskDriftCore { return true; } - private determineSamplingRate(initParams: InitParams): number { - // Precedence: InitParams > Env Var > Config YAML > Default (1.0) + private validateSamplingMode(value: string | undefined, source: string): value is SamplingMode { + if (!value) { + return false; + } + + if (value === "fixed" || value === "adaptive") { + return true; + } + + logger.warn( + `Invalid sampling mode from ${source}: ${value}. Must be 'fixed' or 'adaptive'. Ignoring.`, + ); + return false; + } + + private determineSamplingConfig(initParams: InitParams): { + mode: SamplingMode; + baseRate: number; + minRate: number; + } { + const configSampling = this.config.recording?.sampling; + + let mode: SamplingMode = "fixed"; + if (this.validateSamplingMode(configSampling?.mode, "config.yaml")) { + mode = configSampling!.mode!; + } - // 1. Check init params (highest priority) + let baseRate: number | undefined; if (initParams.samplingRate !== undefined) { if (this.validateSamplingRate(initParams.samplingRate, "init params")) { logger.debug(`Using sampling rate from init params: ${initParams.samplingRate}`); - return initParams.samplingRate; + baseRate = initParams.samplingRate; } } - // 2. Check environment variable - const envSamplingRate = OriginalGlobalUtils.getOriginalProcessEnvVar("TUSK_SAMPLING_RATE"); - if (envSamplingRate !== undefined) { - const parsed = parseFloat(envSamplingRate); - if (this.validateSamplingRate(parsed, "TUSK_SAMPLING_RATE env var")) { - logger.debug(`Using sampling rate from TUSK_SAMPLING_RATE env var: ${parsed}`); - return parsed; + if (baseRate === undefined) { + for (const envVarName of ["TUSK_RECORDING_SAMPLING_RATE", "TUSK_SAMPLING_RATE"] as const) { + const envSamplingRate = OriginalGlobalUtils.getOriginalProcessEnvVar(envVarName); + if (envSamplingRate === undefined) { + continue; + } + + const parsed = parseFloat(envSamplingRate); + if (this.validateSamplingRate(parsed, `${envVarName} env var`)) { + logger.debug(`Using sampling rate from ${envVarName} env var: ${parsed}`); + baseRate = parsed; + break; + } + } + } + + if (baseRate === undefined && configSampling?.base_rate !== undefined) { + if (this.validateSamplingRate(configSampling.base_rate, "config.yaml recording.sampling.base_rate")) { + baseRate = configSampling.base_rate; } } - // 3. Check config file - if (this.config.recording?.sampling_rate !== undefined) { - if (this.validateSamplingRate(this.config.recording.sampling_rate, "config.yaml")) { - logger.debug( - `Using sampling rate from config.yaml: ${this.config.recording.sampling_rate}`, - ); - return this.config.recording.sampling_rate; + if (baseRate === undefined && this.config.recording?.sampling_rate !== undefined) { + if (this.validateSamplingRate(this.config.recording.sampling_rate, "config.yaml recording.sampling_rate")) { + baseRate = this.config.recording.sampling_rate; } } - // 4. Default to 1.0 (100%) - logger.debug("Using default sampling rate: 1.0"); - return 1; + if (baseRate === undefined) { + logger.debug("Using default sampling rate: 1.0"); + baseRate = 1; + } + + let minRate = 0; + if (mode === "adaptive") { + if (configSampling?.min_rate !== undefined) { + if (this.validateSamplingRate(configSampling.min_rate, "config.yaml recording.sampling.min_rate")) { + minRate = configSampling.min_rate; + } + } else { + minRate = 0.001; + } + + minRate = Math.min(baseRate, minRate); + } + + return { + mode, + baseRate, + minRate, + }; } private registerDefaultInstrumentations(): void { @@ -351,6 +416,11 @@ export class TuskDriftCore { private initializeTracing({ baseDirectory }: { baseDirectory: string }): void { const serviceName = this.config.service?.name || "unknown"; + const batchProcessorConfig = { + maxQueueSize: 2048, + maxExportBatchSize: 512, + scheduledDelayMillis: 2000, + }; logger.debug(`Initializing OpenTelemetry tracing for service: ${serviceName}`); @@ -364,24 +434,20 @@ export class TuskDriftCore { environment: this.initParams.env, sdkVersion: SDK_VERSION, sdkInstanceId: this.generateSdkInstanceId(), + exportTimeoutMillis: 30000, + }); + + this.driftBatchSpanProcessor = new DriftBatchSpanProcessor({ + exporter: this.spanExporter, + config: batchProcessorConfig, + mode: this.mode, }); const tracerProvider = new NodeTracerProvider({ resource: new Resource({ [ATTR_SERVICE_NAME]: serviceName, }), - spanProcessors: [ - new BatchSpanProcessor(this.spanExporter, { - // Maximum queue size before spans are dropped, default 2048 - maxQueueSize: 2048, - // Maximum batch size per export, default 512 - maxExportBatchSize: 512, - // Interval between exports, default 5s - scheduledDelayMillis: 2000, - // Max time for export before timeout, default 30s - exportTimeoutMillis: 30000, - }), - ], + spanProcessors: [this.driftBatchSpanProcessor], }); // Register the tracer provider @@ -394,6 +460,169 @@ export class TuskDriftCore { return `sdk-${originalDate.getTime()}-${Math.random().toString(36).substr(2, 9)}`; } + private startAdaptiveSamplingController(): void { + if (this.mode !== TuskDriftMode.RECORD || this.samplingMode !== "adaptive") { + return; + } + + this.adaptiveSamplingController = new AdaptiveSamplingController({ + mode: this.samplingMode, + baseRate: this.samplingRate, + minRate: this.minSamplingRate, + }); + + this.effectiveMemoryLimitBytes = this.detectEffectiveMemoryLimitBytes(); + + this.eventLoopDelayHistogram = monitorEventLoopDelay({ + resolution: 20, + }); + this.eventLoopDelayHistogram.enable(); + + this.adaptiveSamplingInterval = setInterval(() => { + this.updateAdaptiveSamplingHealth(); + }, 2000); + this.adaptiveSamplingInterval.unref?.(); + + this.updateAdaptiveSamplingHealth(); + } + + private updateAdaptiveSamplingHealth(): void { + if (!this.adaptiveSamplingController) { + return; + } + + const batchHealth = this.driftBatchSpanProcessor?.getHealthSnapshot(); + const exporterHealth = this.spanExporter?.getHealthSnapshot(); + + const eventLoopLagP95Ms = + this.eventLoopDelayHistogram && this.eventLoopDelayHistogram.exceeds > 0 + ? this.eventLoopDelayHistogram.percentile(95) / 1_000_000 + : this.eventLoopDelayHistogram + ? this.eventLoopDelayHistogram.percentile(95) / 1_000_000 + : null; + + this.eventLoopDelayHistogram?.reset(); + + this.adaptiveSamplingController.update({ + queueFillRatio: batchHealth?.queueFillRatio ?? null, + droppedSpanCount: batchHealth?.droppedSpanCount ?? 0, + exportFailureCount: + (batchHealth?.exportFailureCount ?? 0) + (exporterHealth?.failureCount ?? 0), + exportTimeoutCount: exporterHealth?.timeoutCount ?? 0, + exportCircuitOpen: exporterHealth?.circuitOpen ?? false, + eventLoopLagP95Ms, + memoryPressureRatio: this.getMemoryPressureRatio(), + }); + } + + private stopAdaptiveSamplingController(): void { + if (this.adaptiveSamplingInterval) { + clearInterval(this.adaptiveSamplingInterval); + this.adaptiveSamplingInterval = null; + } + + if (this.eventLoopDelayHistogram) { + this.eventLoopDelayHistogram.disable(); + this.eventLoopDelayHistogram = null; + } + } + + private detectEffectiveMemoryLimitBytes(): number | null { + const candidates = [ + "/sys/fs/cgroup/memory.max", + "/sys/fs/cgroup/memory/memory.limit_in_bytes", + ]; + + for (const filePath of candidates) { + const parsed = this.readNumericControlFile(filePath); + if (parsed === null) { + continue; + } + if (parsed <= 0 || parsed > 1_000_000_000_000_000) { + continue; + } + return parsed; + } + + return null; + } + + private getMemoryPressureRatio(): number | null { + if (!this.effectiveMemoryLimitBytes || this.effectiveMemoryLimitBytes <= 0) { + return null; + } + + const cgroupCurrent = this.readNumericControlFile("/sys/fs/cgroup/memory.current"); + if (cgroupCurrent !== null) { + return cgroupCurrent / this.effectiveMemoryLimitBytes; + } + + const cgroupV1Current = this.readNumericControlFile("/sys/fs/cgroup/memory/memory.usage_in_bytes"); + if (cgroupV1Current !== null) { + return cgroupV1Current / this.effectiveMemoryLimitBytes; + } + + return process.memoryUsage().rss / this.effectiveMemoryLimitBytes; + } + + private readNumericControlFile(filePath: string): number | null { + try { + if (!fs.existsSync(filePath)) { + return null; + } + + const rawValue = fs.readFileSync(filePath, "utf8").trim(); + if (!rawValue || rawValue === "max") { + return null; + } + + const parsed = Number.parseInt(rawValue, 10); + return Number.isFinite(parsed) ? parsed : null; + } catch { + return null; + } + } + + executeWithoutRecording(fn: () => T): T { + const suppressedContext = context + .active() + .setValue(STOP_RECORDING_CHILD_SPANS_CONTEXT_KEY, true); + return context.with(suppressedContext, fn); + } + + shouldRecordRootRequest({ isPreAppStart }: { isPreAppStart: boolean }): RootSamplingDecision { + if (this.adaptiveSamplingController) { + return this.adaptiveSamplingController.getDecision({ + isPreAppStart, + }); + } + + if (isPreAppStart) { + return { + shouldRecord: true, + reason: "pre_app_start", + mode: this.samplingMode, + state: "fixed", + baseRate: this.samplingRate, + minRate: this.minSamplingRate, + effectiveRate: 1, + admissionMultiplier: 1, + }; + } + + const shouldRecord = Math.random() < this.samplingRate; + return { + shouldRecord, + reason: shouldRecord ? "sampled" : "not_sampled", + mode: this.samplingMode, + state: "fixed", + baseRate: this.samplingRate, + minRate: this.minSamplingRate, + effectiveRate: this.samplingRate, + admissionMultiplier: 1, + }; + } + /** * Creates a pre-app-start span containing a snapshot of all environment variables. * Only runs in RECORD mode when env var recording is enabled. @@ -491,7 +720,10 @@ export class TuskDriftCore { this.initParams.env = nodeEnv; } - this.samplingRate = this.determineSamplingRate(initParams); + const samplingConfig = this.determineSamplingConfig(initParams); + this.samplingMode = samplingConfig.mode; + this.samplingRate = samplingConfig.baseRate; + this.minSamplingRate = samplingConfig.minRate; // Need to have observable service id if exporting spans to Tusk backend if (this.config.recording?.export_spans && !this.config.service?.id) { @@ -576,6 +808,7 @@ export class TuskDriftCore { // Important to do this after registering instrumentations since initializeTracing lazy imports the NodeSDK from OpenTelemetry // which imports the gRPC exporter this.initializeTracing({ baseDirectory }); + this.startAdaptiveSamplingController(); // Create env vars snapshot span (only in RECORD mode with env var recording enabled) this.createEnvVarsSnapshot(); @@ -758,7 +991,7 @@ interface TuskDriftPublicAPI { * - apiKey: string - Your TuskDrift API key (required) * - env: string - The environment name (e.g., 'development', 'staging', 'production') (required) * - logLevel?: LogLevel - Optional logging level ('silent' | 'error' | 'warn' | 'info' | 'debug'), defaults to 'info' - * - samplingRate?: number - Optional sampling rate (0.0-1.0) for recording requests. Overrides TUSK_SAMPLING_RATE env var and config.yaml. Defaults to 1.0 + * - samplingRate?: number - Optional sampling rate (0.0-1.0) for recording requests. Overrides TUSK_RECORDING_SAMPLING_RATE, the legacy TUSK_SAMPLING_RATE alias, and config.yaml. Defaults to 1.0 * * @returns void - Initializes the SDK * diff --git a/src/core/sampling/AdaptiveSamplingController.test.ts b/src/core/sampling/AdaptiveSamplingController.test.ts new file mode 100644 index 00000000..9de4ff1c --- /dev/null +++ b/src/core/sampling/AdaptiveSamplingController.test.ts @@ -0,0 +1,64 @@ +import test from "ava"; +import { AdaptiveSamplingController } from "./AdaptiveSamplingController"; + +test("pre-app-start requests bypass sampling and always record", (t) => { + const controller = new AdaptiveSamplingController( + { + mode: "adaptive", + baseRate: 0, + minRate: 0, + }, + { + randomFn: () => 0.99, + nowFn: () => 0, + }, + ); + + const decision = controller.getDecision({ + isPreAppStart: true, + }); + + t.true(decision.shouldRecord); + t.is(decision.reason, "pre_app_start"); + t.is(decision.effectiveRate, 1); +}); + +test("adaptive controller sheds load and enters critical pause on drops", (t) => { + let now = 0; + const controller = new AdaptiveSamplingController( + { + mode: "adaptive", + baseRate: 0.5, + minRate: 0.1, + }, + { + randomFn: () => 0.3, + nowFn: () => now, + }, + ); + + controller.update({ + queueFillRatio: 0.9, + }); + + const loadShedDecision = controller.getDecision({ + isPreAppStart: false, + }); + t.is(loadShedDecision.state, "hot"); + t.true(loadShedDecision.effectiveRate < 0.5); + t.false(loadShedDecision.shouldRecord); + t.is(loadShedDecision.reason, "load_shed"); + + now += 1; + controller.update({ + queueFillRatio: 0.2, + droppedSpanCount: 1, + }); + + const pausedDecision = controller.getDecision({ + isPreAppStart: false, + }); + t.is(pausedDecision.state, "critical_pause"); + t.false(pausedDecision.shouldRecord); + t.is(pausedDecision.reason, "critical_pause"); +}); diff --git a/src/core/sampling/AdaptiveSamplingController.ts b/src/core/sampling/AdaptiveSamplingController.ts new file mode 100644 index 00000000..a24e3f97 --- /dev/null +++ b/src/core/sampling/AdaptiveSamplingController.ts @@ -0,0 +1,283 @@ +import { logger } from "../utils/logger"; + +export type SamplingMode = "fixed" | "adaptive"; +export type AdaptiveSamplingState = "fixed" | "healthy" | "warm" | "hot" | "critical_pause"; +export type RootSamplingDecisionReason = + | "pre_app_start" + | "sampled" + | "not_sampled" + | "load_shed" + | "critical_pause"; + +export interface ResolvedSamplingConfig { + mode: SamplingMode; + baseRate: number; + minRate: number; +} + +export interface AdaptiveSamplingHealthSnapshot { + queueFillRatio?: number | null; + droppedSpanCount?: number; + exportFailureCount?: number; + exportTimeoutCount?: number; + exportCircuitOpen?: boolean; + eventLoopLagP95Ms?: number | null; + memoryPressureRatio?: number | null; +} + +export interface RootSamplingDecision { + shouldRecord: boolean; + reason: RootSamplingDecisionReason; + mode: SamplingMode; + state: AdaptiveSamplingState; + baseRate: number; + minRate: number; + effectiveRate: number; + admissionMultiplier: number; +} + +function clamp(value: number, min: number, max: number): number { + return Math.min(max, Math.max(min, value)); +} + +function clamp01(value: number): number { + return clamp(value, 0, 1); +} + +function normalizeBetween(value: number | null | undefined, zeroPoint: number, onePoint: number): number { + if (value === null || value === undefined || Number.isNaN(value)) { + return 0; + } + if (onePoint <= zeroPoint) { + return 0; + } + return clamp01((value - zeroPoint) / (onePoint - zeroPoint)); +} + +export class AdaptiveSamplingController { + private readonly config: ResolvedSamplingConfig; + private readonly randomFn: () => number; + private readonly nowFn: () => number; + + private admissionMultiplier = 1; + private state: AdaptiveSamplingState; + private pausedUntilMs = 0; + private lastUpdatedAtMs = 0; + private lastDecreaseAtMs = 0; + + private prevDroppedSpanCount = 0; + private prevExportFailureCount = 0; + private prevExportTimeoutCount = 0; + + private queueFillEwma: number | null = null; + private recentDropSignal = 0; + private recentFailureSignal = 0; + private recentTimeoutSignal = 0; + + constructor( + config: ResolvedSamplingConfig, + { + randomFn = Math.random, + nowFn = Date.now, + }: { + randomFn?: () => number; + nowFn?: () => number; + } = {}, + ) { + this.config = config; + this.randomFn = randomFn; + this.nowFn = nowFn; + this.state = config.mode === "fixed" ? "fixed" : "healthy"; + } + + update(snapshot: AdaptiveSamplingHealthSnapshot): void { + if (this.config.mode !== "adaptive") { + this.state = "fixed"; + this.admissionMultiplier = 1; + return; + } + + const now = this.nowFn(); + const elapsedMs = this.lastUpdatedAtMs === 0 ? 2000 : Math.max(1, now - this.lastUpdatedAtMs); + this.lastUpdatedAtMs = now; + + const decay = Math.exp(-elapsedMs / 30000); + this.recentDropSignal *= decay; + this.recentFailureSignal *= decay; + this.recentTimeoutSignal *= decay; + + const droppedSpanCount = Math.max(0, snapshot.droppedSpanCount ?? 0); + const exportFailureCount = Math.max(0, snapshot.exportFailureCount ?? 0); + const exportTimeoutCount = Math.max(0, snapshot.exportTimeoutCount ?? 0); + + const droppedDelta = Math.max(0, droppedSpanCount - this.prevDroppedSpanCount); + const exportFailureDelta = Math.max(0, exportFailureCount - this.prevExportFailureCount); + const exportTimeoutDelta = Math.max(0, exportTimeoutCount - this.prevExportTimeoutCount); + + this.prevDroppedSpanCount = droppedSpanCount; + this.prevExportFailureCount = exportFailureCount; + this.prevExportTimeoutCount = exportTimeoutCount; + + this.recentDropSignal += droppedDelta; + this.recentFailureSignal += exportFailureDelta; + this.recentTimeoutSignal += exportTimeoutDelta; + + const queueFillRatio = + snapshot.queueFillRatio === null || snapshot.queueFillRatio === undefined + ? null + : clamp01(snapshot.queueFillRatio); + + if (queueFillRatio !== null) { + this.queueFillEwma = this.queueFillEwma === null ? queueFillRatio : 0.25 * queueFillRatio + 0.75 * this.queueFillEwma; + } + + const queuePressure = normalizeBetween(this.queueFillEwma, 0.2, 0.85); + const eventLoopPressure = normalizeBetween(snapshot.eventLoopLagP95Ms ?? null, 20, 150); + const memoryPressure = normalizeBetween(snapshot.memoryPressureRatio ?? null, 0.8, 0.92); + const exportFailurePressure = clamp01(this.recentFailureSignal / 5); + + const pressure = Math.max(queuePressure, eventLoopPressure, memoryPressure, exportFailurePressure); + const hardBrake = + droppedDelta > 0 || + exportTimeoutDelta > 0 || + Boolean(snapshot.exportCircuitOpen) || + (snapshot.eventLoopLagP95Ms ?? 0) >= 150 || + (snapshot.memoryPressureRatio ?? 0) >= 0.92; + + const previousState = this.state; + const previousMultiplier = this.admissionMultiplier; + + if (hardBrake) { + this.pausedUntilMs = now + 15000; + this.admissionMultiplier = 0; + this.state = "critical_pause"; + this.lastDecreaseAtMs = now; + this.logTransition(previousState, previousMultiplier, pressure, snapshot); + return; + } + + if (now < this.pausedUntilMs) { + this.state = "critical_pause"; + this.logTransition(previousState, previousMultiplier, pressure, snapshot); + return; + } + + const minMultiplier = this.getMinMultiplier(); + + if (pressure >= 0.7) { + this.admissionMultiplier = Math.max(minMultiplier, this.admissionMultiplier * 0.4); + this.state = "hot"; + this.lastDecreaseAtMs = now; + } else if (pressure >= 0.45) { + this.admissionMultiplier = Math.max(minMultiplier, this.admissionMultiplier * 0.7); + this.state = "warm"; + this.lastDecreaseAtMs = now; + } else { + if (pressure <= 0.2 && now - this.lastDecreaseAtMs >= 10000) { + this.admissionMultiplier = Math.min(1, this.admissionMultiplier + 0.05); + } + this.state = "healthy"; + } + + this.logTransition(previousState, previousMultiplier, pressure, snapshot); + } + + getDecision({ isPreAppStart }: { isPreAppStart: boolean }): RootSamplingDecision { + if (isPreAppStart) { + return { + shouldRecord: true, + reason: "pre_app_start", + mode: this.config.mode, + state: this.state, + baseRate: this.config.baseRate, + minRate: this.config.minRate, + effectiveRate: 1, + admissionMultiplier: 1, + }; + } + + const effectiveRate = + this.config.mode === "adaptive" ? this.getEffectiveSamplingRate() : clamp01(this.config.baseRate); + + if (effectiveRate <= 0) { + return { + shouldRecord: false, + reason: this.state === "critical_pause" ? "critical_pause" : "not_sampled", + mode: this.config.mode, + state: this.state, + baseRate: this.config.baseRate, + minRate: this.config.minRate, + effectiveRate, + admissionMultiplier: this.admissionMultiplier, + }; + } + + const shouldRecord = this.randomFn() < effectiveRate; + return { + shouldRecord, + reason: shouldRecord + ? "sampled" + : this.config.mode === "adaptive" && effectiveRate < this.config.baseRate + ? "load_shed" + : "not_sampled", + mode: this.config.mode, + state: this.state, + baseRate: this.config.baseRate, + minRate: this.config.minRate, + effectiveRate, + admissionMultiplier: this.config.mode === "adaptive" ? this.admissionMultiplier : 1, + }; + } + + getEffectiveSamplingRate(): number { + if (this.config.mode !== "adaptive") { + return clamp01(this.config.baseRate); + } + if (this.nowFn() < this.pausedUntilMs || this.state === "critical_pause") { + return 0; + } + const effectiveRate = this.config.baseRate * this.admissionMultiplier; + return clamp( + effectiveRate, + Math.min(this.config.baseRate, this.config.minRate), + this.config.baseRate, + ); + } + + getSnapshot(): Omit { + return { + mode: this.config.mode, + state: this.state, + baseRate: this.config.baseRate, + minRate: this.config.minRate, + effectiveRate: + this.config.mode === "adaptive" ? this.getEffectiveSamplingRate() : clamp01(this.config.baseRate), + admissionMultiplier: this.config.mode === "adaptive" ? this.admissionMultiplier : 1, + }; + } + + private getMinMultiplier(): number { + if (this.config.baseRate <= 0 || this.config.minRate <= 0) { + return 0; + } + return clamp01(this.config.minRate / this.config.baseRate); + } + + private logTransition( + previousState: AdaptiveSamplingState, + previousMultiplier: number, + pressure: number, + snapshot: AdaptiveSamplingHealthSnapshot, + ): void { + if ( + previousState === this.state && + Math.abs(previousMultiplier - this.admissionMultiplier) < 0.05 + ) { + return; + } + + logger.info( + `Adaptive sampling updated (state=${this.state}, multiplier=${this.admissionMultiplier.toFixed(2)}, effectiveRate=${this.getEffectiveSamplingRate().toFixed(4)}, pressure=${pressure.toFixed(2)}, queueFill=${this.queueFillEwma?.toFixed(2) ?? "n/a"}, eventLoopLagP95Ms=${snapshot.eventLoopLagP95Ms ?? "n/a"}, memoryPressureRatio=${snapshot.memoryPressureRatio ?? "n/a"}, exportCircuitOpen=${snapshot.exportCircuitOpen ? "true" : "false"}).`, + ); + } +} diff --git a/src/core/tracing/DriftBatchSpanProcessor.test.ts b/src/core/tracing/DriftBatchSpanProcessor.test.ts new file mode 100644 index 00000000..a454924c --- /dev/null +++ b/src/core/tracing/DriftBatchSpanProcessor.test.ts @@ -0,0 +1,68 @@ +import test from "ava"; +import { ExportResultCode } from "@opentelemetry/core"; +import { TuskDriftMode } from "../TuskDrift"; +import { DriftBatchSpanProcessor } from "./DriftBatchSpanProcessor"; +import type { TdSpanExporter } from "./TdSpanExporter"; + +function createMockSpan(name: string) { + return { + name, + }; +} + +test("exports a batch when max batch size is reached", async (t) => { + const exportedBatchSizes: number[] = []; + const exporter = { + export(spans: unknown[], resultCallback: (result: { code: ExportResultCode }) => void) { + exportedBatchSizes.push(spans.length); + resultCallback({ code: ExportResultCode.SUCCESS }); + }, + async shutdown() {}, + } as unknown as TdSpanExporter; + + const processor = new DriftBatchSpanProcessor({ + exporter, + config: { + maxQueueSize: 10, + maxExportBatchSize: 2, + scheduledDelayMillis: 1000, + }, + mode: TuskDriftMode.RECORD, + }); + + processor.onEnd(createMockSpan("one") as never); + processor.onEnd(createMockSpan("two") as never); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + t.deepEqual(exportedBatchSizes, [2]); + await processor.shutdown(); +}); + +test("drops spans when the queue is full", async (t) => { + const exporter = { + export(_spans: unknown[], resultCallback: (result: { code: ExportResultCode }) => void) { + resultCallback({ code: ExportResultCode.SUCCESS }); + }, + async shutdown() {}, + } as unknown as TdSpanExporter; + + const processor = new DriftBatchSpanProcessor({ + exporter, + config: { + maxQueueSize: 1, + maxExportBatchSize: 10, + scheduledDelayMillis: 1000, + }, + mode: TuskDriftMode.RECORD, + }); + + processor.onEnd(createMockSpan("one") as never); + processor.onEnd(createMockSpan("two") as never); + + const snapshot = processor.getHealthSnapshot(); + t.is(snapshot.queueSize, 1); + t.is(snapshot.droppedSpanCount, 1); + + await processor.shutdown(); +}); diff --git a/src/core/tracing/DriftBatchSpanProcessor.ts b/src/core/tracing/DriftBatchSpanProcessor.ts new file mode 100644 index 00000000..fa7e6119 --- /dev/null +++ b/src/core/tracing/DriftBatchSpanProcessor.ts @@ -0,0 +1,152 @@ +import type { Context, Span } from "@opentelemetry/api"; +import { ExportResultCode } from "@opentelemetry/core"; +import type { ReadableSpan, SpanProcessor } from "@opentelemetry/sdk-trace-base"; +import { TuskDriftMode } from "../TuskDrift"; +import { logger } from "../utils/logger"; +import { TdSpanExporter } from "./TdSpanExporter"; + +export interface DriftBatchSpanProcessorConfig { + maxQueueSize: number; + maxExportBatchSize: number; + scheduledDelayMillis: number; +} + +export interface DriftBatchProcessorHealthSnapshot { + queueSize: number; + maxQueueSize: number; + queueFillRatio: number; + droppedSpanCount: number; + exportFailureCount: number; + lastExportLatencyMs: number | null; +} + +export class DriftBatchSpanProcessor implements SpanProcessor { + private readonly exporter: TdSpanExporter; + private readonly config: DriftBatchSpanProcessorConfig; + private readonly mode: TuskDriftMode; + private readonly queue: ReadableSpan[] = []; + + private interval: NodeJS.Timeout; + private flushPromise: Promise | null = null; + private flushRequested = false; + private stopped = false; + + private droppedSpanCount = 0; + private exportFailureCount = 0; + private lastExportLatencyMs: number | null = null; + + constructor({ + exporter, + config, + mode, + }: { + exporter: TdSpanExporter; + config: DriftBatchSpanProcessorConfig; + mode: TuskDriftMode; + }) { + this.exporter = exporter; + this.config = config; + this.mode = mode; + this.interval = setInterval(() => { + void this.flushOneBatch(); + }, this.config.scheduledDelayMillis); + this.interval.unref?.(); + } + + onStart(_span: Span, _parentContext: Context): void {} + + onEnd(span: ReadableSpan): void { + if (this.stopped || this.mode !== TuskDriftMode.RECORD) { + return; + } + + if (this.queue.length >= this.config.maxQueueSize) { + this.droppedSpanCount += 1; + if (this.droppedSpanCount <= 5 || this.droppedSpanCount % 100 === 0) { + logger.warn( + `DriftBatchSpanProcessor queue full (${this.config.maxQueueSize}), dropping span. droppedSpans=${this.droppedSpanCount}`, + ); + } + return; + } + + this.queue.push(span); + if (this.queue.length >= this.config.maxExportBatchSize) { + this.requestFlushSoon(); + } + } + + forceFlush(): Promise { + return this.drainQueue(); + } + + async shutdown(): Promise { + this.stopped = true; + clearInterval(this.interval); + await this.drainQueue(); + await this.exporter.shutdown(); + } + + getHealthSnapshot(): DriftBatchProcessorHealthSnapshot { + return { + queueSize: this.queue.length, + maxQueueSize: this.config.maxQueueSize, + queueFillRatio: + this.config.maxQueueSize > 0 ? this.queue.length / this.config.maxQueueSize : 0, + droppedSpanCount: this.droppedSpanCount, + exportFailureCount: this.exportFailureCount, + lastExportLatencyMs: this.lastExportLatencyMs, + }; + } + + private requestFlushSoon(): void { + if (this.flushRequested) { + return; + } + this.flushRequested = true; + queueMicrotask(() => { + this.flushRequested = false; + void this.flushOneBatch(); + }); + } + + private async drainQueue(): Promise { + while (this.queue.length > 0 || this.flushPromise) { + await this.flushOneBatch(); + if (this.flushPromise) { + await this.flushPromise; + } + } + } + + private async flushOneBatch(): Promise { + if (this.flushPromise || this.queue.length === 0) { + return this.flushPromise ?? Promise.resolve(); + } + + const batch = this.queue.splice(0, this.config.maxExportBatchSize); + const startedAtMs = Date.now(); + + this.flushPromise = new Promise((resolve) => { + this.exporter.export(batch, (result) => { + this.lastExportLatencyMs = Date.now() - startedAtMs; + + if (result.code === ExportResultCode.FAILED) { + this.exportFailureCount += 1; + logger.warn( + `DriftBatchSpanProcessor export failed for batch of ${batch.length} span(s): ${result.error instanceof Error ? result.error.message : "unknown error"}`, + ); + } + + resolve(); + }); + }).finally(() => { + this.flushPromise = null; + if (!this.stopped && this.queue.length >= this.config.maxExportBatchSize) { + this.requestFlushSoon(); + } + }); + + return this.flushPromise; + } +} diff --git a/src/core/tracing/TdSpanExporter.ts b/src/core/tracing/TdSpanExporter.ts index 52086698..75ec88f5 100644 --- a/src/core/tracing/TdSpanExporter.ts +++ b/src/core/tracing/TdSpanExporter.ts @@ -3,7 +3,7 @@ import { ExportResult, ExportResultCode } from "@opentelemetry/core"; import { TuskDriftMode } from "../TuskDrift"; import { SpanTransformer } from "./SpanTransformer"; import { FilesystemSpanAdapter } from "./adapters/FilesystemSpanAdapter"; -import { ApiSpanAdapter } from "./adapters/ApiSpanAdapter"; +import { ApiSpanAdapter, ApiSpanAdapterHealthSnapshot } from "./adapters/ApiSpanAdapter"; import { logger } from "../utils/logger"; import { CleanSpanData, TD_INSTRUMENTATION_LIBRARY_NAME, TdSpanAttributes } from "../types"; import { TraceBlockingManager } from "./TraceBlockingManager"; @@ -20,6 +20,7 @@ export interface TdTraceExporterConfig { environment?: string; sdkVersion: string; sdkInstanceId: string; + exportTimeoutMillis: number; } /** A SpanExportAdapter defines the actual thing that exports to api/file/etc. @@ -30,6 +31,13 @@ export interface SpanExportAdapter { shutdown(): Promise; } +export interface TdExporterHealthSnapshot { + failureCount: number; + timeoutCount: number; + circuitOpen: boolean; + lastExportLatencyMs: number | null; +} + export class TdSpanExporter implements SpanExporter { private mode: TuskDriftMode; private environment?: string; @@ -55,6 +63,7 @@ export class TdSpanExporter implements SpanExporter { environment: config.environment, sdkVersion: config.sdkVersion, sdkInstanceId: config.sdkInstanceId, + exportTimeoutMillis: config.exportTimeoutMillis, }), ); } else { @@ -71,6 +80,39 @@ export class TdSpanExporter implements SpanExporter { return this.adapters; } + getHealthSnapshot(): TdExporterHealthSnapshot { + const apiAdapterSnapshots = this.adapters + .filter((adapter): adapter is ApiSpanAdapter => adapter instanceof ApiSpanAdapter) + .map((adapter) => adapter.getHealthSnapshot()); + + if (apiAdapterSnapshots.length === 0) { + return { + failureCount: 0, + timeoutCount: 0, + circuitOpen: false, + lastExportLatencyMs: null, + }; + } + + return apiAdapterSnapshots.reduce( + (accumulator, snapshot: ApiSpanAdapterHealthSnapshot) => ({ + failureCount: accumulator.failureCount + snapshot.failureCount, + timeoutCount: accumulator.timeoutCount + snapshot.timeoutCount, + circuitOpen: accumulator.circuitOpen || snapshot.circuitState === "open", + lastExportLatencyMs: Math.max( + accumulator.lastExportLatencyMs ?? 0, + snapshot.lastExportLatencyMs ?? 0, + ), + }), + { + failureCount: 0, + timeoutCount: 0, + circuitOpen: false, + lastExportLatencyMs: null, + }, + ); + } + /** * Add a custom export adapter */ diff --git a/src/core/tracing/adapters/ApiSpanAdapter.ts b/src/core/tracing/adapters/ApiSpanAdapter.ts index 76b69fdd..4fa6eac7 100644 --- a/src/core/tracing/adapters/ApiSpanAdapter.ts +++ b/src/core/tracing/adapters/ApiSpanAdapter.ts @@ -1,17 +1,21 @@ import { ExportResult, ExportResultCode } from "@opentelemetry/core"; import type { SpanExportAdapter } from "../TdSpanExporter"; import { CleanSpanData } from "../../types"; -import { SpanExportServiceClient } from "@use-tusk/drift-schemas/backend/span_export_service.client"; import { ExportSpansRequest, ExportSpansResponse, } from "@use-tusk/drift-schemas/backend/span_export_service"; -import { TwirpFetchTransport } from "@protobuf-ts/twirp-transport"; import { Span, PackageType, SpanKind as DriftSpanKind } from "@use-tusk/drift-schemas/core/span"; import { SpanKind as OtelSpanKind } from "@opentelemetry/api"; import { toStruct } from "../../utils/protobufUtils"; import { logger } from "../../utils/logger"; import { buildExportSpansRequestBytes } from "../../rustCoreBinding"; +import { + CircuitBreaker, + CircuitState, + NonRetryableError, + withRetries, +} from "./resilience"; export interface ApiSpanAdapterConfig { apiKey: string; @@ -20,10 +24,18 @@ export interface ApiSpanAdapterConfig { environment?: string; sdkVersion: string; sdkInstanceId: string; + exportTimeoutMillis: number; } const DRIFT_API_PATH = "/api/drift"; +export interface ApiSpanAdapterHealthSnapshot { + failureCount: number; + timeoutCount: number; + circuitState: CircuitState; + lastExportLatencyMs: number | null; +} + /** * Exports spans to Tusk backend API via protobuf */ @@ -31,11 +43,15 @@ export class ApiSpanAdapter implements SpanExportAdapter { readonly name = "api"; private apiKey: string; private tuskBackendBaseUrl: string; - private spanExportClient: SpanExportServiceClient; private observableServiceId: string; private environment?: string; private sdkVersion: string; private sdkInstanceId: string; + private exportTimeoutMillis: number; + private circuitBreaker: CircuitBreaker; + private failureCount = 0; + private timeoutCount = 0; + private lastExportLatencyMs: number | null = null; constructor(config: ApiSpanAdapterConfig) { this.apiKey = config.apiKey; @@ -44,82 +60,46 @@ export class ApiSpanAdapter implements SpanExportAdapter { this.environment = config.environment; this.sdkVersion = config.sdkVersion; this.sdkInstanceId = config.sdkInstanceId; - - const transport = new TwirpFetchTransport({ - baseUrl: `${config.tuskBackendBaseUrl}${DRIFT_API_PATH}`, - meta: { - "x-api-key": config.apiKey, - "x-td-skip-instrumentation": "true", - }, + this.exportTimeoutMillis = config.exportTimeoutMillis; + this.circuitBreaker = new CircuitBreaker({ + failureThreshold: 5, + resetTimeoutMs: 30000, }); - this.spanExportClient = new SpanExportServiceClient(transport); logger.debug("ApiSpanAdapter initialized"); } async exportSpans(spans: CleanSpanData[]): Promise { - try { - const rustRequestBytes = buildExportSpansRequestBytes( - this.observableServiceId, - this.environment || "", - this.sdkVersion, - this.sdkInstanceId, - spans.map((s) => s.protoSpanBytes).filter((s): s is Buffer => Buffer.isBuffer(s)), - ); - const allSpansHavePrebuiltBytes = - spans.length > 0 && spans.every((s) => Buffer.isBuffer(s.protoSpanBytes)); - - if (allSpansHavePrebuiltBytes && rustRequestBytes) { - const response = await fetch( - `${this.tuskBackendBaseUrl}${DRIFT_API_PATH}/tusk.drift.backend.v1.SpanExportService/ExportSpans`, - { - method: "POST", - headers: { - "x-api-key": this.apiKey, - "x-td-skip-instrumentation": "true", - "Content-Type": "application/protobuf", - Accept: "application/protobuf", - }, - body: new Uint8Array(rustRequestBytes), - }, - ); - - if (!response.ok) { - throw new Error(`Remote export failed with status ${response.status}`); - } - - const responseBytes = new Uint8Array(await response.arrayBuffer()); - const parsed = ExportSpansResponse.fromBinary(responseBytes); - if (!parsed.success) { - throw new Error(`Remote export failed: ${parsed.message}`); - } - - logger.debug( - `Successfully exported ${spans.length} spans to remote endpoint (rust binary path)`, - ); - return { code: ExportResultCode.SUCCESS }; - } - - // Transform spans to protobuf format - const protoSpans: Span[] = spans.map((span) => this.transformSpanToProtobuf(span)); - - const request: ExportSpansRequest = { - observableServiceId: this.observableServiceId, - environment: this.environment || "", - sdkVersion: this.sdkVersion, - sdkInstanceId: this.sdkInstanceId, - spans: protoSpans, + if (!this.circuitBreaker.allowRequest()) { + const error = new Error("Remote export circuit breaker is open"); + logger.warn(error.message); + return { + code: ExportResultCode.FAILED, + error, }; + } - const response = await this.spanExportClient.exportSpans(request); + const startedAtMs = Date.now(); - if (!response.response.success) { - throw new Error(`Remote export failed: ${response.response.message}`); - } + try { + const requestBytes = this.buildRequestBytes(spans); + await withRetries( + () => this.postExportRequest(requestBytes), + { + maxAttempts: 3, + initialDelayMs: 500, + maxDelayMs: 4000, + }, + ); + this.circuitBreaker.recordSuccess(); + this.lastExportLatencyMs = Date.now() - startedAtMs; logger.debug(`Successfully exported ${spans.length} spans to remote endpoint`); return { code: ExportResultCode.SUCCESS }; } catch (error) { + this.failureCount += 1; + this.lastExportLatencyMs = Date.now() - startedAtMs; + this.circuitBreaker.recordFailure(); logger.error(`Failed to export spans to remote:`, error); return { code: ExportResultCode.FAILED, @@ -128,6 +108,15 @@ export class ApiSpanAdapter implements SpanExportAdapter { } } + getHealthSnapshot(): ApiSpanAdapterHealthSnapshot { + return { + failureCount: this.failureCount, + timeoutCount: this.timeoutCount, + circuitState: this.circuitBreaker.getState(), + lastExportLatencyMs: this.lastExportLatencyMs, + }; + } + private transformSpanToProtobuf(cleanSpan: CleanSpanData): Span { return { traceId: cleanSpan.traceId, @@ -178,4 +167,76 @@ export class ApiSpanAdapter implements SpanExportAdapter { // No cleanup needed for API exporter return Promise.resolve(); } + + private buildRequestBytes(spans: CleanSpanData[]): Uint8Array { + const rustRequestBytes = buildExportSpansRequestBytes( + this.observableServiceId, + this.environment || "", + this.sdkVersion, + this.sdkInstanceId, + spans.map((span) => span.protoSpanBytes).filter((value): value is Buffer => Buffer.isBuffer(value)), + ); + const allSpansHavePrebuiltBytes = + spans.length > 0 && spans.every((span) => Buffer.isBuffer(span.protoSpanBytes)); + + if (allSpansHavePrebuiltBytes && rustRequestBytes) { + return new Uint8Array(rustRequestBytes); + } + + const protoSpans: Span[] = spans.map((span) => this.transformSpanToProtobuf(span)); + const request: ExportSpansRequest = { + observableServiceId: this.observableServiceId, + environment: this.environment || "", + sdkVersion: this.sdkVersion, + sdkInstanceId: this.sdkInstanceId, + spans: protoSpans, + }; + return ExportSpansRequest.toBinary(request); + } + + private async postExportRequest(requestBytes: Uint8Array): Promise { + const controller = new AbortController(); + const timeout = setTimeout(() => { + controller.abort(new Error("Remote export timed out")); + }, this.exportTimeoutMillis); + + try { + const response = await fetch( + `${this.tuskBackendBaseUrl}${DRIFT_API_PATH}/tusk.drift.backend.v1.SpanExportService/ExportSpans`, + { + method: "POST", + headers: { + "x-api-key": this.apiKey, + "x-td-skip-instrumentation": "true", + "Content-Type": "application/protobuf", + Accept: "application/protobuf", + }, + body: Buffer.from(requestBytes), + signal: controller.signal, + }, + ); + + if (response.status >= 500) { + throw new Error(`Remote export failed with status ${response.status}`); + } + + if (response.status !== 200) { + throw new NonRetryableError(`Remote export failed with status ${response.status}`); + } + + const responseBytes = new Uint8Array(await response.arrayBuffer()); + const parsed = ExportSpansResponse.fromBinary(responseBytes); + if (!parsed.success) { + throw new Error(`Remote export failed: ${parsed.message}`); + } + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + this.timeoutCount += 1; + throw error; + } + throw error; + } finally { + clearTimeout(timeout); + } + } } diff --git a/src/core/tracing/adapters/resilience.ts b/src/core/tracing/adapters/resilience.ts new file mode 100644 index 00000000..3fd59954 --- /dev/null +++ b/src/core/tracing/adapters/resilience.ts @@ -0,0 +1,100 @@ +export type CircuitState = "closed" | "open" | "half_open"; + +export interface CircuitBreakerConfig { + failureThreshold: number; + resetTimeoutMs: number; +} + +export interface RetryConfig { + maxAttempts: number; + initialDelayMs: number; + maxDelayMs: number; +} + +export class NonRetryableError extends Error {} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +export class CircuitBreaker { + private readonly config: CircuitBreakerConfig; + private state: CircuitState = "closed"; + private failureCount = 0; + private openedAtMs = 0; + + constructor(config: CircuitBreakerConfig) { + this.config = config; + } + + allowRequest(): boolean { + if (this.state === "closed") { + return true; + } + + if (this.state === "open") { + if (Date.now() - this.openedAtMs < this.config.resetTimeoutMs) { + return false; + } + this.state = "half_open"; + return true; + } + + return true; + } + + recordSuccess(): void { + this.failureCount = 0; + this.state = "closed"; + this.openedAtMs = 0; + } + + recordFailure(): void { + if (this.state === "half_open") { + this.tripOpen(); + return; + } + + this.failureCount += 1; + if (this.failureCount >= this.config.failureThreshold) { + this.tripOpen(); + } + } + + getState(): CircuitState { + return this.state; + } + + private tripOpen(): void { + this.state = "open"; + this.failureCount = 0; + this.openedAtMs = Date.now(); + } +} + +export async function withRetries( + fn: () => Promise, + config: RetryConfig, +): Promise { + let attempt = 0; + let delayMs = config.initialDelayMs; + let lastError: unknown = new Error("retry loop did not execute"); + + while (attempt < config.maxAttempts) { + try { + return await fn(); + } catch (error) { + lastError = error; + attempt += 1; + + if (error instanceof NonRetryableError || attempt >= config.maxAttempts) { + break; + } + + await sleep(delayMs); + delayMs = Math.min(delayMs * 2, config.maxDelayMs); + } + } + + throw lastError; +} diff --git a/src/core/utils/configUtils.ts b/src/core/utils/configUtils.ts index 796b91f7..4aa9a2ff 100644 --- a/src/core/utils/configUtils.ts +++ b/src/core/utils/configUtils.ts @@ -33,6 +33,11 @@ export interface TuskConfig { }; recording?: { sampling_rate?: number; + sampling?: { + mode?: "fixed" | "adaptive"; + base_rate?: number; + min_rate?: number; + }; export_spans?: boolean; enable_env_var_recording?: boolean; enable_analytics?: boolean; diff --git a/src/instrumentation/libraries/http/HttpInstrumentation.test.ts b/src/instrumentation/libraries/http/HttpInstrumentation.test.ts index 05034055..efae760b 100644 --- a/src/instrumentation/libraries/http/HttpInstrumentation.test.ts +++ b/src/instrumentation/libraries/http/HttpInstrumentation.test.ts @@ -118,6 +118,51 @@ test.afterEach(() => { SpanUtilsErrorTesting.teardownErrorResilienceTest(); }); +test("should treat the first inbound HTTP request as pre-app-start before auto-marking ready", (t) => { + const callOrder: string[] = []; + let capturedIsPreAppStart: boolean | undefined; + + (httpInstrumentation as any).tuskDrift = { + isAppReady: () => { + callOrder.push("isAppReady"); + return false; + }, + shouldRecordRootRequest: ({ isPreAppStart }: { isPreAppStart: boolean }) => { + callOrder.push("shouldRecordRootRequest"); + capturedIsPreAppStart = isPreAppStart; + return { shouldRecord: true }; + }, + executeWithoutRecording: (fn: () => unknown) => { + callOrder.push("executeWithoutRecording"); + return fn(); + }, + markAppAsReady: () => { + callOrder.push("markAppAsReady"); + }, + }; + + (httpInstrumentation as any)._createServerSpan = () => { + callOrder.push("_createServerSpan"); + return "server-span-result"; + }; + + const wrappedEmit = (httpInstrumentation as any)._getServerEmitPatchFn("http")(() => { + callOrder.push("originalEmit"); + return "original-emit-result"; + }); + + const result = wrappedEmit.call({} as http.Server, "request", {} as http.IncomingMessage, {} as http.ServerResponse); + + t.is(result, "server-span-result"); + t.true(capturedIsPreAppStart === true); + t.deepEqual(callOrder, [ + "isAppReady", + "shouldRecordRootRequest", + "_createServerSpan", + "markAppAsReady", + ]); +}); + // Client Request Error Resilience test("should complete HTTP requests when SpanUtils.createSpan throws", async (t) => { SpanUtilsErrorTesting.mockCreateSpanWithError({ diff --git a/src/instrumentation/libraries/http/Instrumentation.ts b/src/instrumentation/libraries/http/Instrumentation.ts index 92dfc8a4..2478430b 100644 --- a/src/instrumentation/libraries/http/Instrumentation.ts +++ b/src/instrumentation/libraries/http/Instrumentation.ts @@ -44,7 +44,6 @@ import { SchemaMerges, } from "../../../core/tracing/JsonSchemaHelper"; import { - shouldSample, OriginalGlobalUtils, logger, isEsm, @@ -169,7 +168,7 @@ export class HttpInstrumentation extends TdInstrumentationBase { logger.debug( `[HttpInstrumentation] Dropping inbound request due to transforms: ${method} ${url}`, ); - return originalHandler.call(this); + return this.tuskDrift.executeWithoutRecording(() => originalHandler.call(this)); } // Handle replay mode using replay hooks (only if app is ready) @@ -1365,32 +1364,36 @@ export class HttpInstrumentation extends TdInstrumentationBase { return (originalEmit: Function) => { return function (this: Server, eventName: string, ...args: any[]) { if (eventName === "request") { - if (!self.tuskDrift.isAppReady()) { - self.tuskDrift.markAppAsReady(); - } + const isPreAppStart = !self.tuskDrift.isAppReady(); - if (self.mode === TuskDriftMode.RECORD) { - if ( - !shouldSample({ - samplingRate: self.tuskDrift.getSamplingRate(), - isAppReady: self.tuskDrift.isAppReady(), - }) - ) { - return originalEmit.apply(this, [eventName, ...args]); + try { + if (self.mode === TuskDriftMode.RECORD) { + const decision = self.tuskDrift.shouldRecordRootRequest({ + isPreAppStart, + }); + if (!decision.shouldRecord) { + return self.tuskDrift.executeWithoutRecording(() => + originalEmit.apply(this, [eventName, ...args]), + ); + } } - } - const req = args[0]; - const res = args[1]; + const req = args[0]; + const res = args[1]; - return self._createServerSpan({ - req, - res, - originalHandler: () => { - return originalEmit.apply(this, [eventName, ...args]); - }, - protocol, - }); + return self._createServerSpan({ + req, + res, + originalHandler: () => { + return originalEmit.apply(this, [eventName, ...args]); + }, + protocol, + }); + } finally { + if (isPreAppStart) { + self.tuskDrift.markAppAsReady(); + } + } } return originalEmit.apply(this, [eventName, ...args]); diff --git a/src/instrumentation/libraries/nextjs/Instrumentation.ts b/src/instrumentation/libraries/nextjs/Instrumentation.ts index 92607091..23202a4f 100644 --- a/src/instrumentation/libraries/nextjs/Instrumentation.ts +++ b/src/instrumentation/libraries/nextjs/Instrumentation.ts @@ -11,7 +11,7 @@ import { NextjsBaseServerModule, } from "./types"; import { wrap, handleRecordMode, handleReplayMode } from "../../core/utils"; -import { shouldSample, OriginalGlobalUtils, logger } from "../../../core/utils"; +import { OriginalGlobalUtils, logger } from "../../../core/utils"; import { PackageType, StatusCode } from "@use-tusk/drift-schemas/core/span"; import { EncodingType, JsonSchemaHelper } from "../../../core/tracing/JsonSchemaHelper"; import { @@ -123,13 +123,13 @@ export class NextjsInstrumentation extends TdInstrumentationBase { ) { // Sample as soon as we can to avoid additional overhead if this request is not sampled if (self.mode === TuskDriftMode.RECORD) { - if ( - !shouldSample({ - samplingRate: self.tuskDrift.getSamplingRate(), - isAppReady: self.tuskDrift.isAppReady(), - }) - ) { - return originalHandleRequest.call(this, req, res, parsedUrl); + const decision = self.tuskDrift.shouldRecordRootRequest({ + isPreAppStart: !self.tuskDrift.isAppReady(), + }); + if (!decision.shouldRecord) { + return self.tuskDrift.executeWithoutRecording(() => + originalHandleRequest.call(this, req, res, parsedUrl), + ); } }