Skip to content

Commit ae958bf

Browse files
authored
Make queue discovery logic more resilient (DataDog#20549)
* refactor queue discovery * line too long lint * changelog * Rename 20550.added to 20549.added * keep original error handling * unit tests * comments * make feature togglable * sync * test for logic * refactor test * metadata.csv entry * remove unneeded continues * add e2e test * add e2e fixture * test refactor and add e2e to ensure nothing breaks
1 parent 270ac3e commit ae958bf

12 files changed

Lines changed: 345 additions & 3 deletions

File tree

ibm_mq/assets/configuration/spec.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,13 @@ files:
168168
value:
169169
example: false
170170
type: boolean
171+
- name: auto_discover_queues_via_names
172+
description: |
173+
Autodiscover queues via names. Discovers all queue names, then collects metrics per queue. More resilient to
174+
individual queue failures (e.g., permissions, broken queues), but uses more resources than bulk collection.
175+
value:
176+
example: false
177+
type: boolean
171178
- name: collect_statistics_metrics
172179
description: |
173180
Collect metrics from Statistics Messages. Statistics collected are:

ibm_mq/changelog.d/20549.added

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Make queue discovery logic more resilient and add new metric to track broken queues

ibm_mq/datadog_checks/ibm_mq/collectors/queue_metric_collector.py

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,18 @@ def collect_queue_metrics(self, queue_manager):
7575

7676
def discover_queues(self, queue_manager):
7777
# type: (pymqi.QueueManager) -> Set[str]
78+
79+
_discover = (
80+
self._discover_queues_via_names if self.config.auto_discover_queues_via_names else self._discover_queues
81+
)
82+
7883
discovered_queues = set()
7984
if self.config.auto_discover_queues and not self.config.queue_patterns or self.config.queue_regex:
80-
discovered_queues.update(self._discover_queues(queue_manager, '*'))
85+
discovered_queues.update(_discover(queue_manager, '*'))
8186

8287
if self.config.queue_patterns:
8388
for pattern in self.config.queue_patterns:
84-
discovered_queues.update(self._discover_queues(queue_manager, pattern))
89+
discovered_queues.update(_discover(queue_manager, pattern))
8590

8691
if self.config.queue_regex:
8792
keep_queues = set()
@@ -99,6 +104,7 @@ def discover_queues(self, queue_manager):
99104

100105
def _discover_queues(self, queue_manager, mq_pattern_filter):
101106
# type: (pymqi.QueueManager, str) -> List[str]
107+
self.log.debug("Using _discover_queues to discover queues")
102108
queues = []
103109

104110
for queue_type in SUPPORTED_QUEUE_TYPES:
@@ -141,6 +147,80 @@ def _discover_queues(self, queue_manager, mq_pattern_filter):
141147

142148
return queues
143149

150+
def _discover_queues_via_names(self, queue_manager, mq_pattern_filter):
151+
# type: (pymqi.QueueManager, str) -> List[str]
152+
self.log.debug("Using _discover_queues_via_names to discover queues")
153+
queues = []
154+
155+
for queue_type in SUPPORTED_QUEUE_TYPES:
156+
args = {pymqi.CMQC.MQCA_Q_NAME: pymqi.ensure_bytes(mq_pattern_filter), pymqi.CMQC.MQIA_Q_TYPE: queue_type}
157+
pcf = None
158+
try:
159+
pcf = pymqi.PCFExecute(
160+
queue_manager, response_wait_interval=self.config.timeout, convert=self.config.convert_endianness
161+
)
162+
# Use MQCMD_INQUIRE_Q_NAMES to get only the queue names rather than the full queue info
163+
response = pcf.MQCMD_INQUIRE_Q_NAMES(args)
164+
queue_names = response[0].get(pymqi.CMQCFC.MQCACF_Q_NAMES, []) if response else []
165+
for queue in queue_names:
166+
queue_name = to_string(queue).strip()
167+
if not queue_name:
168+
self.log.debug('Discovered queue with empty name, skipping.')
169+
continue
170+
# For each queue name inquire the queue info
171+
inquire_args = {
172+
pymqi.CMQC.MQCA_Q_NAME: pymqi.ensure_bytes(queue_name),
173+
pymqi.CMQC.MQIA_Q_TYPE: queue_type,
174+
}
175+
try:
176+
queue_info_response = pcf.MQCMD_INQUIRE_Q(inquire_args)
177+
if queue_info_response:
178+
self.log.debug("Discovered queue: %s", queue_name)
179+
queues.append(queue_name)
180+
except pymqi.MQMIError as e:
181+
# Don't warn if no messages, see:
182+
# https://github.com/dsuch/pymqi/blob/v1.12.0/docs/examples.rst#how-to-wait-for-multiple-messages
183+
if e.comp == pymqi.CMQC.MQCC_FAILED and e.reason == pymqi.CMQC.MQRC_NO_MSG_AVAILABLE:
184+
self.log.debug("No queue info available for queue %s", queue_name)
185+
elif e.comp == pymqi.CMQC.MQCC_FAILED and e.reason == pymqi.CMQC.MQRC_UNKNOWN_OBJECT_NAME:
186+
self.log.debug("No matching queue of type %d for queue %s", queue_type, queue_name)
187+
else:
188+
self.log.debug("Error inquiring queue %s: %s", queue_name, e)
189+
self._submit_discovery_error_metric(e, [f"queue:{queue_name}"])
190+
self.log.debug("%s queues discovered", str(len(queues)))
191+
except pymqi.MQMIError as e:
192+
self.log.debug("Error inquiring queue names for pattern %s: %s", mq_pattern_filter, e)
193+
self._submit_discovery_error_metric(e, [f"queue_pattern:{mq_pattern_filter}"])
194+
except Exception as e:
195+
self.log.debug("Error retrieving queue info for %s: %s", mq_pattern_filter, e)
196+
finally:
197+
# Close internal reply queue to prevent filling up a dead-letter queue.
198+
# https://github.com/dsuch/pymqi/blob/084ab0b2638f9d27303a2844badc76635c4ad6de/code/pymqi/__init__.py#L2892-L2902
199+
# https://dsuch.github.io/pymqi/examples.html#how-to-specify-dynamic-reply-to-queues
200+
if pcf is not None:
201+
pcf.disconnect()
202+
203+
if not queues:
204+
self.warning("No matching queue of type MQQT_LOCAL or MQQT_REMOTE for pattern %s", mq_pattern_filter)
205+
206+
return queues
207+
208+
def _submit_discovery_error_metric(self, error, tags):
209+
error_tags = list(tags)
210+
reason = getattr(error, "reason", None)
211+
if reason is not None:
212+
error_tags.append(f"ibm_error_code:{reason}")
213+
error_str = None
214+
if hasattr(error, "errorAsString"):
215+
try:
216+
error_str = error.errorAsString()
217+
except Exception:
218+
error_str = None
219+
if error_str and ":" in error_str:
220+
error_name = error_str.split(":")[-1].strip()
221+
error_tags.append(f"ibm_error:{error_name}")
222+
self.send_metric(GAUGE, "ibm_mq.queue.discovery.error", 1, tags=error_tags)
223+
144224
def queue_manager_stats(self, queue_manager, tags):
145225
"""
146226
Get stats from the queue manager

ibm_mq/datadog_checks/ibm_mq/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ def __init__(self, instance, init_config):
9898
self.qm_timezone = instance.get('queue_manager_timezone', 'UTC') # type: str
9999
self.auto_discover_channels = instance.get('auto_discover_channels', True) # type: bool
100100
self.use_qm_tz_for_metrics = is_affirmative(instance.get('use_qm_tz_for_metrics', False)) # type: bool
101+
self.auto_discover_queues_via_names = is_affirmative(instance.get('auto_discover_queues_via_names', False)) # type: bool
101102

102103
# Initialize timezone handling
103104
# First validate the timezone if it's not UTC

ibm_mq/datadog_checks/ibm_mq/config_models/defaults.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ def instance_auto_discover_queues():
2020
return False
2121

2222

23+
def instance_auto_discover_queues_via_names():
24+
return False
25+
26+
2327
def instance_collect_connection_metrics():
2428
return False
2529

ibm_mq/datadog_checks/ibm_mq/config_models/instance.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class InstanceConfig(BaseModel):
3737
)
3838
auto_discover_channels: Optional[bool] = None
3939
auto_discover_queues: Optional[bool] = None
40+
auto_discover_queues_via_names: Optional[bool] = None
4041
channel: str = Field(..., min_length=1)
4142
channel_status_mapping: Optional[MappingProxyType[str, Any]] = None
4243
channels: Optional[tuple[str, ...]] = None

ibm_mq/datadog_checks/ibm_mq/data/conf.yaml.example

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,12 @@ instances:
150150
#
151151
# auto_discover_queues: false
152152

153+
## @param auto_discover_queues_via_names - boolean - optional - default: false
154+
## Autodiscover queues via names. Discovers all queue names, then collects metrics per queue. More resilient to
155+
## individual queue failures (e.g., permissions, broken queues), but uses more resources than bulk collection.
156+
#
157+
# auto_discover_queues_via_names: false
158+
153159
## @param collect_statistics_metrics - boolean - optional - default: false
154160
## Collect metrics from Statistics Messages. Statistics collected are:
155161
## - channel statistics (MQCMD_STATISTICS_CHANNEL)

ibm_mq/metadata.csv

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ ibm_mq.queue.depth_low_limit,gauge,,item,,"This attribute specifies low limit fo
3838
ibm_mq.queue.depth_max,gauge,,message,,"Maximum queue depth (parameter identifier: `MQIA_MAX_Q_DEPTH`). The maximum number of messages allowed on the queue. Note that other factors may cause the queue to be treated as full; for example, it will appear to be full if there is no storage available for a message.",0,ibm_mq,queue max depth,
3939
ibm_mq.queue.depth_max_event,gauge,,event,,Controls whether Queue Full events are generated (parameter identifier: `MQIA_Q_DEPTH_MAX_EVENT`).,0,ibm_mq,messages,
4040
ibm_mq.queue.depth_percent,gauge,,percent,,The percent of the queue that is currently utilized.,0,ibm_mq,queue usage percentage,
41+
ibm_mq.queue.discovery.error,gauge,,error,,Submits a metrics along tagged with error code and error tags when there is an error discovering queues,0,ibm_mq,discovery error,
4142
ibm_mq.queue.harden_get_backout,gauge,,request,,Whether to harden backout count. Specifies whether the count of backed out messages should be saved (hardened) across restarts of the message queue manager (parameter identifier: `MQIA_HARDEN_GET_BACKOUT`).,0,ibm_mq,times messages retrieved,
4243
ibm_mq.queue.high_q_depth,gauge,,message,,This attribute specifies the maximum number of messages on a queue (parameter identifier: `MQIA_HIGH_Q_DEPTH`).,0,ibm_mq,high q depth,
4344
ibm_mq.queue.inhibit_get,gauge,,occurrence,,Whether get operations are allowed (parameter identifier: `MQIA_INHIBIT_GET`).,0,ibm_mq,gets inhibited,

ibm_mq/tests/common.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@
134134
'channels': [CHANNEL, BAD_CHANNEL],
135135
}
136136

137+
INSTANCE_COLLECT_ALL_VIA_NAMES = {
138+
**INSTANCE_COLLECT_ALL,
139+
'auto_discover_queues_via_names': True,
140+
}
141+
137142
INSTANCE_QUEUE_REGEX_TAG = {
138143
'channel': CHANNEL,
139144
'queue_manager': QUEUE_MANAGER,

ibm_mq/tests/conftest.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ def instance_collect_all():
5555
return copy.deepcopy(common.INSTANCE_COLLECT_ALL)
5656

5757

58+
@pytest.fixture
59+
def instance_collect_all_via_names():
60+
return copy.deepcopy(common.INSTANCE_COLLECT_ALL_VIA_NAMES)
61+
62+
5863
@pytest.fixture
5964
def instance_queue_regex_tag():
6065
return copy.deepcopy(common.INSTANCE_QUEUE_REGEX_TAG)

0 commit comments

Comments
 (0)