-
Notifications
You must be signed in to change notification settings - Fork 23
monitor lifecycle conductor #2723
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: development/9.3
Are you sure you want to change the base?
Changes from all commits
7857978
a1661ea
592d911
d064eec
5561284
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,12 +5,53 @@ const LIFECYCLE_LABEL_OP = 'op'; | |
| const LIFECYCLE_LABEL_STATUS = 'status'; | ||
| const LIFECYCLE_LABEL_LOCATION = 'location'; | ||
| const LIFECYCLE_LABEL_TYPE = 'type'; | ||
| const LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID = 'conductor_scan_id'; | ||
|
|
||
| const LIFECYCLE_MARKER_METRICS_LOCATION = '-delete-marker-'; | ||
|
|
||
| // Keep per-scan series long enough for scraping and debugging recent overlap, | ||
| // but remove them from prom-client eventually to avoid unbounded process memory. | ||
| const BUCKET_PROCESSOR_SCAN_METRIC_RETENTION_MS = 24 * 60 * 60 * 1000; | ||
| const BUCKET_PROCESSOR_ORIGIN = 'bucket_processor'; | ||
|
|
||
| // Conductor scheduling heartbeat: timestamp (ms since epoch) of the | ||
| // instant the conductor most recently *started* a scan. Use this to | ||
| // detect "the conductor is no longer scheduling scans" via the | ||
| // LifecycleLateScan alert; do NOT subtract it from latest_batch_end_time | ||
| // to derive the scan duration: while a scan is in progress, end_time is | ||
| // from the previous run and start_time has just been refreshed, so the | ||
| // difference is negative. Use s3_lifecycle_conductor_last_batch_duration_seconds | ||
| // instead. | ||
| const conductorLatestBatchStartTime = ZenkoMetrics.createGauge({ | ||
| name: 's3_lifecycle_latest_batch_start_time', | ||
| help: 'Timestamp of latest lifecycle batch start time', | ||
| help: 'Conductor scheduling heartbeat: ms-since-epoch timestamp of ' + | ||
| 'the most recent scan start. Use to detect that the conductor is ' + | ||
| 'still scheduling scans (LifecycleLateScan alert). Do NOT use to ' + | ||
| 'derive scan duration; use ' + | ||
| 's3_lifecycle_conductor_last_batch_duration_seconds for that.', | ||
| labelNames: [LIFECYCLE_LABEL_ORIGIN], | ||
| }); | ||
|
|
||
| // Conductor scan-completion timestamp (ms since epoch) of the last | ||
| // successfully completed scan. Useful as a "scan completed at all" | ||
| // signal; combine with conductor_last_batch_duration_seconds to know | ||
| // "the most recent scan finished N seconds ago and took M seconds". | ||
| const conductorLatestBatchEndTime = ZenkoMetrics.createGauge({ | ||
| name: 's3_lifecycle_latest_batch_end_time', | ||
| help: 'Timestamp (ms since epoch) of the most recent successful ' + | ||
| 'lifecycle conductor scan completion.', | ||
| labelNames: [LIFECYCLE_LABEL_ORIGIN], | ||
| }); | ||
|
|
||
| // Duration of the latest conductor scan, computed by the conductor itself | ||
| // at scan completion. Exposed as a gauge so dashboards can render the most | ||
| // recent batch duration directly, without computing end - start in PromQL | ||
| // (which would yield negative values mid-scan, when end is from the | ||
| // previous batch and start has just been refreshed). | ||
| const conductorLastBatchDurationSeconds = ZenkoMetrics.createGauge({ | ||
| name: 's3_lifecycle_conductor_last_batch_duration_seconds', | ||
| help: 'Duration in seconds of the latest lifecycle conductor batch, ' + | ||
| 'as measured by the conductor at scan completion.', | ||
| labelNames: [LIFECYCLE_LABEL_ORIGIN], | ||
| }); | ||
|
|
||
|
|
@@ -50,6 +91,48 @@ const lifecycleLegacyTask = ZenkoMetrics.createCounter({ | |
| labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_STATUS], | ||
| }); | ||
|
|
||
| const conductorLatestBatchBucketCount = ZenkoMetrics.createGauge({ | ||
| name: 's3_lifecycle_latest_batch_bucket_count', | ||
| help: 'Number of buckets listed in the latest lifecycle conductor batch', | ||
| labelNames: [LIFECYCLE_LABEL_ORIGIN], | ||
| }); | ||
|
|
||
| const bucketProcessorScanMessagesProcessed = ZenkoMetrics.createCounter({ | ||
| name: 's3_lifecycle_bucket_processor_scan_messages_processed_total', | ||
| help: 'Total number of bucket-tasks topic messages successfully processed ' + | ||
| 'by this bucket processor, grouped by conductor scan id. Each message ' + | ||
| 'corresponds to a single listing slice (initial or continuation), not ' + | ||
| 'a unique bucket: a bucket with multiple listings (truncated v1, or ' + | ||
| 'current/noncurrent/orphan splits in v2) increments this counter once ' + | ||
| 'per slice. Multiple conductor_scan_id label values appearing at the ' + | ||
| 'same time indicate overlapping scans. Scan-id series are retained ' + | ||
| 'locally for 24 hours, so cardinality scales with scan frequency and ' + | ||
| 'bucket processor pod count.', | ||
| labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID], | ||
| }); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unbounded label cardinality on conductor_scan_id. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. bucketProcessorScanStartTime and bucketProcessorScanBucketDoneCount use conductor_scan_id (a UUID) as a label. Each conductor scan creates new time series that are never cleaned up via remove() or reset(). Over days/weeks this will cause unbounded cardinality growth in Prometheus, leading to high memory usage.
benzekrimaha marked this conversation as resolved.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
| const bucketProcessorScanMetricLastUpdated = new Map(); | ||
|
|
||
| function removeStaleBucketProcessorScanMetrics(log, now) { | ||
| bucketProcessorScanMetricLastUpdated.forEach((lastUpdated, conductorScanId) => { | ||
| if (now - lastUpdated > BUCKET_PROCESSOR_SCAN_METRIC_RETENTION_MS) { | ||
| try { | ||
| bucketProcessorScanMessagesProcessed.remove({ | ||
| [LIFECYCLE_LABEL_ORIGIN]: BUCKET_PROCESSOR_ORIGIN, | ||
| [LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID]: conductorScanId, | ||
| }); | ||
| bucketProcessorScanMetricLastUpdated.delete(conductorScanId); | ||
| } catch (err) { | ||
| if (log) { | ||
| log.error('failed to remove stale bucket processor scan metric', { | ||
| error: err.toString(), | ||
| conductorScanId, | ||
| }); | ||
| } | ||
| } | ||
| } | ||
| }); | ||
|
benzekrimaha marked this conversation as resolved.
|
||
| } | ||
|
|
||
| const lifecycleS3Operations = ZenkoMetrics.createCounter({ | ||
| name: 's3_lifecycle_s3_operations_total', | ||
| help: 'Total number of S3 operations by the lifecycle processes', | ||
|
|
@@ -113,6 +196,16 @@ class LifecycleMetrics { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Update the conductor scheduling heartbeat. Called at the start of | ||
| * every conductor scan; consumed by the LifecycleLateScan alert to | ||
| * detect that the conductor has stopped scheduling. Does NOT mark a | ||
| * scan as in progress and is NOT meant to be subtracted from | ||
| * latest_batch_end_time to derive a duration: use | ||
| * onConductorScanComplete's durationSeconds for that. | ||
| * | ||
| * @param {Object} log - logger | ||
| */ | ||
| static onProcessBuckets(log) { | ||
| try { | ||
| conductorLatestBatchStartTime.set({ origin: 'conductor' }, Date.now()); | ||
|
|
@@ -172,6 +265,77 @@ class LifecycleMetrics { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Record metrics at the end of a full conductor scan. | ||
| * @param {Object} log - logger | ||
| * @param {number} bucketCount - total buckets listed | ||
| * @param {number} [durationSeconds] - duration of the scan in seconds, | ||
| * as measured by the conductor. When provided and finite, sets the | ||
| * s3_lifecycle_conductor_last_batch_duration_seconds gauge. Optional | ||
| * for forward-compatibility with callers that do not measure it. | ||
| */ | ||
|
benzekrimaha marked this conversation as resolved.
|
||
| static onConductorScanComplete(log, bucketCount, durationSeconds) { | ||
| try { | ||
| conductorLatestBatchEndTime.set( | ||
| { [LIFECYCLE_LABEL_ORIGIN]: 'conductor' }, | ||
| Date.now()); | ||
| conductorLatestBatchBucketCount.set({ | ||
| [LIFECYCLE_LABEL_ORIGIN]: 'conductor', | ||
| }, bucketCount); | ||
| if (typeof durationSeconds === 'number' && | ||
| Number.isFinite(durationSeconds) && | ||
| durationSeconds >= 0) { | ||
| conductorLastBatchDurationSeconds.set({ | ||
| [LIFECYCLE_LABEL_ORIGIN]: 'conductor', | ||
| }, durationSeconds); | ||
| } | ||
| } catch (err) { | ||
| LifecycleMetrics.handleError( | ||
| log, err, 'LifecycleMetrics.onConductorScanComplete', { | ||
| bucketCount, | ||
| durationSeconds, | ||
| } | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Increment the count of bucket-tasks topic messages successfully | ||
| * processed by this bucket processor for a specific conductor scan. | ||
| * Called after the bucket task completes successfully (not when it is | ||
| * dispatched to the scheduler), once per Kafka message regardless of how | ||
| * many objects it covers. | ||
| * | ||
| * Note: this counts messages (initial + continuation/listing slices), | ||
| * not unique buckets. Keep one time series per conductor_scan_id so that | ||
| * overlapping scans remain visible. Old scan series are removed | ||
| * opportunistically after BUCKET_PROCESSOR_SCAN_METRIC_RETENTION_MS to | ||
| * avoid unbounded prom-client memory growth. | ||
| * | ||
| * @param {Object} log - logger | ||
| * @param {string} conductorScanId - conductor scan id from contextInfo | ||
| */ | ||
| static onBucketProcessorScanMessageProcessed(log, conductorScanId) { | ||
| if (!conductorScanId) { | ||
| return; | ||
| } | ||
| try { | ||
| const now = Date.now(); | ||
| bucketProcessorScanMetricLastUpdated.set(conductorScanId, now); | ||
| bucketProcessorScanMessagesProcessed.inc({ | ||
| [LIFECYCLE_LABEL_ORIGIN]: BUCKET_PROCESSOR_ORIGIN, | ||
| [LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID]: conductorScanId, | ||
| }); | ||
| removeStaleBucketProcessorScanMetrics(log, now); | ||
| } catch (err) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error handler references wrong method name. Should match the enclosing method. |
||
| LifecycleMetrics.handleError( | ||
| log, err, | ||
| 'LifecycleMetrics.onBucketProcessorScanMessageProcessed', | ||
| { conductorScanId } | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| static onLifecycleTriggered(log, process, type, location, latencyMs) { | ||
| try { | ||
| lifecycleTriggerLatency.observe({ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
conductor_scan_idlabel creates a new time series per scan (UUID). While the 24hremoveStaleBucketProcessorScanMetricscleanup bounds prom-client process memory, every unique scan ID still produces a distinct series in the Prometheus TSDB until it becomes stale (5 min after the last scrape). With a typical lifecycle interval of ~6 min, that is ~240 distinct label values/day/pod. This is manageable but worth documenting: if the scan interval is ever shortened (e.g. 1 min), cardinality rises proportionally. Consider adding a note in the metric help string about the expected cardinality bounds and the cleanup mechanism.— Claude Code