Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ibm_mq/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ files:
value:
example: false
type: boolean
- name: auto_discover_queues_via_names
description: |
Autodiscover queues via names. Discovers all queue names, then collects metrics per queue. More resilient to
individual queue failures (e.g., permissions, broken queues), but uses more resources than bulk collection.
value:
example: false
type: boolean
- name: collect_statistics_metrics
description: |
Collect metrics from Statistics Messages. Statistics collected are:
Expand Down
1 change: 1 addition & 0 deletions ibm_mq/changelog.d/20549.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make queue discovery logic more resilient and add new metric to track broken queues
84 changes: 82 additions & 2 deletions ibm_mq/datadog_checks/ibm_mq/collectors/queue_metric_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,18 @@ def collect_queue_metrics(self, queue_manager):

def discover_queues(self, queue_manager):
# type: (pymqi.QueueManager) -> Set[str]

_discover = (
self._discover_queues_via_names if self.config.auto_discover_queues_via_names else self._discover_queues
)

discovered_queues = set()
if self.config.auto_discover_queues and not self.config.queue_patterns or self.config.queue_regex:
discovered_queues.update(self._discover_queues(queue_manager, '*'))
discovered_queues.update(_discover(queue_manager, '*'))

if self.config.queue_patterns:
for pattern in self.config.queue_patterns:
discovered_queues.update(self._discover_queues(queue_manager, pattern))
discovered_queues.update(_discover(queue_manager, pattern))

if self.config.queue_regex:
keep_queues = set()
Expand All @@ -99,6 +104,7 @@ def discover_queues(self, queue_manager):

def _discover_queues(self, queue_manager, mq_pattern_filter):
# type: (pymqi.QueueManager, str) -> List[str]
self.log.debug("Using _discover_queues to discover queues")
queues = []

for queue_type in SUPPORTED_QUEUE_TYPES:
Expand Down Expand Up @@ -141,6 +147,80 @@ def _discover_queues(self, queue_manager, mq_pattern_filter):

return queues

def _discover_queues_via_names(self, queue_manager, mq_pattern_filter):
    # type: (pymqi.QueueManager, str) -> List[str]
    """
    Discover queues in two steps, once per supported queue type.

    First the queue names matching ``mq_pattern_filter`` are listed with
    ``MQCMD_INQUIRE_Q_NAMES``; then every name is inquired on its own, so an
    error on one queue does not prevent the remaining names from being checked.

    Returns the names of the queues whose individual inquiry produced a response.
    Emits a warning when nothing was discovered for the pattern.
    """
    self.log.debug("Using _discover_queues_via_names to discover queues")
    found = []

    for queue_type in SUPPORTED_QUEUE_TYPES:
        names_filter = {
            pymqi.CMQC.MQCA_Q_NAME: pymqi.ensure_bytes(mq_pattern_filter),
            pymqi.CMQC.MQIA_Q_TYPE: queue_type,
        }
        pcf = None
        try:
            pcf = pymqi.PCFExecute(
                queue_manager,
                response_wait_interval=self.config.timeout,
                convert=self.config.convert_endianness,
            )
            # Ask only for the queue names rather than the full queue info.
            names_response = pcf.MQCMD_INQUIRE_Q_NAMES(names_filter)
            if names_response:
                raw_names = names_response[0].get(pymqi.CMQCFC.MQCACF_Q_NAMES, [])
            else:
                raw_names = []

            for raw_name in raw_names:
                queue_name = to_string(raw_name).strip()
                if not queue_name:
                    self.log.debug('Discovered queue with empty name, skipping.')
                    continue
                if self._queue_answers_inquiry(pcf, queue_name, queue_type):
                    self.log.debug("Discovered queue: %s", queue_name)
                    found.append(queue_name)

            self.log.debug("%s queues discovered", len(found))
        except pymqi.MQMIError as e:
            self.log.debug("Error inquiring queue names for pattern %s: %s", mq_pattern_filter, e)
            self._submit_discovery_error_metric(e, [f"queue_pattern:{mq_pattern_filter}"])
        except Exception as e:
            self.log.debug("Error retrieving queue info for %s: %s", mq_pattern_filter, e)
        finally:
            # Close internal reply queue to prevent filling up a dead-letter queue.
            # https://github.com/dsuch/pymqi/blob/084ab0b2638f9d27303a2844badc76635c4ad6de/code/pymqi/__init__.py#L2892-L2902
            # https://dsuch.github.io/pymqi/examples.html#how-to-specify-dynamic-reply-to-queues
            if pcf is not None:
                pcf.disconnect()

    if not found:
        self.warning("No matching queue of type MQQT_LOCAL or MQQT_REMOTE for pattern %s", mq_pattern_filter)

    return found

def _queue_answers_inquiry(self, pcf, queue_name, queue_type):
    """
    Inquire a single queue by name and type; return True when a non-empty response comes back.

    ``pymqi.MQMIError`` is handled here: "no message available" and "unknown object name"
    are only logged, any other MQ error is logged and reported through
    ``_submit_discovery_error_metric``. In all of those cases False is returned.
    """
    inquire_args = {
        pymqi.CMQC.MQCA_Q_NAME: pymqi.ensure_bytes(queue_name),
        pymqi.CMQC.MQIA_Q_TYPE: queue_type,
    }
    try:
        return bool(pcf.MQCMD_INQUIRE_Q(inquire_args))
    except pymqi.MQMIError as e:
        failed = e.comp == pymqi.CMQC.MQCC_FAILED
        # Don't warn if no messages, see:
        # https://github.com/dsuch/pymqi/blob/v1.12.0/docs/examples.rst#how-to-wait-for-multiple-messages
        if failed and e.reason == pymqi.CMQC.MQRC_NO_MSG_AVAILABLE:
            self.log.debug("No queue info available for queue %s", queue_name)
        elif failed and e.reason == pymqi.CMQC.MQRC_UNKNOWN_OBJECT_NAME:
            self.log.debug("No matching queue of type %d for queue %s", queue_type, queue_name)
        else:
            self.log.debug("Error inquiring queue %s: %s", queue_name, e)
            self._submit_discovery_error_metric(e, [f"queue:{queue_name}"])
    return False

def _submit_discovery_error_metric(self, error, tags):
    """
    Send the `ibm_mq.queue.discovery.error` gauge with value 1.

    The metric carries a copy of `tags`, plus `ibm_error_code:<reason>` when `error`
    has a non-None `reason` attribute, plus `ibm_error:<name>` when `error.errorAsString()`
    yields text containing a colon (the part after the last colon is used as the name).
    """
    metric_tags = [*tags]

    code = getattr(error, "reason", None)
    if code is not None:
        metric_tags.append(f"ibm_error_code:{code}")

    description = None
    describe = getattr(error, "errorAsString", None)
    if describe is not None:
        try:
            description = describe()
        except Exception:
            # Best effort only: the metric is still sent without the error name tag.
            description = None

    if description and ":" in description:
        name = description.rsplit(":", 1)[-1].strip()
        metric_tags.append(f"ibm_error:{name}")

    self.send_metric(GAUGE, "ibm_mq.queue.discovery.error", 1, tags=metric_tags)

def queue_manager_stats(self, queue_manager, tags):
"""
Get stats from the queue manager
Expand Down
1 change: 1 addition & 0 deletions ibm_mq/datadog_checks/ibm_mq/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def __init__(self, instance, init_config):
self.qm_timezone = instance.get('queue_manager_timezone', 'UTC') # type: str
self.auto_discover_channels = instance.get('auto_discover_channels', True) # type: bool
self.use_qm_tz_for_metrics = is_affirmative(instance.get('use_qm_tz_for_metrics', False)) # type: bool
self.auto_discover_queues_via_names = is_affirmative(instance.get('auto_discover_queues_via_names', False)) # type: bool

# Initialize timezone handling
# First validate the timezone if it's not UTC
Expand Down
4 changes: 4 additions & 0 deletions ibm_mq/datadog_checks/ibm_mq/config_models/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ def instance_auto_discover_queues():
return False


def instance_auto_discover_queues_via_names():
    """Return the default for the `auto_discover_queues_via_names` instance option: disabled."""
    return False


def instance_collect_connection_metrics():
    """Return False; presumably the default for a `collect_connection_metrics` instance option."""
    return False

Expand Down
1 change: 1 addition & 0 deletions ibm_mq/datadog_checks/ibm_mq/config_models/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class InstanceConfig(BaseModel):
)
auto_discover_channels: Optional[bool] = None
auto_discover_queues: Optional[bool] = None
auto_discover_queues_via_names: Optional[bool] = None
channel: str = Field(..., min_length=1)
channel_status_mapping: Optional[MappingProxyType[str, Any]] = None
channels: Optional[tuple[str, ...]] = None
Expand Down
6 changes: 6 additions & 0 deletions ibm_mq/datadog_checks/ibm_mq/data/conf.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ instances:
#
# auto_discover_queues: false

## @param auto_discover_queues_via_names - boolean - optional - default: false
## Autodiscover queues via names. Discovers all queue names, then collects metrics per queue. More resilient to
## individual queue failures (e.g., permissions, broken queues), but uses more resources than bulk collection.
#
# auto_discover_queues_via_names: false

## @param collect_statistics_metrics - boolean - optional - default: false
## Collect metrics from Statistics Messages. Statistics collected are:
## - channel statistics (MQCMD_STATISTICS_CHANNEL)
Expand Down
1 change: 1 addition & 0 deletions ibm_mq/metadata.csv
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ ibm_mq.queue.depth_low_limit,gauge,,item,,"This attribute specifies low limit fo
ibm_mq.queue.depth_max,gauge,,message,,"Maximum queue depth (parameter identifier: `MQIA_MAX_Q_DEPTH`). The maximum number of messages allowed on the queue. Note that other factors may cause the queue to be treated as full; for example, it will appear to be full if there is no storage available for a message.",0,ibm_mq,queue max depth,
ibm_mq.queue.depth_max_event,gauge,,event,,Controls whether Queue Full events are generated (parameter identifier: `MQIA_Q_DEPTH_MAX_EVENT`).,0,ibm_mq,messages,
ibm_mq.queue.depth_percent,gauge,,percent,,The percent of the queue that is currently utilized.,0,ibm_mq,queue usage percentage,
ibm_mq.queue.discovery.error,gauge,,error,,Submitted with error code and error name tags when an error occurs while discovering queues.,0,ibm_mq,discovery error,
ibm_mq.queue.harden_get_backout,gauge,,request,,Whether to harden backout count. Specifies whether the count of backed out messages should be saved (hardened) across restarts of the message queue manager (parameter identifier: `MQIA_HARDEN_GET_BACKOUT`).,0,ibm_mq,times messages retrieved,
ibm_mq.queue.high_q_depth,gauge,,message,,This attribute specifies the maximum number of messages on a queue (parameter identifier: `MQIA_HIGH_Q_DEPTH`).,0,ibm_mq,high q depth,
ibm_mq.queue.inhibit_get,gauge,,occurrence,,Whether get operations are allowed (parameter identifier: `MQIA_INHIBIT_GET`).,0,ibm_mq,gets inhibited,
Expand Down
5 changes: 5 additions & 0 deletions ibm_mq/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@
'channels': [CHANNEL, BAD_CHANNEL],
}

# Same settings as INSTANCE_COLLECT_ALL, with name-based queue discovery switched on.
INSTANCE_COLLECT_ALL_VIA_NAMES = dict(INSTANCE_COLLECT_ALL, auto_discover_queues_via_names=True)

INSTANCE_QUEUE_REGEX_TAG = {
'channel': CHANNEL,
'queue_manager': QUEUE_MANAGER,
Expand Down
5 changes: 5 additions & 0 deletions ibm_mq/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ def instance_collect_all():
return copy.deepcopy(common.INSTANCE_COLLECT_ALL)


@pytest.fixture
def instance_collect_all_via_names():
    """Provide a private deep copy of `common.INSTANCE_COLLECT_ALL_VIA_NAMES` for the requesting test."""
    instance = copy.deepcopy(common.INSTANCE_COLLECT_ALL_VIA_NAMES)
    return instance


@pytest.fixture
def instance_queue_regex_tag():
    """Provide a private deep copy of `common.INSTANCE_QUEUE_REGEX_TAG` for the requesting test."""
    instance = copy.deepcopy(common.INSTANCE_QUEUE_REGEX_TAG)
    return instance
Expand Down
7 changes: 7 additions & 0 deletions ibm_mq/tests/test_ibm_mq_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ def test_e2e_check_all(dd_agent_check, instance_collect_all):
aggregator.assert_metrics_using_metadata(get_metadata_metrics())


def test_e2e_check_all_via_names(dd_agent_check, instance_collect_all_via_names):
    """Run the check with name-based queue discovery enabled and verify the submitted metrics."""
    aggregator = dd_agent_check(instance_collect_all_via_names, rate=True)

    # Same expectations as the bulk-discovery e2e test.
    assert_all_metrics(aggregator)
    metadata_metrics = get_metadata_metrics()
    aggregator.assert_metrics_using_metadata(metadata_metrics)


@pytest.mark.skipif(
MQ_VERSION < 9, reason='Only test for for version >=9, for v8 use a custom image with custom setup.'
)
Expand Down
Loading
Loading