Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ibm-mq-metrics/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ val ibmClientJar: Configuration by configurations.creating {
}

dependencies {
api("com.google.auto.value:auto-value-annotations:1.11.1")
api("com.google.code.findbugs:jsr305:3.0.2")
api("io.swagger:swagger-annotations:1.6.16")
api("org.jetbrains:annotations:26.1.0")
Expand All @@ -33,6 +34,7 @@ dependencies {
implementation("org.slf4j:slf4j-simple:2.0.17")
testImplementation("com.google.guava:guava")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
annotationProcessor("com.google.auto.value:auto-value:1.11.1")
ibmClientJar("com.ibm.mq:com.ibm.mq.allclient:9.4.5.1") {
artifact {
name = "com.ibm.mq.allclient"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.ibm.mq.WmqMonitor;
import io.opentelemetry.ibm.mq.config.QueueManager;
import io.opentelemetry.ibm.mq.metrics.MetricProducer;
import io.opentelemetry.ibm.mq.opentelemetry.ConfigWrapper;
import java.util.List;
import java.util.Map;
Expand All @@ -26,12 +26,12 @@ class TestWMQMonitor {

private final ConfigWrapper config;
private final ExecutorService threadPool;
private final Meter meter;
private final MetricProducer producer;

TestWMQMonitor(ConfigWrapper config, Meter meter, ExecutorService service) {
TestWMQMonitor(ConfigWrapper config, MetricProducer producer, ExecutorService service) {
this.config = config;
this.threadPool = service;
this.meter = meter;
this.producer = producer;
}

/**
Expand All @@ -47,7 +47,7 @@ void runTest() {
assertThat(queueManagers).isNotNull();
ObjectMapper mapper = new ObjectMapper();

WmqMonitor wmqTask = new WmqMonitor(config, threadPool, meter);
WmqMonitor wmqTask = new WmqMonitor(config, threadPool, producer);

// we override this helper to pass in our opentelemetry helper instead.
for (Map<String, ?> queueManager : queueManagers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
import com.ibm.mq.headers.pcf.PCFMessageAgent;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.ibm.mq.config.QueueManager;
import io.opentelemetry.ibm.mq.metrics.MetricProducer;
import io.opentelemetry.ibm.mq.opentelemetry.ConfigWrapper;
import io.opentelemetry.ibm.mq.opentelemetry.Main;
import io.opentelemetry.ibm.mq.util.WmqUtil;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
import io.opentelemetry.sdk.resources.Resource;
import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
Expand All @@ -50,7 +51,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -60,9 +60,6 @@ class WMQMonitorIntegrationTest {

private static final Logger logger = LoggerFactory.getLogger(WMQMonitorIntegrationTest.class);

@RegisterExtension
static final OpenTelemetryExtension otelTesting = OpenTelemetryExtension.create();

private static final ExecutorService service =
Executors.newFixedThreadPool(
4, /* one gets burned with our @BeforeAll message uzi, 4 is faster than 2 */
Expand Down Expand Up @@ -183,11 +180,13 @@ void test_monitor_with_full_config() throws Exception {
String configFile = getConfigFile("conf/test-config.yml");

ConfigWrapper config = ConfigWrapper.parse(configFile);
Meter meter = otelTesting.getOpenTelemetry().getMeter("opentelemetry.io/mq");
TestWMQMonitor monitor = new TestWMQMonitor(config, meter, service);
MetricProducer producer =
new MetricProducer(Resource.empty(), InstrumentationScopeInfo.empty());

TestWMQMonitor monitor = new TestWMQMonitor(config, producer, service);
monitor.runTest();

List<MetricData> data = otelTesting.getMetrics();
List<MetricData> data = producer.produce(Resource.empty());
Map<String, MetricData> metrics = new HashMap<>();
for (MetricData metricData : data) {
metrics.put(metricData.getName(), metricData);
Expand Down Expand Up @@ -244,9 +243,9 @@ void test_monitor_with_full_config() throws Exception {
void test_wmqmonitor() throws Exception {
String configFile = getConfigFile("conf/test-queuemgr-config.yml");
ConfigWrapper config = ConfigWrapper.parse(configFile);
Meter meter = otelTesting.getOpenTelemetry().getMeter("opentelemetry.io/mq");

TestWMQMonitor monitor = new TestWMQMonitor(config, meter, service);
MetricProducer producer =
new MetricProducer(Resource.empty(), InstrumentationScopeInfo.empty());
TestWMQMonitor monitor = new TestWMQMonitor(config, producer, service);
monitor.runTest();
// TODO: Wait why are there no asserts here?
}
Expand All @@ -257,14 +256,17 @@ void test_otlphttp() throws Exception {
ConfigWrapper.parse(WMQMonitorIntegrationTest.getConfigFile("conf/test-config.yml"));
ScheduledExecutorService service =
Executors.newScheduledThreadPool(config.getNumberOfThreads());
Main.run(config, service, otelTesting.getOpenTelemetry());
MetricProducer producer =
new MetricProducer(Resource.empty(), InstrumentationScopeInfo.empty());

Main.run(config, service, producer);
CountDownLatch latch = new CountDownLatch(1);
Future<?> ignored = service.submit(latch::countDown);
Thread.sleep(5000); // TODO: This is fragile and time consuming and should be made better
service.shutdown();
assertTrue(service.awaitTermination(30, TimeUnit.SECONDS));

List<MetricData> data = otelTesting.getMetrics();
List<MetricData> data = producer.produce(Resource.empty());
Set<String> metricNames = new HashSet<>();
for (MetricData metricData : data) {
metricNames.add(metricData.getName());
Expand All @@ -290,11 +292,12 @@ void test_bad_connection() throws Exception {
String configFile = getConfigFile("conf/test-bad-config.yml");

ConfigWrapper config = ConfigWrapper.parse(configFile);
Meter meter = otelTesting.getOpenTelemetry().getMeter("opentelemetry.io/mq");
TestWMQMonitor monitor = new TestWMQMonitor(config, meter, service);
MetricProducer producer =
new MetricProducer(Resource.empty(), InstrumentationScopeInfo.empty());
TestWMQMonitor monitor = new TestWMQMonitor(config, producer, service);
monitor.runTest();

List<MetricData> data = otelTesting.getMetrics();
List<MetricData> data = producer.produce(Resource.empty());

assertThat(data).isNotEmpty();
assertThat(data).hasSize(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.headers.pcf.PCFMessageAgent;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongGauge;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.ibm.mq.config.QueueManager;
import io.opentelemetry.ibm.mq.metrics.Metrics;
import io.opentelemetry.ibm.mq.metrics.MetricProducer;
import io.opentelemetry.ibm.mq.metrics.MetricsConfig;
import io.opentelemetry.ibm.mq.metricscollector.ChannelMetricsCollector;
import io.opentelemetry.ibm.mq.metricscollector.InquireChannelCmdCollector;
Expand Down Expand Up @@ -49,15 +46,15 @@ public final class WmqMonitor {

private final List<QueueManager> queueManagers;
private final List<Consumer<MetricsCollectorContext>> jobs = new ArrayList<>();
private final LongCounter errorCodesCounter;
private final LongGauge heartbeatGauge;
private final ExecutorService threadPool;
private final MetricsConfig metricsConfig;
private final MetricProducer producer;

public WmqMonitor(ConfigWrapper config, ExecutorService threadPool, Meter meter) {
public WmqMonitor(ConfigWrapper config, ExecutorService threadPool, MetricProducer producer) {
List<Map<String, ?>> queueManagers = getQueueManagers(config);
ObjectMapper mapper = new ObjectMapper();

this.producer = producer;
this.queueManagers = new ArrayList<>();

for (Map<String, ?> queueManager : queueManagers) {
Expand All @@ -70,21 +67,18 @@ public WmqMonitor(ConfigWrapper config, ExecutorService threadPool, Meter meter)
}

this.metricsConfig = new MetricsConfig(config);

this.heartbeatGauge = Metrics.createIbmMqHeartbeat(meter);
this.errorCodesCounter = Metrics.createIbmMqConnectionErrors(meter);
this.threadPool = threadPool;

jobs.add(new QueueManagerMetricsCollector(meter));
jobs.add(new InquireQueueManagerCmdCollector(meter));
jobs.add(new ChannelMetricsCollector(meter));
jobs.add(new InquireChannelCmdCollector(meter));
jobs.add(new QueueMetricsCollector(meter, threadPool, config));
jobs.add(new ListenerMetricsCollector(meter));
jobs.add(new TopicMetricsCollector(meter));
jobs.add(new ReadConfigurationEventQueueCollector(meter));
jobs.add(new PerformanceEventQueueCollector(meter));
jobs.add(new QueueManagerEventCollector(meter));
jobs.add(new QueueManagerMetricsCollector(producer));
jobs.add(new InquireQueueManagerCmdCollector(producer));
jobs.add(new ChannelMetricsCollector(producer));
jobs.add(new InquireChannelCmdCollector(producer));
jobs.add(new QueueMetricsCollector(producer, threadPool, config));
jobs.add(new ListenerMetricsCollector(producer));
jobs.add(new TopicMetricsCollector(producer));
jobs.add(new ReadConfigurationEventQueueCollector(producer));
jobs.add(new PerformanceEventQueueCollector(producer));
jobs.add(new QueueManagerEventCollector(producer));
}

public void run() {
Expand Down Expand Up @@ -115,12 +109,12 @@ public void run(QueueManager queueManager) {
if (e.getCause() instanceof MQException) {
MQException mqe = (MQException) e.getCause();
String errorCode = String.valueOf(mqe.getReason());
errorCodesCounter.add(
producer.recordIbmMqConnectionErrors(
1, Attributes.of(IBM_MQ_QUEUE_MANAGER, queueManagerName, ERROR_CODE, errorCode));
}
} finally {
if (this.metricsConfig.isIbmMqHeartbeatEnabled()) {
heartbeatGauge.set(
producer.recordIbmMqHeartbeat(
heartBeatMetricValue, Attributes.of(IBM_MQ_QUEUE_MANAGER, queueManagerName));
}
cleanUp(ibmQueueManager, agent);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.ibm.mq.metrics;

import com.google.auto.value.AutoValue;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.Data;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.resources.Resource;
import javax.annotation.concurrent.Immutable;

@Immutable
@AutoValue
abstract class MetricData implements io.opentelemetry.sdk.metrics.data.MetricData {

static MetricData createMetricData(
Resource resource,
InstrumentationScopeInfo instrumentationScopeInfo,
String name,
String description,
String unit,
MetricDataType type,
Data<?> data) {
return new AutoValue_MetricData(
resource, instrumentationScopeInfo, name, description, unit, type, data);
}
}
Loading
Loading