diff --git a/extensions/lifecycle/LifecycleMetrics.js b/extensions/lifecycle/LifecycleMetrics.js index eac0d1346..ac793b816 100644 --- a/extensions/lifecycle/LifecycleMetrics.js +++ b/extensions/lifecycle/LifecycleMetrics.js @@ -5,12 +5,53 @@ const LIFECYCLE_LABEL_OP = 'op'; const LIFECYCLE_LABEL_STATUS = 'status'; const LIFECYCLE_LABEL_LOCATION = 'location'; const LIFECYCLE_LABEL_TYPE = 'type'; +const LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID = 'conductor_scan_id'; const LIFECYCLE_MARKER_METRICS_LOCATION = '-delete-marker-'; +// Keep per-scan series long enough for scraping and debugging recent overlap, +// but remove them from prom-client eventually to avoid unbounded process memory. +const BUCKET_PROCESSOR_SCAN_METRIC_RETENTION_MS = 24 * 60 * 60 * 1000; +const BUCKET_PROCESSOR_ORIGIN = 'bucket_processor'; + +// Conductor scheduling heartbeat: timestamp (ms since epoch) of the +// instant the conductor most recently *started* a scan. Use this to +// detect "the conductor is no longer scheduling scans" via the +// LifecycleLateScan alert; do NOT subtract it from latest_batch_end_time +// to derive the scan duration: while a scan is in progress, end_time is +// from the previous run and start_time has just been refreshed, so the +// difference is negative. Use s3_lifecycle_conductor_last_batch_duration_seconds +// instead. const conductorLatestBatchStartTime = ZenkoMetrics.createGauge({ name: 's3_lifecycle_latest_batch_start_time', - help: 'Timestamp of latest lifecycle batch start time', + help: 'Conductor scheduling heartbeat: ms-since-epoch timestamp of ' + + 'the most recent scan start. Use to detect that the conductor is ' + + 'still scheduling scans (LifecycleLateScan alert). Do NOT use to ' + + 'derive scan duration; use ' + + 's3_lifecycle_conductor_last_batch_duration_seconds for that.', + labelNames: [LIFECYCLE_LABEL_ORIGIN], +}); + +// Conductor scan-completion timestamp (ms since epoch) of the last +// successfully completed scan. Useful as a "scan completed at all" +// signal; combine with conductor_last_batch_duration_seconds to know +// "the most recent scan finished N seconds ago and took M seconds". +const conductorLatestBatchEndTime = ZenkoMetrics.createGauge({ + name: 's3_lifecycle_latest_batch_end_time', + help: 'Timestamp (ms since epoch) of the most recent successful ' + + 'lifecycle conductor scan completion.', + labelNames: [LIFECYCLE_LABEL_ORIGIN], +}); + +// Duration of the latest conductor scan, computed by the conductor itself +// at scan completion. Exposed as a gauge so dashboards can render the most +// recent batch duration directly, without computing end - start in PromQL +// (which would yield negative values mid-scan, when end is from the +// previous batch and start has just been refreshed). 
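+// Illustrative only (an example query, not an alert shipped with this +// change; the 3600 s threshold is an arbitrary assumption): dashboards or +// ad-hoc PromQL can consume the gauge directly, e.g. +//   max(s3_lifecycle_conductor_last_batch_duration_seconds) > 3600 +// to surface scans that took over an hour.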
+const conductorLastBatchDurationSeconds = ZenkoMetrics.createGauge({ + name: 's3_lifecycle_conductor_last_batch_duration_seconds', + help: 'Duration in seconds of the latest lifecycle conductor batch, ' + 'as measured by the conductor at scan completion.', labelNames: [LIFECYCLE_LABEL_ORIGIN], }); @@ -50,6 +91,48 @@ const lifecycleLegacyTask = ZenkoMetrics.createCounter({ labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_STATUS], }); +const conductorLatestBatchBucketCount = ZenkoMetrics.createGauge({ + name: 's3_lifecycle_latest_batch_bucket_count', + help: 'Number of buckets listed in the latest lifecycle conductor batch', + labelNames: [LIFECYCLE_LABEL_ORIGIN], +}); + +const bucketProcessorScanMessagesProcessed = ZenkoMetrics.createCounter({ + name: 's3_lifecycle_bucket_processor_scan_messages_processed_total', + help: 'Total number of bucket-tasks topic messages successfully processed ' + 'by this bucket processor, grouped by conductor scan id. Each message ' + 'corresponds to a single listing slice (initial or continuation), not ' + 'a unique bucket: a bucket with multiple listings (truncated v1, or ' + 'current/noncurrent/orphan splits in v2) increments this counter once ' + 'per slice. Multiple conductor_scan_id label values appearing at the ' + 'same time indicate overlapping scans. Scan-id series are retained ' + 'locally for 24 hours, so cardinality scales with scan frequency and ' + 'bucket processor pod count.', + labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID], +}); +const bucketProcessorScanMetricLastUpdated = new Map(); + +function removeStaleBucketProcessorScanMetrics(log, now) { + bucketProcessorScanMetricLastUpdated.forEach((lastUpdated, conductorScanId) => { + if (now - lastUpdated > BUCKET_PROCESSOR_SCAN_METRIC_RETENTION_MS) { + try { + bucketProcessorScanMessagesProcessed.remove({ + [LIFECYCLE_LABEL_ORIGIN]: BUCKET_PROCESSOR_ORIGIN, + [LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID]: conductorScanId, + }); + bucketProcessorScanMetricLastUpdated.delete(conductorScanId); + } catch (err) { + if (log) { + log.error('failed to remove stale bucket processor scan metric', { + error: err.toString(), + conductorScanId, + }); + } + } + } + }); +} + const lifecycleS3Operations = ZenkoMetrics.createCounter({ name: 's3_lifecycle_s3_operations_total', help: 'Total number of S3 operations by the lifecycle processes', @@ -113,6 +196,16 @@ class LifecycleMetrics { } } + /** + * Update the conductor scheduling heartbeat. Called at the start of + * every conductor scan; consumed by the LifecycleLateScan alert to + * detect that the conductor has stopped scheduling. Does NOT mark a + * scan as in progress and is NOT meant to be subtracted from + * latest_batch_end_time to derive a duration: use + * onConductorScanComplete's durationSeconds for that. + * + * @param {Object} log - logger + */ static onProcessBuckets(log) { try { conductorLatestBatchStartTime.set({ origin: 'conductor' }, Date.now()); @@ -172,6 +265,77 @@ class LifecycleMetrics { } } + /** + * Record metrics at the end of a full conductor scan. + * @param {Object} log - logger + * @param {number} bucketCount - total buckets listed + * @param {number} [durationSeconds] - duration of the scan in seconds, + * as measured by the conductor. When provided, finite, and + * non-negative, sets the + * s3_lifecycle_conductor_last_batch_duration_seconds gauge. Optional + * for forward-compatibility with callers that do not measure it. 
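+     * +     * Illustrative call, mirroring the conductor's own usage in this +     * change (here `start` is assumed to be a ms timestamp captured when +     * the scan began): +     *   LifecycleMetrics.onConductorScanComplete( +     *       log, totalBucketsListed, (Date.now() - start) / 1000);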
+ */ + static onConductorScanComplete(log, bucketCount, durationSeconds) { + try { + conductorLatestBatchEndTime.set( + { [LIFECYCLE_LABEL_ORIGIN]: 'conductor' }, + Date.now()); + conductorLatestBatchBucketCount.set({ + [LIFECYCLE_LABEL_ORIGIN]: 'conductor', + }, bucketCount); + if (typeof durationSeconds === 'number' && + Number.isFinite(durationSeconds) && + durationSeconds >= 0) { + conductorLastBatchDurationSeconds.set({ + [LIFECYCLE_LABEL_ORIGIN]: 'conductor', + }, durationSeconds); + } + } catch (err) { + LifecycleMetrics.handleError( + log, err, 'LifecycleMetrics.onConductorScanComplete', { + bucketCount, + durationSeconds, + } + ); + } + } + + /** + * Increment the count of bucket-tasks topic messages successfully + * processed by this bucket processor for a specific conductor scan. + * Called after the bucket task completes successfully (not when it is + * dispatched to the scheduler), once per Kafka message regardless of how + * many objects it covers. + * + * Note: this counts messages (initial + continuation/listing slices), + * not unique buckets. Keep one time series per conductor_scan_id so that + * overlapping scans remain visible. Old scan series are removed + * opportunistically after BUCKET_PROCESSOR_SCAN_METRIC_RETENTION_MS to + * avoid unbounded prom-client memory growth. + * + * @param {Object} log - logger + * @param {string} conductorScanId - conductor scan id from contextInfo + */ + static onBucketProcessorScanMessageProcessed(log, conductorScanId) { + if (!conductorScanId) { + return; + } + try { + const now = Date.now(); + bucketProcessorScanMetricLastUpdated.set(conductorScanId, now); + bucketProcessorScanMessagesProcessed.inc({ + [LIFECYCLE_LABEL_ORIGIN]: BUCKET_PROCESSOR_ORIGIN, + [LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID]: conductorScanId, + }); + removeStaleBucketProcessorScanMetrics(log, now); + } catch (err) { + LifecycleMetrics.handleError( + log, err, + 'LifecycleMetrics.onBucketProcessorScanMessageProcessed', + { conductorScanId } + ); + } + } + static onLifecycleTriggered(log, process, type, location, latencyMs) { try { lifecycleTriggerLatency.observe({ diff --git a/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js b/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js index 6135b4186..c1e2e7191 100644 --- a/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js +++ b/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js @@ -26,6 +26,7 @@ const { extractBucketProcessorCircuitBreakerConfigs, } = require('../CircuitBreakerGroup'); const { lifecycleTaskVersions } = require('../../../lib/constants'); +const { LifecycleMetrics } = require('../LifecycleMetrics'); const locations = require('../../../conf/locationConfig.json'); const PROCESS_OBJECTS_ACTION = 'processObjects'; @@ -277,12 +278,15 @@ class LifecycleBucketProcessor { return process.nextTick(() => cb(errors.InternalError)); } const { bucket, owner, accountId, taskVersion } = result.target; + const conductorScanId = result.contextInfo && result.contextInfo.conductorScanId; + if (!bucket || !owner || (!accountId && this._authConfig.type === authTypeAssumeRole)) { this._log.error('kafka bucket entry missing required fields', { method: 'LifecycleBucketProcessor._processBucketEntry', bucket, owner, accountId, + conductorScanId, }); return process.nextTick(() => cb(errors.InternalError)); } @@ -291,6 +295,7 @@ class LifecycleBucketProcessor { bucket, owner, accountId, + conductorScanId, }); const s3Client = this.clientManager.getS3Client(accountId); @@ -345,6 
+350,7 @@ class LifecycleBucketProcessor { owner, details: result.details, taskName: task.constructor.name, + conductorScanId, }); return this._internalTaskScheduler.push({ task, @@ -352,7 +358,13 @@ class LifecycleBucketProcessor { value: result, s3target: s3Client, backbeatMetadataProxy, - }, cb); + }, err => { + if (!err) { + LifecycleMetrics.onBucketProcessorScanMessageProcessed( + this._log, conductorScanId); + } + return cb(err); + }); }); } diff --git a/extensions/lifecycle/conductor/LifecycleConductor.js b/extensions/lifecycle/conductor/LifecycleConductor.js index fbbee05ba..2922d835c 100644 --- a/extensions/lifecycle/conductor/LifecycleConductor.js +++ b/extensions/lifecycle/conductor/LifecycleConductor.js @@ -3,6 +3,7 @@ const async = require('async'); const schedule = require('node-schedule'); const zookeeper = require('node-zookeeper-client'); +const { v4: uuid } = require('uuid'); const { constants, errors } = require('arsenal'); const Logger = require('werelogs').Logger; @@ -105,6 +106,7 @@ class LifecycleConductor { this._vaultClientCache = null; this._initialized = false; this._batchInProgress = false; + this._currentScanId = null; // this cache only needs to be the size of one listing. // worst case scenario is 1 account per bucket: @@ -314,6 +316,7 @@ class LifecycleConductor { action: 'processObjects', contextInfo: { reqId: log.getSerializedUids(), + conductorScanId: this._currentScanId, }, target: { bucket: task.bucketName, @@ -356,7 +359,8 @@ class LifecycleConductor { _createBucketTaskMessages(tasks, log, cb) { if (this.lcConfig.forceLegacyListing) { - return process.nextTick(cb, null, tasks.map(t => this._taskToMessage(t, lifecycleTaskVersions.v1, log))); + return process.nextTick(cb, null, tasks.map(t => + this._taskToMessage(t, lifecycleTaskVersions.v1, log))); } return async.mapLimit(tasks, 10, (t, taskDone) => @@ -372,8 +376,11 @@ class LifecycleConductor { processBuckets(cb) { const log = this.logger.newRequestLogger(); - const start = new Date(); + const start = Date.now(); + const scanId = uuid(); + let scanStarted = false; let nBucketsQueued = 0; + let totalBucketsListed = 0; let messageDeliveryReports = 0; const messageSendQueue = async.cargo((tasks, done) => { @@ -400,16 +407,18 @@ class LifecycleConductor { } return true; }))), - (tasksWithAccountId, next) => this._createBucketTaskMessages(tasksWithAccountId, log, next), + (tasksWithAccountId, next) => this._createBucketTaskMessages( + tasksWithAccountId, log, next), ], (err, messages) => { nBucketsQueued += tasks.length; log.info('bucket push progress', { + conductorScanId: scanId, nBucketsQueued, bucketsInCargo: tasks.length, kafkaBucketMessagesDeliveryReports: messageDeliveryReports, - kafkaEnqueueRateHz: Math.round(nBucketsQueued * 1000 / (new Date() - start)), + kafkaEnqueueRateHz: Math.round(nBucketsQueued * 1000 / (Date.now() - start)), }); this._accountIdCache.expireOldest(); @@ -428,22 +437,36 @@ class LifecycleConductor { // fallback to V1 listings next => this._indexesGetInProgressJobs(log, () => next(null)), next => { + scanStarted = true; + this._currentScanId = scanId; + log.addDefaultFields({ conductorScanId: scanId }); + LifecycleMetrics.onProcessBuckets(log); this._batchInProgress = true; - log.info('starting new lifecycle batch', { bucketSource: this._bucketSource }); - this.listBuckets(messageSendQueue, log, (err, nBucketsListed) => { + log.info('starting new lifecycle batch', { + bucketSource: this._bucketSource, + conductorScanId: scanId, + }); + this.listBuckets(messageSendQueue, 
scanId, log, (err, listedBucketsCount) => { LifecycleMetrics.onBucketListing(log, err); - return next(err, nBucketsListed); + totalBucketsListed = listedBucketsCount; + return next(err); }); }, - (nBucketsListed, next) => { + next => { async.until( - () => nBucketsQueued === nBucketsListed, + () => nBucketsQueued === totalBucketsListed, unext => setTimeout(unext, 1000), next); }, ], err => { + const elapsedMs = Date.now() - start; + const unknownCanonicalIds = this._accountIdCache.getMisses(); if (err && err.Throttling) { - log.info('not starting new lifecycle batch', { reason: err }); + log.info('not starting new lifecycle batch', { + reason: err, + conductorScanId: scanId, + fullScanElapsedMs: elapsedMs, + }); if (cb) { cb(err); } @@ -453,31 +476,52 @@ class LifecycleConductor { this.activeIndexingJobsRetrieved = false; this.activeIndexingJobs = []; this._batchInProgress = false; - const unknownCanonicalIds = this._accountIdCache.getMisses(); if (err) { - log.error('lifecycle batch failed', { error: err, unknownCanonicalIds }); + log.error('lifecycle batch failed', { + error: err, + unknownCanonicalIds, + conductorScanId: scanId, + fullScanElapsedMs: elapsedMs, + totalBucketsListed, + nBucketsQueued, + }); + if (scanStarted) { + this._currentScanId = null; + } if (cb) { cb(err); } return; } - log.info('finished pushing lifecycle batch', { nBucketsQueued, unknownCanonicalIds }); - LifecycleMetrics.onProcessBuckets(log); + LifecycleMetrics.onConductorScanComplete( + log, + totalBucketsListed, + elapsedMs / 1000, + ); + + log.info('finished pushing lifecycle batch', { + nBucketsQueued, + unknownCanonicalIds, + conductorScanId: scanId, + fullScanElapsedMs: elapsedMs, + totalBucketsListed, + }); + this._currentScanId = null; if (cb) { cb(null, nBucketsQueued); } }); } - listBuckets(queue, log, cb) { + listBuckets(queue, scanId, log, cb) { if (this._bucketSource === 'zookeeper') { - return this.listZookeeperBuckets(queue, log, cb); + return this.listZookeeperBuckets(queue, scanId, log, cb); } if (this._bucketSource === 'mongodb') { - return this.listMongodbBuckets(queue, log, cb); + return this.listMongodbBuckets(queue, scanId, log, cb); } return this.restoreBucketCheckpoint((err, marker) => { @@ -485,11 +529,11 @@ class LifecycleConductor { return cb(err); } - return this.listBucketdBuckets(queue, marker || null, log, cb); + return this.listBucketdBuckets(queue, marker || null, scanId, log, cb); }); } - listZookeeperBuckets(queue, log, cb) { + listZookeeperBuckets(queue, scanId, log, cb) { const zkBucketsPath = this.getBucketsZkPath(); this._zkClient.getChildren( zkBucketsPath, @@ -498,7 +542,7 @@ class LifecycleConductor { if (err) { log.error( 'error getting list of buckets from zookeeper', - { zkPath: zkBucketsPath, error: err.message }); + { zkPath: zkBucketsPath, error: err.message, conductorScanId: scanId }); return cb(err); } @@ -509,7 +553,7 @@ class LifecycleConductor { if (!canonicalId || !bucketUID || !bucketName) { log.error( 'malformed zookeeper bucket entry, skipping', - { zkPath: zkBucketsPath, bucket }); + { zkPath: zkBucketsPath, bucket, conductorScanId: scanId }); return false; } @@ -572,11 +616,11 @@ class LifecycleConductor { }); } - listBucketdBuckets(queue, initMarker, log, cb) { + listBucketdBuckets(queue, initMarker, scanId, log, cb) { let isTruncated = true; let marker = initMarker; let nEnqueued = 0; - const start = new Date(); + const start = Date.now(); const retryWrapper = new BackbeatTask(this.lcConfig.conductor.retry); this.lastSentId = null; @@ -589,8 +633,9 @@ 
class LifecycleConductor { nEnqueuedToDownstream: nEnqueued, inFlight: queue.length(), maxInFlight: this._maxInFlightBatchSize, - bucketListingPushRateHz: Math.round(nEnqueued * 1000 / (new Date() - start)), + bucketListingPushRateHz: Math.round(nEnqueued * 1000 / (Date.now() - start)), breakerState, + conductorScanId: scanId, }; if (queue.length() > this._maxInFlightBatchSize || @@ -672,9 +717,9 @@ class LifecycleConductor { ); } - listMongodbBuckets(queue, log, cb) { + listMongodbBuckets(queue, scanId, log, cb) { let nEnqueued = 0; - const start = new Date(); + const start = Date.now(); const lastEntryPath = this.getBucketProgressZkPath(); let lastSentId = null; @@ -744,8 +789,9 @@ class LifecycleConductor { nEnqueuedToDownstream: nEnqueued, inFlight: queue.length(), maxInFlight: this._maxInFlightBatchSize, - enqueueRateHz: Math.round(nEnqueued * 1000 / (new Date() - start)), + enqueueRateHz: Math.round(nEnqueued * 1000 / (Date.now() - start)), breakerState, + conductorScanId: scanId, }; if (queue.length() > this._maxInFlightBatchSize || diff --git a/extensions/lifecycle/tasks/LifecycleDeleteObjectTask.js b/extensions/lifecycle/tasks/LifecycleDeleteObjectTask.js index 7139d8724..c0c288266 100644 --- a/extensions/lifecycle/tasks/LifecycleDeleteObjectTask.js +++ b/extensions/lifecycle/tasks/LifecycleDeleteObjectTask.js @@ -304,10 +304,16 @@ class LifecycleDeleteObjectTask extends BackbeatTask { processActionEntry(entry, done) { const startTime = Date.now(); const log = this.logger.newRequestLogger(); + const conductorScanId = entry.getContextAttribute( + 'conductorScanId'); + if (conductorScanId) { + log.addDefaultFields({ conductorScanId }); + } entry.addLoggedAttributes({ bucketName: 'target.bucket', objectKey: 'target.key', versionId: 'target.version', + conductorScanId: 'contextInfo.conductorScanId', }); return async.series([ diff --git a/extensions/lifecycle/tasks/LifecycleTask.js b/extensions/lifecycle/tasks/LifecycleTask.js index 591ce63e8..37a48d0a0 100644 --- a/extensions/lifecycle/tasks/LifecycleTask.js +++ b/extensions/lifecycle/tasks/LifecycleTask.js @@ -236,7 +236,10 @@ class LifecycleTask extends BackbeatTask { } const entry = Object.assign({}, bucketData, { - contextInfo: { reqId: log.getSerializedUids() }, + contextInfo: { + reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo?.conductorScanId, + }, details: { marker }, }); this._sendBucketEntry(entry, err => { @@ -411,6 +414,8 @@ class LifecycleTask extends BackbeatTask { const entry = Object.assign({}, bucketData, { contextInfo: { reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }, details: { keyMarker: data.NextKeyMarker, @@ -497,6 +502,8 @@ class LifecycleTask extends BackbeatTask { const entry = Object.assign({}, bucketData, { contextInfo: { reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }, details: { keyMarker: data.NextKeyMarker, @@ -1040,6 +1047,8 @@ class LifecycleTask extends BackbeatTask { origin: 'lifecycle', ruleType: 'expiration', reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }) .setAttribute('target.owner', bucketData.target.owner) .setAttribute('target.bucket', bucketData.target.bucket) @@ -1068,6 +1077,8 @@ class LifecycleTask extends BackbeatTask { origin: 'lifecycle', ruleType: 'expiration', reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && 
bucketData.contextInfo.conductorScanId, }) .setAttribute('target.owner', bucketData.target.owner) .setAttribute('target.bucket', bucketData.target.bucket) @@ -1174,6 +1185,7 @@ class LifecycleTask extends BackbeatTask { origin: 'lifecycle', ruleType: 'transition', reqId: log.getSerializedUids(), + conductorScanId: params.conductorScanId, }); entry.setAttribute('source', { bucket: params.bucket, @@ -1374,6 +1386,8 @@ class LifecycleTask extends BackbeatTask { site: rules[ncvt].StorageClass, transitionTime: this._lifecycleDateTime.getTransitionTimestamp( { Days: rules[ncvt][ncd] }, staleDate), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }, log, cb); return; } @@ -1450,6 +1464,8 @@ class LifecycleTask extends BackbeatTask { origin: 'lifecycle', ruleType: 'expiration', reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }) .setAttribute('target.owner', bucketData.target.owner) .setAttribute('target.bucket', bucketData.target.bucket) @@ -1519,6 +1535,8 @@ class LifecycleTask extends BackbeatTask { origin: 'lifecycle', ruleType: 'expiration', reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }) .setAttribute('target.owner', bucketData.target.owner) .setAttribute('target.bucket', bucketData.target.bucket) @@ -1606,6 +1624,8 @@ class LifecycleTask extends BackbeatTask { site: rules.Transition.StorageClass, transitionTime: this._lifecycleDateTime.getTransitionTimestamp( rules.Transition, object.LastModified), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }, log, done); } return done(); @@ -1709,6 +1729,8 @@ class LifecycleTask extends BackbeatTask { encodedVersionId: undefined, transitionTime: this._lifecycleDateTime.getTransitionTimestamp( rules.Transition, version.LastModified), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }, log, done); } @@ -1758,6 +1780,8 @@ class LifecycleTask extends BackbeatTask { origin: 'lifecycle', ruleType: 'expiration', reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }) .setAttribute('target.owner', bucketData.target.owner) .setAttribute('target.bucket', bucketData.target.bucket) @@ -1827,6 +1851,11 @@ class LifecycleTask extends BackbeatTask { processBucketEntry(bucketLCRules, bucketData, s3target, backbeatMetadataProxy, nbRetries, done) { const log = this.log.newRequestLogger(); + const conductorScanId = bucketData.contextInfo + && bucketData.contextInfo.conductorScanId; + if (conductorScanId) { + log.addDefaultFields({ conductorScanId }); + } this.s3target = s3target; this.backbeatMetadataProxy = backbeatMetadataProxy; if (!this.backbeatMetadataProxy) { diff --git a/extensions/lifecycle/tasks/LifecycleTaskV2.js b/extensions/lifecycle/tasks/LifecycleTaskV2.js index f7ca26652..4a4d8b0ac 100644 --- a/extensions/lifecycle/tasks/LifecycleTaskV2.js +++ b/extensions/lifecycle/tasks/LifecycleTaskV2.js @@ -52,7 +52,11 @@ class LifecycleTaskV2 extends LifecycleTask { } = l; const entry = Object.assign({}, bucketData, { - contextInfo: { requestId: log.getSerializedUids() }, + contextInfo: { + requestId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, + }, details: { beforeDate, prefix, listType, storageClass }, }); @@ -115,7 +119,11 @@ class LifecycleTaskV2 extends LifecycleTask { // 
re-queue truncated listing only once. if (isTruncated && nbRetries === 0) { const entry = Object.assign({}, bucketData, { - contextInfo: { requestId: log.getSerializedUids() }, + contextInfo: { + requestId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, + }, details: { beforeDate: params.BeforeDate, prefix: params.Prefix, @@ -199,7 +207,11 @@ class LifecycleTaskV2 extends LifecycleTask { // re-queue truncated listing only once. if (isTruncated && nbRetries === 0) { const entry = Object.assign({}, bucketData, { - contextInfo: { requestId: log.getSerializedUids() }, + contextInfo: { + requestId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, + }, details: { beforeDate: params.BeforeDate, prefix: params.Prefix, @@ -350,6 +362,8 @@ class LifecycleTaskV2 extends LifecycleTask { site: rules.Transition.StorageClass, transitionTime: this._lifecycleDateTime.getTransitionTimestamp( rules.Transition, obj.LastModified), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }, log, cb); } @@ -426,6 +440,8 @@ class LifecycleTaskV2 extends LifecycleTask { origin: 'lifecycle', ruleType: 'expiration', reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }) .setAttribute('target.owner', bucketData.target.owner) .setAttribute('target.bucket', bucketData.target.bucket) diff --git a/extensions/lifecycle/tasks/LifecycleUpdateExpirationTask.js b/extensions/lifecycle/tasks/LifecycleUpdateExpirationTask.js index a94928ab9..38895a59c 100644 --- a/extensions/lifecycle/tasks/LifecycleUpdateExpirationTask.js +++ b/extensions/lifecycle/tasks/LifecycleUpdateExpirationTask.js @@ -150,10 +150,16 @@ class LifecycleUpdateExpirationTask extends BackbeatTask { processActionEntry(entry, done) { const startTime = Date.now(); const log = this.logger.newRequestLogger(); + const conductorScanId = entry.getContextAttribute( + 'conductorScanId'); + if (conductorScanId) { + log.addDefaultFields({ conductorScanId }); + } entry.addLoggedAttributes({ bucketName: 'target.bucket', objectKey: 'target.key', versionId: 'target.version', + conductorScanId: 'contextInfo.conductorScanId', }); async.waterfall([ diff --git a/extensions/lifecycle/tasks/LifecycleUpdateTransitionTask.js b/extensions/lifecycle/tasks/LifecycleUpdateTransitionTask.js index 55820bad8..74424fe08 100644 --- a/extensions/lifecycle/tasks/LifecycleUpdateTransitionTask.js +++ b/extensions/lifecycle/tasks/LifecycleUpdateTransitionTask.js @@ -260,10 +260,16 @@ class LifecycleUpdateTransitionTask extends BackbeatTask { */ processActionEntry(entry, done) { const log = this.logger.newRequestLogger(); + const conductorScanId = entry.getContextAttribute( + 'conductorScanId'); + if (conductorScanId) { + log.addDefaultFields({ conductorScanId }); + } entry.addLoggedAttributes({ bucketName: 'target.bucket', objectKey: 'target.key', versionId: 'target.version', + conductorScanId: 'contextInfo.conductorScanId', eTag: 'target.eTag', lastModified: 'target.lastModified', }); diff --git a/monitoring/lifecycle/alerts.test.yaml b/monitoring/lifecycle/alerts.test.yaml index d8cbe6988..48e841c21 100644 --- a/monitoring/lifecycle/alerts.test.yaml +++ b/monitoring/lifecycle/alerts.test.yaml @@ -328,8 +328,8 @@ tests: severity: warning exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 2m 0s ago." 
- summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 2m 0s ago." + summary: "Lifecycle scan not scheduled in time" - alertname: LifecycleLateScan eval_time: 4m exp_alerts: @@ -337,14 +337,14 @@ tests: severity: warning exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 2m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 2m 0s ago." + summary: "Lifecycle scan not scheduled in time" - exp_labels: severity: critical exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 3m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 3m 0s ago." + summary: "Lifecycle scan not scheduled in time" - alertname: LifecycleLateScan eval_time: 5m exp_alerts: @@ -352,14 +352,14 @@ tests: severity: warning exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 2m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 2m 0s ago." + summary: "Lifecycle scan not scheduled in time" - exp_labels: severity: critical exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 3m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 3m 0s ago." + summary: "Lifecycle scan not scheduled in time" - alertname: LifecycleLateScan eval_time: 6m exp_alerts: @@ -367,8 +367,8 @@ tests: severity: warning exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 2m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 2m 0s ago." + summary: "Lifecycle scan not scheduled in time" - alertname: LifecycleLateScan eval_time: 7m exp_alerts: @@ -376,8 +376,8 @@ tests: severity: warning exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 2m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 2m 0s ago." + summary: "Lifecycle scan not scheduled in time" - alertname: LifecycleLateScan eval_time: 8m exp_alerts: @@ -385,14 +385,14 @@ tests: severity: warning exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 2m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 2m 0s ago." + summary: "Lifecycle scan not scheduled in time" - exp_labels: severity: critical exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 3m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 3m 0s ago." + summary: "Lifecycle scan not scheduled in time" - alertname: LifecycleLateScan eval_time: 9m exp_alerts: @@ -400,14 +400,14 @@ tests: severity: warning exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 2m 0s ago." 
- summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 2m 0s ago." + summary: "Lifecycle scan not scheduled in time" - exp_labels: severity: critical exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 3m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 3m 0s ago." + summary: "Lifecycle scan not scheduled in time" - alertname: LifecycleLateScan eval_time: 10m exp_alerts: @@ -415,14 +415,14 @@ tests: severity: warning exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 2m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 2m 0s ago." + summary: "Lifecycle scan not scheduled in time" - exp_labels: severity: critical exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 3m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 3m 0s ago." + summary: "Lifecycle scan not scheduled in time" - alertname: LifecycleLateScan eval_time: 11m exp_alerts: @@ -430,14 +430,14 @@ tests: severity: warning exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 2m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 2m 0s ago." + summary: "Lifecycle scan not scheduled in time" - exp_labels: severity: critical exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 3m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 3m 0s ago." + summary: "Lifecycle scan not scheduled in time" - alertname: LifecycleLateScan eval_time: 12m exp_alerts: @@ -445,14 +445,14 @@ tests: severity: warning exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 2m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 2m 0s ago." + summary: "Lifecycle scan not scheduled in time" - exp_labels: severity: critical exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 3m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 3m 0s ago." + summary: "Lifecycle scan not scheduled in time" - alertname: LifecycleLateScan eval_time: 13m exp_alerts: @@ -460,12 +460,51 @@ tests: severity: warning exp_annotations: zenko_service: backbeat-lifecycle-producer - description: "Last lifecycle scan was performed more than 2m 0s ago." - summary: "Lifecycle scan not executed in time" + description: "Last lifecycle scan was scheduled more than 2m 0s ago." 
+ summary: "Lifecycle scan not scheduled in time" - alertname: LifecycleLateScan eval_time: 14m exp_alerts: [] + - name: LifecycleStuckScan + interval: 1m + input_series: + - series: s3_lifecycle_latest_batch_start_time{namespace="zenko",job="artesca-data-backbeat-lifecycle-producer-headless",pod="foo"} + values: 60000+0x5 360000 + - series: s3_lifecycle_latest_batch_end_time{namespace="zenko",job="artesca-data-backbeat-lifecycle-producer-headless",pod="foo"} + values: 0+0x5 360000 + alert_rule_test: + - alertname: LifecycleStuckScan + eval_time: 2m + exp_alerts: [] + - alertname: LifecycleStuckScan + eval_time: 4m + exp_alerts: + - exp_labels: + severity: warning + exp_annotations: + zenko_service: backbeat-lifecycle-producer + description: "Last lifecycle scan has not completed more than 2m 0s after it was scheduled." + summary: "Lifecycle scan did not complete in time" + - alertname: LifecycleStuckScan + eval_time: 5m + exp_alerts: + - exp_labels: + severity: warning + exp_annotations: + zenko_service: backbeat-lifecycle-producer + description: "Last lifecycle scan has not completed more than 2m 0s after it was scheduled." + summary: "Lifecycle scan did not complete in time" + - exp_labels: + severity: critical + exp_annotations: + zenko_service: backbeat-lifecycle-producer + description: "Last lifecycle scan has not completed more than 3m 0s after it was scheduled." + summary: "Lifecycle scan did not complete in time" + - alertname: LifecycleStuckScan + eval_time: 6m + exp_alerts: [] + - name: LifecycleLatency interval: 10m input_series: diff --git a/monitoring/lifecycle/alerts.yaml b/monitoring/lifecycle/alerts.yaml index 99f969ff3..114df6747 100644 --- a/monitoring/lifecycle/alerts.yaml +++ b/monitoring/lifecycle/alerts.yaml @@ -57,9 +57,9 @@ groups: Annotations: zenko_service: backbeat-lifecycle-producer description: >- - Last lifecycle scan was performed more than + Last lifecycle scan was scheduled more than {{ ${lifecycle_latency_warning_threshold} | humanizeDuration }} ago. - summary: "Lifecycle scan not executed in time" + summary: "Lifecycle scan not scheduled in time" - alert: LifecycleLateScan Expr: | @@ -78,9 +78,65 @@ groups: Annotations: zenko_service: backbeat-lifecycle-producer description: >- - Last lifecycle scan was performed more than + Last lifecycle scan was scheduled more than {{ ${lifecycle_latency_critical_threshold} | humanizeDuration }} ago. - summary: "Lifecycle scan not executed in time" + summary: "Lifecycle scan not scheduled in time" + + - alert: LifecycleStuckScan + Expr: | + ( + max(s3_lifecycle_latest_batch_start_time{ + namespace="${namespace}", job="${job_lifecycle_producer}" + }) + > + max(s3_lifecycle_latest_batch_end_time{ + namespace="${namespace}", job="${job_lifecycle_producer}" + } or vector(0)) + ) + and + ( + time() - ( + max(s3_lifecycle_latest_batch_start_time{ + namespace="${namespace}", job="${job_lifecycle_producer}" + }) / 1000 + ) + ) / ${lifecycle_latency_warning_threshold} > 1 + Labels: + severity: warning + Annotations: + zenko_service: backbeat-lifecycle-producer + description: >- + Last lifecycle scan has not completed more than + {{ ${lifecycle_latency_warning_threshold} | humanizeDuration }} after it was scheduled. 
+ summary: "Lifecycle scan did not complete in time" + + - alert: LifecycleStuckScan + Expr: | + ( + max(s3_lifecycle_latest_batch_start_time{ + namespace="${namespace}", job="${job_lifecycle_producer}" + }) + > + max(s3_lifecycle_latest_batch_end_time{ + namespace="${namespace}", job="${job_lifecycle_producer}" + } or vector(0)) + ) + and + ( + time() - ( + max(s3_lifecycle_latest_batch_start_time{ + namespace="${namespace}", job="${job_lifecycle_producer}" + }) / 1000 + ) + ) / ${lifecycle_latency_critical_threshold} > 1 + Labels: + severity: critical + Annotations: + zenko_service: backbeat-lifecycle-producer + description: >- + Last lifecycle scan has not completed more than + {{ ${lifecycle_latency_critical_threshold} | humanizeDuration }} after it was scheduled. + summary: "Lifecycle scan did not complete in time" - name: LifecycleBucketProcessor rules: diff --git a/monitoring/lifecycle/dashboard.json b/monitoring/lifecycle/dashboard.json index a714de95e..998cb5c69 100644 --- a/monitoring/lifecycle/dashboard.json +++ b/monitoring/lifecycle/dashboard.json @@ -4345,6 +4345,340 @@ "transparent": false, "type": "timeseries" }, + { + "datasource": "${DS_PROMETHEUS}", + "description": "Duration of the most recently completed conductor scan, as measured by the conductor itself at scan completion.", + "editable": true, + "error": false, + "fieldConfig": { + "defaults": { + "custom": {}, + "decimals": null, + "mappings": [], + "noValue": "-", + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "index": 0, + "line": true, + "op": "gt", + "value": "null", + "yaxis": "left" + }, + { + "color": "orange", + "index": 1, + "line": true, + "op": "gt", + "value": 60.0, + "yaxis": "left" + }, + { + "color": "red", + "index": 2, + "line": true, + "op": "gt", + "value": 300.0, + "yaxis": "left" + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 12, + "x": 0, + "y": 111 + }, + "hideTimeOverride": false, + "id": 50, + "links": [], + "maxDataPoints": 100, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "last" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": null, + "expr": "s3_lifecycle_conductor_last_batch_duration_seconds{job=\"${job_lifecycle_producer}\", namespace=\"${namespace}\"}", + "format": "time_series", + "hide": false, + "instant": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "", + "metric": "", + "refId": "", + "step": 10, + "target": "" + } + ], + "title": "Last Conductor Batch Duration", + "transformations": [], + "transparent": false, + "type": "stat" + }, + { + "datasource": "${DS_PROMETHEUS}", + "description": "Number of buckets listed during the latest conductor scan.", + "editable": true, + "error": false, + "fieldConfig": { + "defaults": { + "custom": {}, + "decimals": 0, + "mappings": [], + "noValue": "0", + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "semi-dark-blue", + "index": 0, + "line": true, + "op": "gt", + "value": "null", + "yaxis": "left" + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 12, + "x": 12, + "y": 111 + }, + "hideTimeOverride": false, + "id": 51, + "links": [], + "maxDataPoints": 100, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "last" + ], + "fields": "", + 
"values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": null, + "expr": "s3_lifecycle_latest_batch_bucket_count{job=\"${job_lifecycle_producer}\", namespace=\"${namespace}\"}", + "format": "time_series", + "hide": false, + "instant": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "", + "metric": "", + "refId": "", + "step": 10, + "target": "" + } + ], + "title": "Buckets Listed", + "transformations": [], + "transparent": false, + "type": "stat" + }, + { + "datasource": "${DS_PROMETHEUS}", + "description": "Rate of bucket-tasks topic messages successfully processed by conductor scan id and bucket processor pod. Multiple scan ids active at the same time indicate overlapping conductor scans, which should not happen in normal operation. Each message corresponds to a single listing slice (initial or continuation), not a unique bucket.", + "editable": true, + "error": false, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "smooth", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "log": 2, + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": {}, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [] + }, + "unit": "" + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 115 + }, + "hideTimeOverride": false, + "id": 52, + "links": [], + "maxDataPoints": 100, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "datasource": null, + "expr": "sum(rate(s3_lifecycle_bucket_processor_scan_messages_processed_total{job=\"${job_lifecycle_bucket_processor}\", namespace=\"${namespace}\"}[$__rate_interval])) by (pod, conductor_scan_id)", + "format": "time_series", + "hide": false, + "instant": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{pod}} / {{conductor_scan_id}}", + "metric": "", + "refId": "", + "step": 10, + "target": "" + } + ], + "title": "Bucket Processor Messages Processed by Scan", + "transformations": [], + "transparent": false, + "type": "timeseries" + }, + { + "datasource": "${DS_PROMETHEUS}", + "description": "Message processing rate grouped by conductor_scan_id across all bucket-processor pods. 
Should normally show one active scan id; multiple active scan ids indicate overlapping conductor scans.", + "editable": true, + "error": false, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "smooth", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "log": 2, + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": {}, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [] + }, + "unit": "" + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 115 + }, + "hideTimeOverride": false, + "id": 53, + "links": [], + "maxDataPoints": 100, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "datasource": null, + "expr": "sum(rate(s3_lifecycle_bucket_processor_scan_messages_processed_total{job=\"${job_lifecycle_bucket_processor}\", namespace=\"${namespace}\"}[$__rate_interval])) by (conductor_scan_id)", + "format": "time_series", + "hide": false, + "instant": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{conductor_scan_id}}", + "metric": "", + "refId": "", + "step": 10, + "target": "" + } + ], + "title": "Bucket Processor Active Conductor Scans", + "transformations": [], + "transparent": false, + "type": "timeseries" + }, { "datasource": "${DS_PROMETHEUS}", "editable": true, @@ -4442,10 +4776,10 @@ "h": 7, "w": 8, "x": 0, - "y": 111 + "y": 122 }, "hideTimeOverride": false, - "id": 50, + "id": 54, "links": [], "maxDataPoints": 100, "options": { @@ -4555,10 +4889,10 @@ "h": 7, "w": 8, "x": 8, - "y": 111 + "y": 122 }, "hideTimeOverride": false, - "id": 51, + "id": 55, "links": [], "maxDataPoints": 100, "options": { @@ -4640,10 +4974,10 @@ "h": 7, "w": 8, "x": 16, - "y": 111 + "y": 122 }, "hideTimeOverride": false, - "id": 52, + "id": 56, "links": [], "maxDataPoints": 100, "options": { diff --git a/monitoring/lifecycle/dashboard.py b/monitoring/lifecycle/dashboard.py index bf11220ac..249087371 100644 --- a/monitoring/lifecycle/dashboard.py +++ b/monitoring/lifecycle/dashboard.py @@ -52,6 +52,16 @@ class Metrics: job='${job_lifecycle_producer}', namespace='${namespace}', ) + LATEST_BATCH_END_TIME = metrics.Metric( + 's3_lifecycle_latest_batch_end_time', + job='${job_lifecycle_producer}', namespace='${namespace}', + ) + + CONDUCTOR_LAST_BATCH_DURATION = metrics.Metric( + 's3_lifecycle_conductor_last_batch_duration_seconds', + job='${job_lifecycle_producer}', namespace='${namespace}', + ) + BUCKET_LISTING_SUCCESS, BUCKET_LISTING_ERROR, BUCKET_LISTING_THROTTLING = [ metrics.CounterMetric( name, job='${job_lifecycle_producer}', namespace='${namespace}', @@ -73,6 +83,16 @@ class Metrics: 'status', job='${job_lifecycle_producer}', namespace='${namespace}', ) + LATEST_BATCH_BUCKET_COUNT = metrics.Metric( + 's3_lifecycle_latest_batch_bucket_count', + job='${job_lifecycle_producer}', namespace='${namespace}', + ) + + BUCKET_PROCESSOR_SCAN_MESSAGES_PROCESSED = metrics.CounterMetric( + 's3_lifecycle_bucket_processor_scan_messages_processed_total', + 'conductor_scan_id', job='${job_lifecycle_bucket_processor}', namespace='${namespace}', + ) + S3_OPS = metrics.CounterMetric( 
's3_lifecycle_s3_operations_total', 'origin', 'op', 'status', job=['$jobs'], namespace='${namespace}', @@ -730,6 +750,80 @@ def color_override(name, color): ] ) +lifecycle_full_scan_elapsed = Stat( + title="Last Conductor Batch Duration", + description="Duration of the most recently completed conductor scan, " + "as measured by the conductor itself at scan completion.", + dataSource="${DS_PROMETHEUS}", + reduceCalc="last", + format=UNITS.SECONDS, + noValue='-', + targets=[ + Target( + expr=Metrics.CONDUCTOR_LAST_BATCH_DURATION(), + ), + ], + thresholds=[ + Threshold('green', 0, 0.0), + Threshold('orange', 1, 60.0), + Threshold('red', 2, 300.0), + ], +) + +lifecycle_scan_count_buckets = Stat( + title="Buckets Listed", + description="Number of buckets listed during the latest conductor scan.", + dataSource="${DS_PROMETHEUS}", + reduceCalc="last", + decimals=0, + noValue='0', + targets=[ + Target( + expr=Metrics.LATEST_BATCH_BUCKET_COUNT(), + ), + ], + thresholds=[ + Threshold('semi-dark-blue', 0, 0.0), + ], +) + +bucket_processor_messages_processed = TimeSeries( + title="Bucket Processor Messages Processed by Scan", + description="Rate of bucket-tasks topic messages successfully processed " + "by conductor scan id and bucket processor pod. Multiple scan ids " + "active at the same time indicate overlapping conductor scans, which " + "should not happen in normal operation. Each message corresponds to " + "a single listing slice (initial or continuation), not a unique " + "bucket.", + dataSource="${DS_PROMETHEUS}", + lineInterpolation="smooth", + decimals=0, + targets=[ + Target( + expr='sum(rate(' + Metrics.BUCKET_PROCESSOR_SCAN_MESSAGES_PROCESSED() + ')) ' + 'by (pod, conductor_scan_id)', + legendFormat='{{pod}} / {{conductor_scan_id}}', + ), + ], +) + +bucket_processor_active_scan_ids = TimeSeries( + title="Bucket Processor Active Conductor Scans", + description="Message processing rate grouped by conductor_scan_id across " + "all bucket-processor pods. 
Should normally show one active scan id; " + "multiple active scan ids indicate overlapping conductor scans.", + dataSource="${DS_PROMETHEUS}", + lineInterpolation="smooth", + decimals=0, + targets=[ + Target( + expr='sum(rate(' + Metrics.BUCKET_PROCESSOR_SCAN_MESSAGES_PROCESSED() + ')) ' + 'by (conductor_scan_id)', + legendFormat='{{conductor_scan_id}}', + ), + ], +) + active_indexing_jobs = TimeSeries( title="Active Indexing jobs", dataSource="${DS_PROMETHEUS}", @@ -897,6 +991,8 @@ def color_override(name, color): layout.row([s3_delete_object_ops, s3_delete_mpu_ops], height=8), RowPanel(title="Lifecycle Conductor"), layout.row([lifecycle_scans, trigger_latency], height=7), + layout.row([lifecycle_full_scan_elapsed, lifecycle_scan_count_buckets], height=4), + layout.row([bucket_processor_messages_processed, bucket_processor_active_scan_ids], height=7), layout.row([lifecycle_scan_rate, active_indexing_jobs, legacy_tasks], height=7), ]), ) diff --git a/tests/functional/lifecycle/LifecycleConductor.spec.js b/tests/functional/lifecycle/LifecycleConductor.spec.js index 9943ab4c2..68b525254 100644 --- a/tests/functional/lifecycle/LifecycleConductor.spec.js +++ b/tests/functional/lifecycle/LifecycleConductor.spec.js @@ -41,7 +41,7 @@ const expected2Messages = (version='v2') => ([ { value: { action: 'processObjects', - contextInfo: { reqId: 'test-request-id' }, + contextInfo: { reqId: 'test-request-id', conductorScanId: 'test-scan-id' }, target: { bucket: 'bucket1', owner: 'owner1', taskVersion: version }, details: {}, }, @@ -49,7 +49,7 @@ const expected2Messages = (version='v2') => ([ { value: { action: 'processObjects', - contextInfo: { reqId: 'test-request-id' }, + contextInfo: { reqId: 'test-request-id', conductorScanId: 'test-scan-id' }, target: { bucket: 'bucket1-2', owner: 'owner1', taskVersion: version }, details: {}, }, @@ -60,7 +60,7 @@ const expected4Messages = (version='v2') => ([ { value: { action: 'processObjects', - contextInfo: { reqId: 'test-request-id' }, + contextInfo: { reqId: 'test-request-id', conductorScanId: 'test-scan-id' }, target: { bucket: 'bucket1', owner: 'owner1', taskVersion: version }, details: {}, }, @@ -68,7 +68,7 @@ const expected4Messages = (version='v2') => ([ { value: { action: 'processObjects', - contextInfo: { reqId: 'test-request-id' }, + contextInfo: { reqId: 'test-request-id', conductorScanId: 'test-scan-id' }, target: { bucket: 'bucket1-2', owner: 'owner1', taskVersion: version }, details: {}, }, @@ -76,7 +76,7 @@ const expected4Messages = (version='v2') => ([ { value: { action: 'processObjects', - contextInfo: { reqId: 'test-request-id' }, + contextInfo: { reqId: 'test-request-id', conductorScanId: 'test-scan-id' }, target: { bucket: 'bucket3', owner: 'owner3', taskVersion: version }, details: {}, }, @@ -84,7 +84,7 @@ const expected4Messages = (version='v2') => ([ { value: { action: 'processObjects', - contextInfo: { reqId: 'test-request-id' }, + contextInfo: { reqId: 'test-request-id', conductorScanId: 'test-scan-id' }, target: { bucket: 'bucket4', owner: 'owner4', taskVersion: version }, details: {}, }, diff --git a/tests/unit/lifecycle/LifecycleConductor.spec.js b/tests/unit/lifecycle/LifecycleConductor.spec.js index 598d0b4df..f75ebaf9a 100644 --- a/tests/unit/lifecycle/LifecycleConductor.spec.js +++ b/tests/unit/lifecycle/LifecycleConductor.spec.js @@ -3,10 +3,12 @@ const assert = require('assert'); const sinon = require('sinon'); const fakeLogger = require('../../utils/fakeLogger'); +const { errors } = require('arsenal'); const { splitter } = 
require('arsenal').constants; const LifecycleConductor = require( '../../../extensions/lifecycle/conductor/LifecycleConductor'); +const { LifecycleMetrics } = require('../../../extensions/lifecycle/LifecycleMetrics'); const { lifecycleTaskVersions, indexesForFeature @@ -190,6 +192,103 @@ describe('Lifecycle Conductor', () => { conductor.processBuckets(done); }); + it('should publish full scan metrics at end of scan', done => { + conductor._mongodbClient = { + getIndexingJobs: (_, cb) => cb(null, ['job1']), + getCollection: () => ({ + find: () => ({ + project: () => ({ + hasNext: () => Promise.resolve(false), + }), + }), + }), + }; + conductor._zkClient = { + getData: (_, cb) => cb(null, null, null), + setData: (path, data, version, cb) => cb(null, { version: 1 }), + }; + + sinon.stub(conductor, '_controlBacklog').callsFake(cb => cb(null)); + const metricStub = sinon.stub(LifecycleMetrics, 'onConductorScanComplete'); + + conductor.processBuckets(err => { + assert.ifError(err); + assert(metricStub.calledOnce); + const [, bucketCount, durationSeconds] = metricStub.firstCall.args; + assert.strictEqual(bucketCount, 0); + assert.strictEqual(typeof durationSeconds, 'number'); + assert(Number.isFinite(durationSeconds) && durationSeconds >= 0, + `durationSeconds should be a finite non-negative number, got ${durationSeconds}`); + done(); + }); + }); + + it('should keep the in-flight scan id when backlog control throttles a new scan', done => { + const inFlightScanId = 'in-flight-scan-id'; + conductor._currentScanId = inFlightScanId; + conductor._batchInProgress = true; + + sinon.stub(conductor, '_controlBacklog') + .callsFake(cb => cb(errors.Throttling.customizeDescription('Batch in progress'))); + const onProcessBucketsStub = sinon.stub(LifecycleMetrics, 'onProcessBuckets'); + + conductor.processBuckets(err => { + assert(err && err.Throttling); + assert.strictEqual(conductor._currentScanId, inFlightScanId); + assert(onProcessBucketsStub.notCalled); + done(); + }); + }); + + it('should generate a conductorScanId', done => { + conductor._mongodbClient = { + getIndexingJobs: (_, cb) => cb(null, []), + getCollection: () => ({ + find: () => ({ + project: () => ({ + hasNext: () => Promise.resolve(false), + }), + }), + }), + }; + conductor._zkClient = { + getData: (_, cb) => cb(null, null, null), + setData: (path, data, version, cb) => cb(null, { version: 1 }), + }; + conductor._producer = { send: (msg, cb) => cb(null, {}) }; + const addDefaultFieldsStub = sinon.stub(); + const testLog = conductor.logger.newRequestLogger(); + testLog.addDefaultFields = addDefaultFieldsStub; + sinon.stub(conductor.logger, 'newRequestLogger').returns(testLog); + + sinon.stub(conductor, '_controlBacklog').callsFake(cb => cb(null)); + let capturedScanId = null; + sinon.stub(conductor, 'listBuckets') + .callsFake((queue, scanId, log, cb) => { + // Verify scanId is a valid UUID + const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; + assert(uuidRegex.test(scanId), + `conductorScanId should be a valid UUID v4, got: ${scanId}`); + capturedScanId = scanId; + assert(addDefaultFieldsStub.calledOnce); + assert.strictEqual(addDefaultFieldsStub.firstCall.args[0].conductorScanId, scanId); + cb(null, 0); + }); + const metricStub = sinon.stub(LifecycleMetrics, 'onConductorScanComplete'); + + conductor.processBuckets(err => { + assert.ifError(err); + assert(metricStub.calledOnce); + const logPassedToMetric = metricStub.firstCall.args[0]; + assert.strictEqual(logPassedToMetric, testLog); + 
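+                // listBuckets must have received the same generated scan id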
assert(capturedScanId); + const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; + assert(uuidRegex.test(capturedScanId), + `captured conductorScanId should be a valid UUID v4, got: ${capturedScanId}`); + done(); + }); + }); + // tests that `activeIndexingJobRetrieved` is not reset until the e it('should not reset `activeIndexingJobsRetrieved` while async operations are in progress', done => { const order = []; @@ -202,7 +301,7 @@ describe('Lifecycle Conductor', () => { sinon.stub(conductor._mongodbClient, 'getIndexingJobs') .callsFake((_, cb) => cb(null, ['job1', 'job2'])); sinon.stub(conductor, 'listBuckets') - .callsFake((mQueue, log, cb) => { + .callsFake((mQueue, scanId, log, cb) => { mQueue.push({ bucketName: 'testbucket', canonicalId: 'testId', @@ -244,6 +343,14 @@ describe('Lifecycle Conductor', () => { }); describe('_indexesGetOrCreate', () => { + it('should include conductor scan id in task context', () => { + conductor._currentScanId = 'scan-id-test'; + const taskMessage = conductor._taskToMessage( + getTask(true), lifecycleTaskVersions.v2, log); + const parsed = JSON.parse(taskMessage.message); + assert.strictEqual(parsed.contextInfo.conductorScanId, 'scan-id-test'); + }); + it('should return v2 for bucketd bucket sources', done => { conductor._bucketSource = 'bucketd'; conductor._indexesGetOrCreate(getTask(undefined), log, (err, taskVersion) => { @@ -435,7 +542,7 @@ describe('Lifecycle Conductor', () => { const lcConductor = makeLifecycleConductorWithFilters({ bucketSource: 'zookeeper', }); - lcConductor.listBuckets(queue, fakeLogger, (err, length) => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => { assert.strictEqual(length, 2); assert.strictEqual(queue.length(), 2); const expectedQueue = [ @@ -458,7 +565,7 @@ describe('Lifecycle Conductor', () => { const lcConductor = makeLifecycleConductorWithFilters({ bucketSource: 'bucketd', }, markers); - lcConductor.listBuckets(queue, fakeLogger, (err, length) => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => { assert.strictEqual(length, 2); assert.strictEqual(queue.length(), 2); @@ -489,7 +596,7 @@ describe('Lifecycle Conductor', () => { 'invalid:bucketuid789', 'invalid', ]); - lcConductor.listBuckets(queue, fakeLogger, (err, length) => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => { assert.strictEqual(length, 2); assert.strictEqual(queue.length(), 2); const expectedQueue = [ @@ -512,7 +619,7 @@ describe('Lifecycle Conductor', () => { bucketsDenied: [bucket1], bucketSource: 'zookeeper', }); - lcConductor.listBuckets(queue, fakeLogger, (err, length) => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => { assert.strictEqual(length, 1); assert.strictEqual(queue.length(), 1); const expectedQueue = [ @@ -532,7 +639,7 @@ describe('Lifecycle Conductor', () => { bucketsDenied: [bucket1], bucketSource: 'bucketd', }, markers); - lcConductor.listBuckets(queue, fakeLogger, (err, length) => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => { assert.strictEqual(length, 1); assert.strictEqual(queue.length(), 1); const expectedQueue = [ @@ -553,7 +660,7 @@ describe('Lifecycle Conductor', () => { accountsDenied: [`${accountName1}:${account1}`], bucketSource: 'zookeeper', }); - lcConductor.listBuckets(queue, fakeLogger, (err, length) => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => { assert.strictEqual(length, 1); 
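+                // only the bucket owned by the non-denied account remains queued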
             assert.strictEqual(queue.length(), 1);
             const expectedQueue = [
@@ -573,7 +680,7 @@ describe('Lifecycle Conductor', () => {
             accountsDenied: [`${accountName1}:${account1}`],
             bucketSource: 'bucketd',
         }, markers);
-        lcConductor.listBuckets(queue, fakeLogger, (err, length) => {
+        lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => {
             assert.strictEqual(length, 1);
             assert.strictEqual(queue.length(), 1);
             const expectedQueue = [
@@ -595,7 +702,7 @@ describe('Lifecycle Conductor', () => {
             bucketsDenied: [bucket2],
             bucketSource: 'zookeeper',
         });
-        lcConductor.listBuckets(queue, fakeLogger, (err, length) => {
+        lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => {
             assert.strictEqual(length, 0);
             assert.strictEqual(queue.length(), 0);
             const expectedQueue = [];
@@ -611,7 +718,7 @@ describe('Lifecycle Conductor', () => {
             bucketsDenied: [bucket2],
             bucketSource: 'bucketd',
         }, markers);
-        lcConductor.listBuckets(queue, fakeLogger, (err, length) => {
+        lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => {
             assert.strictEqual(length, 0);
             assert.strictEqual(queue.length(), 0);
             const expectedQueue = [];
@@ -636,7 +743,7 @@ describe('Lifecycle Conductor', () => {
             getCollection: () => ({ find: findStub })
         };
         sinon.stub(lcConductor._zkClient, 'getData').yields(null, null, null);
-        lcConductor.listBuckets(queue, fakeLogger, err => {
+        lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, err => {
             assert.ifError(err);
             assert.deepEqual(findStub.getCall(0).args[0], {});
             done();
@@ -658,7 +765,7 @@ describe('Lifecycle Conductor', () => {
             getCollection: () => ({ find: findStub })
         };
         sinon.stub(lcConductor._zkClient, 'getData').yields(null, null, null);
-        lcConductor.listBuckets(queue, fakeLogger, err => {
+        lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, err => {
             assert.ifError(err);
             assert.deepEqual(findStub.getCall(0).args[0], {
                 'value.owner': {
@@ -686,7 +793,7 @@ describe('Lifecycle Conductor', () => {
             getCollection: () => ({ find: findStub })
         };
         sinon.stub(lcConductor._zkClient, 'getData').yields(null, null, null);
-        lcConductor.listBuckets(queue, fakeLogger, err => {
+        lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, err => {
             assert.ifError(err);
             assert.deepEqual(findStub.getCall(0).args[0], {
                 _id: {
@@ -711,7 +818,7 @@ describe('Lifecycle Conductor', () => {
             getCollection: () => ({ find: findStub })
         };
         sinon.stub(lcConductor._zkClient, 'getData').yields(null, Buffer.from('bucket1'), null);
-        lcConductor.listBuckets(queue, fakeLogger, err => {
+        lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, err => {
             assert.ifError(err);
             assert.deepEqual(findStub.getCall(0).args[0], {
                 _id: {
@@ -738,7 +845,7 @@ describe('Lifecycle Conductor', () => {
             getCollection: () => ({ find: findStub })
         };
         sinon.stub(lcConductor._zkClient, 'getData').yields(null, Buffer.from('bucket1'), null);
-        lcConductor.listBuckets(queue, fakeLogger, err => {
+        lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, err => {
             assert.ifError(err);
             assert.deepEqual(findStub.getCall(0).args[0], {
                 '_id': {
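Taken together, the conductor-spec changes above pin down a specific control flow: backlog throttling must leave `_currentScanId` untouched and skip the scheduling heartbeat, while a successful run generates a fresh UUID v4 scan id, stamps it on the request logger's default fields, threads it through the new `listBuckets(queue, scanId, log, cb)` signature, and reports bucket count and duration on completion. A minimal sketch of that flow, inferred only from the assertions in these tests — `randomUUID()` and the `buildBucketTaskQueue()` helper are illustrative assumptions, not the real conductor code:

```js
// Sketch of the processBuckets flow implied by the tests above; NOT the
// actual LifecycleConductor implementation.
const { randomUUID } = require('crypto');

function processBuckets(done) {
    this._controlBacklog(err => {
        if (err) {
            // Throttled (e.g. batch in progress): keep the in-flight
            // _currentScanId and do not refresh the scheduling heartbeat.
            return done(err);
        }
        const log = this.logger.newRequestLogger();
        const scanId = randomUUID(); // UUID v4, as the tests assert
        this._currentScanId = scanId;
        // Every log line of this scan carries the scan id.
        log.addDefaultFields({ conductorScanId: scanId });
        LifecycleMetrics.onProcessBuckets(log); // scheduling heartbeat
        const startTime = Date.now();
        const queue = buildBucketTaskQueue(log); // assumed helper
        return this.listBuckets(queue, scanId, log, (err, bucketCount) => {
            if (err) {
                return done(err);
            }
            const durationSeconds = (Date.now() - startTime) / 1000;
            LifecycleMetrics.onConductorScanComplete(log, bucketCount, durationSeconds);
            return done();
        });
    });
}
```

Ordering matters here: the heartbeat fires only after backlog control clears, which is exactly why the throttling test can assert `onProcessBucketsStub.notCalled`.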
diff --git a/tests/unit/lifecycle/LifecycleMetrics.spec.js b/tests/unit/lifecycle/LifecycleMetrics.spec.js
index 2acdba249..910709d8b 100644
--- a/tests/unit/lifecycle/LifecycleMetrics.spec.js
+++ b/tests/unit/lifecycle/LifecycleMetrics.spec.js
@@ -79,6 +79,110 @@ describe('LifecycleMetrics', () => {
            }));
        });
 
+        it('should catch errors in onConductorScanComplete', () => {
+            const metric = ZenkoMetrics.getMetric('s3_lifecycle_latest_batch_end_time');
+            sinon.stub(metric, 'set').throws(new Error('Metric error'));
+
+            LifecycleMetrics.onConductorScanComplete(log, 10);
+
+            assert(log.error.calledOnce);
+            assert(log.error.calledWithMatch('failed to update prometheus metrics', {
+                method: 'LifecycleMetrics.onConductorScanComplete',
+                bucketCount: 10,
+            }));
+        });
+
+        it('should increment bucket processor messages processed counter by scan id', () => {
+            const messagesMetric = ZenkoMetrics.getMetric(
+                's3_lifecycle_bucket_processor_scan_messages_processed_total');
+            const messagesInc = sinon.stub(messagesMetric, 'inc');
+
+            LifecycleMetrics.onBucketProcessorScanMessageProcessed(log, 'scan-A');
+            assert(messagesInc.calledOnce);
+            assert(messagesInc.calledWithMatch({
+                origin: 'bucket_processor',
+                ['conductor_scan_id']: 'scan-A',
+            }));
+
+            assert(log.error.notCalled);
+        });
+
+        it('should catch errors in onBucketProcessorScanMessageProcessed', () => {
+            const messagesMetric = ZenkoMetrics.getMetric(
+                's3_lifecycle_bucket_processor_scan_messages_processed_total');
+            sinon.stub(messagesMetric, 'inc').throws(new Error('Metric error'));
+
+            LifecycleMetrics.onBucketProcessorScanMessageProcessed(log, 'scan-A');
+
+            assert(log.error.calledOnce);
+            assert(log.error.calledWithMatch('failed to update prometheus metrics', {
+                method: 'LifecycleMetrics.onBucketProcessorScanMessageProcessed',
+                conductorScanId: 'scan-A',
+            }));
+        });
+
+        it('should be a no-op when scan id is missing', () => {
+            const messagesMetric = ZenkoMetrics.getMetric(
+                's3_lifecycle_bucket_processor_scan_messages_processed_total');
+            const incStub = sinon.stub(messagesMetric, 'inc');
+
+            LifecycleMetrics.onBucketProcessorScanMessageProcessed(log, undefined);
+            LifecycleMetrics.onBucketProcessorScanMessageProcessed(log, '');
+
+            assert(incStub.notCalled);
+            assert(log.error.notCalled);
+        });
+
+        it('should remove stale bucket processor scan series', () => {
+            const messagesMetric = ZenkoMetrics.getMetric(
+                's3_lifecycle_bucket_processor_scan_messages_processed_total');
+            const removeStub = sinon.stub(messagesMetric, 'remove');
+            sinon.stub(messagesMetric, 'inc');
+            const now = 1700000000000;
+            sinon.stub(Date, 'now')
+                .onFirstCall().returns(now)
+                .onSecondCall().returns(now + 24 * 60 * 60 * 1000 + 1);
+
+            LifecycleMetrics.onBucketProcessorScanMessageProcessed(log, 'scan-A');
+            LifecycleMetrics.onBucketProcessorScanMessageProcessed(log, 'scan-B');
+
+            assert(removeStub.calledOnce);
+            assert(removeStub.calledWithMatch({
+                origin: 'bucket_processor',
+                ['conductor_scan_id']: 'scan-A',
+            }));
+            assert(log.error.notCalled);
+        });
+
+        it('should continue stale series cleanup when one removal fails', () => {
+            const messagesMetric = ZenkoMetrics.getMetric(
+                's3_lifecycle_bucket_processor_scan_messages_processed_total');
+            const removeStub = sinon.stub(messagesMetric, 'remove')
+                .onFirstCall().throws(new Error('Metric remove error'));
+            const incStub = sinon.stub(messagesMetric, 'inc');
+            const now = 1700000000000;
+            sinon.stub(Date, 'now')
+                .onFirstCall().returns(now)
+                .onSecondCall().returns(now + 1)
+                .onThirdCall().returns(now + 24 * 60 * 60 * 1000 + 2);
+
+            LifecycleMetrics.onBucketProcessorScanMessageProcessed(log, 'scan-A');
+            LifecycleMetrics.onBucketProcessorScanMessageProcessed(log, 'scan-B');
+            LifecycleMetrics.onBucketProcessorScanMessageProcessed(log, 'scan-C');
+
+            assert(incStub.calledThrice);
+            assert(incStub.thirdCall.calledWithMatch({
+                origin: 'bucket_processor',
+                ['conductor_scan_id']: 'scan-C',
+            }));
+            assert(removeStub.callCount >= 2);
+            assert(log.error.calledOnce);
+            assert(log.error.calledWithMatch('failed to remove stale bucket processor scan metric'));
+            assert(log.error.neverCalledWithMatch('failed to update prometheus metrics', {
+                method: 'LifecycleMetrics.onBucketProcessorScanMessageProcessed',
+            }));
+        });
+
         it('should catch errors in onLifecycleTriggered', () => {
             LifecycleMetrics.onLifecycleTriggered(log, 'conductor', 'expiration', 'us-east-1', NaN);
@@ -169,5 +273,56 @@ describe('LifecycleMetrics', () => {
                 process: 'conductor',
             }));
         });
+
+        it('should set latest batch bucket count metric', () => {
+            const endMetric = ZenkoMetrics.getMetric(
+                's3_lifecycle_latest_batch_end_time');
+            const countMetric = ZenkoMetrics.getMetric(
+                's3_lifecycle_latest_batch_bucket_count');
+            const endSet = sinon.stub(endMetric, 'set');
+            const countSet = sinon.stub(countMetric, 'set');
+
+            LifecycleMetrics.onConductorScanComplete(log, 7);
+
+            assert(endSet.calledOnce);
+            assert(endSet.calledWithMatch({ origin: 'conductor' }));
+            assert.strictEqual(countSet.callCount, 1);
+            assert(countSet.getCall(0).calledWithMatch({ origin: 'conductor' }, 7));
+            assert(log.error.notCalled);
+        });
+
+        it('should set conductor last batch duration when provided', () => {
+            sinon.stub(ZenkoMetrics.getMetric(
+                's3_lifecycle_latest_batch_end_time'), 'set');
+            sinon.stub(ZenkoMetrics.getMetric(
+                's3_lifecycle_latest_batch_bucket_count'), 'set');
+            const durationSet = sinon.stub(ZenkoMetrics.getMetric(
+                's3_lifecycle_conductor_last_batch_duration_seconds'), 'set');
+
+            LifecycleMetrics.onConductorScanComplete(log, 5, 12.5);
+
+            assert(durationSet.calledOnce);
+            assert(durationSet.calledWithMatch({ origin: 'conductor' }, 12.5));
+            assert(log.error.notCalled);
+        });
+
+        it('should not set conductor last batch duration when omitted', () => {
+            sinon.stub(ZenkoMetrics.getMetric(
+                's3_lifecycle_latest_batch_end_time'), 'set');
+            sinon.stub(ZenkoMetrics.getMetric(
+                's3_lifecycle_latest_batch_bucket_count'), 'set');
+            const durationSet = sinon.stub(ZenkoMetrics.getMetric(
+                's3_lifecycle_conductor_last_batch_duration_seconds'), 'set');
+
+            LifecycleMetrics.onConductorScanComplete(log, 5);
+            LifecycleMetrics.onConductorScanComplete(log, 5, NaN);
+            LifecycleMetrics.onConductorScanComplete(log, 5, -1);
+
+            assert(durationSet.notCalled);
+            assert(log.error.notCalled);
+        });
     });
 });
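These specs fix the observable contract of `onBucketProcessorScanMessageProcessed`: a falsy scan id is a silent no-op (messages produced before the scan-id rollout carry none), a successful call increments the per-scan counter and refreshes the series' last-updated timestamp, the stale-series sweep runs on the same `Date.now()` reading, and metric failures are logged rather than thrown. A sketch consistent with those assertions — the real method body may differ:

```js
// Shape implied by the tests above; NOT the actual implementation.
static onBucketProcessorScanMessageProcessed(log, conductorScanId) {
    if (!conductorScanId) {
        // Message predates the scan-id rollout: nothing to record.
        return;
    }
    try {
        const now = Date.now(); // single reading, reused for the sweep
        bucketProcessorScanMessagesProcessed.inc({
            [LIFECYCLE_LABEL_ORIGIN]: BUCKET_PROCESSOR_ORIGIN,
            [LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID]: conductorScanId,
        });
        bucketProcessorScanMetricLastUpdated.set(conductorScanId, now);
        // Sweeps series idle for longer than the retention window.
        // Removal failures are caught and logged inside the sweep itself,
        // so they never reach this catch block — which is what lets the
        // "continue cleanup" test assert that the generic error message
        // is never logged for this method.
        removeStaleBucketProcessorScanMetrics(log, now);
    } catch (err) {
        log.error('failed to update prometheus metrics', {
            method: 'LifecycleMetrics.onBucketProcessorScanMessageProcessed',
            error: err.message,
            conductorScanId,
        });
    }
}
```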
diff --git a/tests/utils/BackbeatTestConsumer.js b/tests/utils/BackbeatTestConsumer.js
index 00ed404dd..a227f8365 100644
--- a/tests/utils/BackbeatTestConsumer.js
+++ b/tests/utils/BackbeatTestConsumer.js
@@ -32,6 +32,14 @@ class BackbeatTestConsumer extends BackbeatConsumer {
            // present
            assert(parsedMsg.contextInfo?.reqId, 'expected contextInfo.reqId field');
            parsedMsg.contextInfo.reqId = expectedMsg.value.contextInfo?.reqId;
+            // conductorScanId is also generated per scan: check it exists,
+            // then normalize for comparison
+            if (expectedMsg.value.contextInfo?.conductorScanId === 'test-scan-id') {
+                assert(parsedMsg.contextInfo?.conductorScanId,
+                    'expected contextInfo.conductorScanId field');
+                parsedMsg.contextInfo.conductorScanId =
+                    expectedMsg.value.contextInfo.conductorScanId;
+            }
        }
        assert.deepStrictEqual(
            parsedMsg, expectedMsg.value,
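The consumer change normalizes `conductorScanId` only when the expected message opts in with the sentinel value `'test-scan-id'`, mirroring the existing `reqId` handling: the generated value is asserted to exist, then overwritten so the deep comparison can pass. A hypothetical fixture showing how a test would opt in — the message shape and the `expectOrderedMessages` call are illustrative, not taken from an actual test:

```js
// Hypothetical expected-message fixture; field names besides contextInfo
// are illustrative.
const expectedMessages = [{
    key: 'testbucket',
    value: {
        action: 'processObjects',
        target: { bucket: 'testbucket', owner: 'testowner' },
        contextInfo: {
            // reqId is always normalized; any placeholder works here.
            reqId: 'placeholder-req-id',
            // The 'test-scan-id' sentinel turns on conductorScanId
            // normalization in BackbeatTestConsumer.
            conductorScanId: 'test-scan-id',
        },
    },
}];
// Assumed consumer helper and signature, for illustration only.
consumer.expectOrderedMessages(expectedMessages, TIMEOUT, done);
```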