166 changes: 165 additions & 1 deletion extensions/lifecycle/LifecycleMetrics.js
@@ -5,12 +5,53 @@ const LIFECYCLE_LABEL_OP = 'op';
const LIFECYCLE_LABEL_STATUS = 'status';
const LIFECYCLE_LABEL_LOCATION = 'location';
const LIFECYCLE_LABEL_TYPE = 'type';
const LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID = 'conductor_scan_id';

const LIFECYCLE_MARKER_METRICS_LOCATION = '-delete-marker-';

// Keep per-scan series long enough for scraping and debugging recent overlap,
// but remove them from prom-client eventually to avoid unbounded process memory.
const BUCKET_PROCESSOR_SCAN_METRIC_RETENTION_MS = 24 * 60 * 60 * 1000;
const BUCKET_PROCESSOR_ORIGIN = 'bucket_processor';

// Conductor scheduling heartbeat: timestamp (ms since epoch) of the
// instant the conductor most recently *started* a scan. Use this to
// detect "the conductor is no longer scheduling scans" via the
// LifecycleLateScan alert; do NOT subtract it from latest_batch_end_time
// to derive the scan duration: while a scan is in progress, end_time is
// from the previous run and start_time has just been refreshed, so the
// difference is negative. Use s3_lifecycle_conductor_last_batch_duration_seconds
// instead.
const conductorLatestBatchStartTime = ZenkoMetrics.createGauge({
name: 's3_lifecycle_latest_batch_start_time',
help: 'Conductor scheduling heartbeat: ms-since-epoch timestamp of ' +
'the most recent scan start. Use to detect that the conductor is ' +
'still scheduling scans (LifecycleLateScan alert). Do NOT use to ' +
'derive scan duration; use ' +
's3_lifecycle_conductor_last_batch_duration_seconds for that.',
labelNames: [LIFECYCLE_LABEL_ORIGIN],
});
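The negative-difference pitfall described in the comment above can be seen with a minimal sketch (the timestamp values below are hypothetical, chosen only to illustrate the ordering):

```javascript
// Illustrative timestamps (hypothetical values) showing why
// end_time - start_time is negative while a scan is in progress:
// start is refreshed at scan start, while end still holds the
// previous scan's completion time.
let startTime = 1000; // previous scan started
let endTime = 1600;   // previous scan finished

// A new scan begins: only the start gauge is refreshed.
startTime = 2000;
console.log(endTime - startTime); // -400: not a usable duration

// On completion the conductor publishes the duration directly
// (s3_lifecycle_conductor_last_batch_duration_seconds) rather than
// leaving dashboards to compute end - start in PromQL.
const durationMs = 700;
endTime = startTime + durationMs;
console.log(endTime - startTime); // 700
```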

// Conductor scan-completion timestamp (ms since epoch) of the last
// successfully completed scan. Useful as a "scan completed at all"
// signal; combine with conductor_last_batch_duration_seconds to know
// "the most recent scan finished N seconds ago and took M seconds".
const conductorLatestBatchEndTime = ZenkoMetrics.createGauge({
name: 's3_lifecycle_latest_batch_end_time',
help: 'Timestamp (ms since epoch) of the most recent successful ' +
'lifecycle conductor scan completion.',
labelNames: [LIFECYCLE_LABEL_ORIGIN],
});

// Duration of the latest conductor scan, computed by the conductor itself
// at scan completion. Exposed as a gauge so dashboards can render the most
// recent batch duration directly, without computing end - start in PromQL
// (which would yield negative values mid-scan, when end is from the
// previous batch and start has just been refreshed).
const conductorLastBatchDurationSeconds = ZenkoMetrics.createGauge({
name: 's3_lifecycle_conductor_last_batch_duration_seconds',
help: 'Duration in seconds of the latest lifecycle conductor batch, ' +
'as measured by the conductor at scan completion.',
labelNames: [LIFECYCLE_LABEL_ORIGIN],
});

@@ -50,6 +91,48 @@ const lifecycleLegacyTask = ZenkoMetrics.createCounter({
labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_STATUS],
});

const conductorLatestBatchBucketCount = ZenkoMetrics.createGauge({
name: 's3_lifecycle_latest_batch_bucket_count',
help: 'Number of buckets listed in the latest lifecycle conductor batch',
labelNames: [LIFECYCLE_LABEL_ORIGIN],
});

const bucketProcessorScanMessagesProcessed = ZenkoMetrics.createCounter({
name: 's3_lifecycle_bucket_processor_scan_messages_processed_total',
help: 'Total number of bucket-tasks topic messages successfully processed ' +
'by this bucket processor, grouped by conductor scan id. Each message ' +
'corresponds to a single listing slice (initial or continuation), not ' +
'a unique bucket: a bucket with multiple listings (truncated v1, or ' +
'current/noncurrent/orphan splits in v2) increments this counter once ' +
'per slice. Multiple conductor_scan_id label values appearing at the ' +
'same time indicate overlapping scans. Scan-id series are retained ' +
'locally for 24 hours, so cardinality scales with scan frequency and ' +
'bucket processor pod count.',
labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID],
});

Review comment:

The conductor_scan_id label creates a new time series per scan (UUID). While the 24h removeStaleBucketProcessorScanMetrics cleanup bounds prom-client process memory, every unique scan ID still produces a distinct series in the Prometheus TSDB until it becomes stale (5 min after the last scrape). With a typical lifecycle interval of ~6 min, that is ~240 distinct label values/day/pod. This is manageable but worth documenting: if the scan interval is ever shortened (e.g. 1 min), cardinality rises proportionally. Consider adding a note in the metric help string about the expected cardinality bounds and the cleanup mechanism.

— Claude Code
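The cardinality estimate in the comment above can be sanity-checked with a short sketch (the 6-minute interval is an assumption mirroring the comment; the retention constant matches BUCKET_PROCESSOR_SCAN_METRIC_RETENTION_MS in this file):

```javascript
// Rough upper bound on live conductor_scan_id series per bucket
// processor pod: one new UUID label value per scan, each retained
// for the local cleanup window before it is pruned.
const SCAN_INTERVAL_MS = 6 * 60 * 1000;    // assumed conductor interval
const RETENTION_MS = 24 * 60 * 60 * 1000;  // matches the retention constant

function maxLiveScanSeries(intervalMs, retentionMs) {
    // Every interval starts one scan; each scan's series lives for
    // retentionMs before removeStaleBucketProcessorScanMetrics prunes it.
    return Math.ceil(retentionMs / intervalMs);
}

console.log(maxLiveScanSeries(SCAN_INTERVAL_MS, RETENTION_MS)); // 240
console.log(maxLiveScanSeries(60 * 1000, RETENTION_MS));        // 1440 at 1-min intervals
```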
Review comment:

Unbounded label cardinality on conductor_scan_id.

bucketProcessorScanStartTime and bucketProcessorScanBucketDoneCount use conductor_scan_id (a new UUID every scan) as a label. Since old label values are never removed from the prom-client registry, the number of time series grows unboundedly with each scan cycle. This will cause increasing memory usage in the bucket processor over time.

Consider either: (1) calling .remove() on old label sets when a new scan starts, or (2) dropping the conductor_scan_id label entirely and resetting the gauge value on each new scan instead.

— Claude Code


Comment thread (benzekrimaha marked this conversation as resolved):

The conductor_scan_id label on this counter creates a new Prometheus time series for every scan (a fresh UUID each time). Since counters are never pruned within a process lifetime, label cardinality grows without bound — e.g. ~288 new series/day at 5-min scan intervals, per bucket processor pod. This is a classic cardinality bomb that will eventually pressure Prometheus memory and storage.

Consider tracking scan progress with a gauge (resettable per scan) instead of a per-scan-ID counter, or dropping the conductor_scan_id label from the metric and relying on logs for per-scan correlation.

— Claude Code

const bucketProcessorScanMetricLastUpdated = new Map();

function removeStaleBucketProcessorScanMetrics(log, now) {
bucketProcessorScanMetricLastUpdated.forEach((lastUpdated, conductorScanId) => {
if (now - lastUpdated > BUCKET_PROCESSOR_SCAN_METRIC_RETENTION_MS) {
try {
bucketProcessorScanMessagesProcessed.remove({
[LIFECYCLE_LABEL_ORIGIN]: BUCKET_PROCESSOR_ORIGIN,
[LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID]: conductorScanId,
});
bucketProcessorScanMetricLastUpdated.delete(conductorScanId);
} catch (err) {
if (log) {
log.error('failed to remove stale bucket processor scan metric', {
error: err.toString(),
conductorScanId,
});
}
}
}
});
}
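The staleness bookkeeping above can be exercised in isolation. This sketch substitutes a plain Set for the prom-client counter, so only the retention logic is real; the registry interaction is simulated:

```javascript
// Minimal model of the stale-series sweep: track last-update times
// per scan id and prune entries older than the retention window.
// The Set stands in for prom-client's live label sets; delete()
// plays the role of counter.remove({...}) in the real code.
const RETENTION_MS = 24 * 60 * 60 * 1000;

const lastUpdated = new Map(); // conductorScanId -> ms timestamp
const registry = new Set();    // stand-in for live label sets

function recordMessage(scanId, now) {
    lastUpdated.set(scanId, now);
    registry.add(scanId);
    sweep(now);
}

function sweep(now) {
    // Deleting during Map.forEach is safe in JavaScript.
    lastUpdated.forEach((ts, scanId) => {
        if (now - ts > RETENTION_MS) {
            registry.delete(scanId);
            lastUpdated.delete(scanId);
        }
    });
}

// A scan from 25h ago is pruned when a new scan reports in.
const now = Date.now();
recordMessage('scan-old', now - 25 * 60 * 60 * 1000);
recordMessage('scan-new', now);
console.log(registry.has('scan-old'), registry.has('scan-new')); // false true
```

Note the same opportunistic pattern as the real code: cleanup runs only when a message arrives, so a fully idle processor keeps its last series until the next update.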

const lifecycleS3Operations = ZenkoMetrics.createCounter({
name: 's3_lifecycle_s3_operations_total',
help: 'Total number of S3 operations by the lifecycle processes',
@@ -113,6 +196,16 @@ class LifecycleMetrics {
}
}

/**
* Update the conductor scheduling heartbeat. Called at the start of
* every conductor scan; consumed by the LifecycleLateScan alert to
* detect that the conductor has stopped scheduling. Does NOT mark a
* scan as in progress and is NOT meant to be subtracted from
* latest_batch_end_time to derive a duration: use
* onConductorScanComplete's durationSeconds for that.
*
* @param {Object} log - logger
*/
static onProcessBuckets(log) {
try {
conductorLatestBatchStartTime.set({ origin: 'conductor' }, Date.now());
@@ -172,6 +265,77 @@
}
}

/**
* Record metrics at the end of a full conductor scan.
* @param {Object} log - logger
* @param {number} bucketCount - total buckets listed
* @param {number} [durationSeconds] - duration of the scan in seconds,
* as measured by the conductor. When provided and finite, sets the
* s3_lifecycle_conductor_last_batch_duration_seconds gauge. Optional
* for forward-compatibility with callers that do not measure it.
*/
static onConductorScanComplete(log, bucketCount, durationSeconds) {
try {
conductorLatestBatchEndTime.set(
{ [LIFECYCLE_LABEL_ORIGIN]: 'conductor' },
Date.now());
conductorLatestBatchBucketCount.set({
[LIFECYCLE_LABEL_ORIGIN]: 'conductor',
}, bucketCount);
if (typeof durationSeconds === 'number' &&
Number.isFinite(durationSeconds) &&
durationSeconds >= 0) {
conductorLastBatchDurationSeconds.set({
[LIFECYCLE_LABEL_ORIGIN]: 'conductor',
}, durationSeconds);
}
} catch (err) {
LifecycleMetrics.handleError(
log, err, 'LifecycleMetrics.onConductorScanComplete', {
bucketCount,
durationSeconds,
}
);
}
}
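The durationSeconds guard above accepts only finite, non-negative numbers before touching the gauge. A standalone version of that check, for illustration:

```javascript
// Mirrors the guard in onConductorScanComplete: NaN, Infinity,
// negative values, and non-numbers (including an omitted optional
// parameter) must never reach the duration gauge.
function isValidDuration(d) {
    return typeof d === 'number' && Number.isFinite(d) && d >= 0;
}

console.log(isValidDuration(12.5));      // true
console.log(isValidDuration(0));         // true
console.log(isValidDuration(-1));        // false
console.log(isValidDuration(NaN));       // false
console.log(isValidDuration(undefined)); // false (optional param omitted)
```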

/**
* Increment the count of bucket-tasks topic messages successfully
* processed by this bucket processor for a specific conductor scan.
* Called after the bucket task completes successfully (not when it is
* dispatched to the scheduler), once per Kafka message regardless of how
* many objects it covers.
*
* Note: this counts messages (initial + continuation/listing slices),
* not unique buckets. Keep one time series per conductor_scan_id so that
* overlapping scans remain visible. Old scan series are removed
* opportunistically after BUCKET_PROCESSOR_SCAN_METRIC_RETENTION_MS to
* avoid unbounded prom-client memory growth.
*
* @param {Object} log - logger
* @param {string} conductorScanId - conductor scan id from contextInfo
*/
static onBucketProcessorScanMessageProcessed(log, conductorScanId) {
if (!conductorScanId) {
return;
}
try {
const now = Date.now();
bucketProcessorScanMetricLastUpdated.set(conductorScanId, now);
bucketProcessorScanMessagesProcessed.inc({
[LIFECYCLE_LABEL_ORIGIN]: BUCKET_PROCESSOR_ORIGIN,
[LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID]: conductorScanId,
});
removeStaleBucketProcessorScanMetrics(log, now);
} catch (err) {
LifecycleMetrics.handleError(
log, err,
'LifecycleMetrics.onBucketProcessorScanMessageProcessed',
{ conductorScanId }
);
}
}

Review comment:

Error handler references wrong method name. Should match the enclosing method.

— Claude Code

static onLifecycleTriggered(log, process, type, location, latencyMs) {
try {
lifecycleTriggerLatency.observe({
14 changes: 13 additions & 1 deletion extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js
@@ -26,6 +26,7 @@ const {
extractBucketProcessorCircuitBreakerConfigs,
} = require('../CircuitBreakerGroup');
const { lifecycleTaskVersions } = require('../../../lib/constants');
const { LifecycleMetrics } = require('../LifecycleMetrics');
const locations = require('../../../conf/locationConfig.json');

const PROCESS_OBJECTS_ACTION = 'processObjects';
@@ -277,12 +278,15 @@ class LifecycleBucketProcessor {
return process.nextTick(() => cb(errors.InternalError));
}
const { bucket, owner, accountId, taskVersion } = result.target;
const conductorScanId = result.contextInfo && result.contextInfo.conductorScanId;

if (!bucket || !owner || (!accountId && this._authConfig.type === authTypeAssumeRole)) {
this._log.error('kafka bucket entry missing required fields', {
method: 'LifecycleBucketProcessor._processBucketEntry',
bucket,
owner,
accountId,
conductorScanId,
});
return process.nextTick(() => cb(errors.InternalError));
}
@@ -291,6 +295,7 @@
bucket,
owner,
accountId,
conductorScanId,
});

const s3Client = this.clientManager.getS3Client(accountId);
@@ -345,14 +350,21 @@
owner,
details: result.details,
taskName: task.constructor.name,
conductorScanId,
});
return this._internalTaskScheduler.push({
task,
rules: config.Rules,
value: result,
s3target: s3Client,
backbeatMetadataProxy,
}, err => {
if (!err) {
LifecycleMetrics.onBucketProcessorScanMessageProcessed(
this._log, conductorScanId);
}
return cb(err);
});
});
}

Expand Down