Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example-apps/collector/.env
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# All default settings defined here can be overridden by environment variables.

# MODE=npm
MODE=local
APP_PORT=2807
SENSOR_ENABLED=true
TRACING_ENABLED=true
Expand Down
3 changes: 3 additions & 0 deletions example-apps/collector/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ if (config.mode === 'npm') {
packageToRequire = '@instana/collector';
}

process.env.INSTANA_METRICS_TRANSMISSION_DELAY = 5000;
process.env.INSTANA_OTLP_FORMAT = 'true';

if (config.collectorEnabled) {
console.log(`enabling @instana/collector (requiring ${packageToRequire})`);
require(packageToRequire)({
Expand Down
179 changes: 151 additions & 28 deletions packages/collector/src/agentConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const pathUtil = require('path');
const circularReferenceRemover = require('./util/removeCircular');
const agentOpts = require('./agent/opts');
const cmdline = require('./cmdline');

const otlpTransformer = require('./otlpTransformer');
/** @typedef {import('@instana/core/src/core').InstanaBaseSpan} InstanaBaseSpan */

/** @type {import('@instana/core/src/core').GenericLogger} */
Expand Down Expand Up @@ -307,20 +307,55 @@ function checkWhetherResponseForPathIsOkay(path, cb) {
exports.sendMetrics = function sendMetrics(data, cb) {
cb = util.atMostOnce('callback for sendMetrics', cb);

sendData(`/com.instana.plugin.nodejs.${pidStore.pid}`, data, (err, body) => {
// Zeige nur die ersten 2 Keys für Debugging
const dataKeys = Object.keys(data);
const firstTwoKeys = {};
for (let i = 0; i < Math.min(2, dataKeys.length); i++) {
firstTwoKeys[dataKeys[i]] = data[dataKeys[i]];
}

logger.debug(`sendMetrics called with data (first 2 keys): ${JSON.stringify(firstTwoKeys)}`);

// Transform Instana metrics to OTLP format
const otlpMetrics = otlpTransformer.transformMetrics(data);

// Zeige nur die ersten 2 Metriken für Debugging
let otlpPreview = otlpMetrics;
if (otlpMetrics.resourceMetrics && otlpMetrics.resourceMetrics.length > 0) {
const firstResource = otlpMetrics.resourceMetrics[0];
if (firstResource.scopeMetrics && firstResource.scopeMetrics.length > 0) {
const metrics = firstResource.scopeMetrics[0].metrics;
if (metrics && metrics.length > 2) {
otlpPreview = {
resourceMetrics: [
{
...firstResource,
scopeMetrics: [
{
...firstResource.scopeMetrics[0],
metrics: metrics.slice(0, 2)
}
]
}
],
totalMetrics: metrics.length
};
}
}
}

logger.debug(`Transformed to OTLP (first 2 metrics) ${JSON.stringify(otlpPreview)}`);

// Send directly without using sendData (which would transform again)
sendOtlpData('/v1/metrics', otlpMetrics, err => {
if (err) {
logger.error('Error sending metrics:', err);
cb(err, null);
} else {
try {
// 2016-09-11
// Older sensor versions will not repond with a JSON
// structure. Support a smooth update path.
body = JSON.parse(body);
} catch (e) {
body = [];
}

cb(null, body);
logger.debug('Metrics sent successfully');
// OTLP endpoints don't return requests like the old Instana endpoint
// Always return empty array for compatibility
cb(null, []);
}
});
};
Expand All @@ -344,7 +379,7 @@ exports.sendSpans = function sendSpans(spans, cb) {
cb(err);
});

sendData(`/com.instana.plugin.nodejs/traces.${pidStore.pid}`, spans, callback, true);
sendData('/v1/traces', spans, callback, true);
};

/**
Expand Down Expand Up @@ -425,7 +460,8 @@ exports.sendTracingMetricsToAgent = function sendTracingMetricsToAgent(tracingMe
cb(err);
});

sendData('/tracermetrics', tracingMetrics, callback);
// sendData('/tracermetrics', tracingMetrics, callback);
cb();
};

/**
Expand All @@ -437,8 +473,13 @@ exports.sendTracingMetricsToAgent = function sendTracingMetricsToAgent(tracingMe
*/
function sendData(path, data, cb, ignore404 = false) {
cb = util.atMostOnce(`callback for sendData: ${path}`, cb);
console.log(JSON.stringify(data));
// Transform Instana format to OTLP format
const otlpFormat = otlpTransformer(data);

console.log(JSON.stringify(otlpFormat));

const payloadAsString = JSON.stringify(data, circularReferenceRemover());
const payloadAsString = JSON.stringify(otlpFormat, circularReferenceRemover());
if (typeof logger.trace === 'function') {
logger.trace(`Sending data to ${path}.`);
} else {
Expand All @@ -455,7 +496,7 @@ function sendData(path, data, cb, ignore404 = false) {
const req = http.request(
{
host: agentOpts.host,
port: agentOpts.port,
port: 4318,
path,
method: 'POST',
agent: http.agent,
Expand All @@ -465,24 +506,26 @@ function sendData(path, data, cb, ignore404 = false) {
}
},
res => {
if (res.statusCode < 200 || res.statusCode >= 300) {
if (res.statusCode !== 404 || !ignore404) {
const statusCodeError = new Error(
`Failed to send data to agent via POST ${path}. Got status code ${res.statusCode}.`
);
// @ts-ignore
statusCodeError.statusCode = res.statusCode;
cb(statusCodeError);
return;
}
}

res.setEncoding('utf8');
let responseBody = '';
res.on('data', chunk => {
responseBody += chunk;
});
res.on('end', () => {
console.log(responseBody);

if (res.statusCode < 200 || res.statusCode >= 300) {
if (res.statusCode !== 404 || !ignore404) {
const statusCodeError = new Error(
`Failed to send data to agent via POST ${path}. Got status code ${res.statusCode}.`
);
// @ts-ignore
statusCodeError.statusCode = res.statusCode;
cb(statusCodeError);
return;
}
}

cb(null, responseBody);
});
}
Expand All @@ -509,6 +552,86 @@ function sendData(path, data, cb, ignore404 = false) {
req.end();
}

/**
* Sendet bereits transformierte OTLP-Daten an den Agent
* @param {string} path - API path
* @param {Object} otlpData - Already transformed OTLP data
* @param {(...args: *) => *} cb - Callback
* @param {boolean} [ignore404]
*/
function sendOtlpData(path, otlpData, cb, ignore404 = false) {
cb = util.atMostOnce(`callback for sendOtlpData: ${path}`, cb);

const payloadAsString = JSON.stringify(otlpData, circularReferenceRemover());
if (typeof logger.trace === 'function') {
logger.trace(`Sending OTLP data to ${path}.`);
} else {
logger.debug(`Sending OTLP data to ${path}, ${agentOpts}`);
}

// Convert payload to a buffer to correctly identify content-length ahead of time.
const payload = Buffer.from(payloadAsString, 'utf8');
if (payload.length > maxContentLength) {
const error = new PayloadTooLargeError(`Request payload is too large. Will not send data to agent. (POST ${path})`);
return setImmediate(cb.bind(null, error));
}

const req = http.request(
{
host: agentOpts.host,
port: 4318,
path,
method: 'POST',
agent: http.agent,
headers: {
'Content-Type': 'application/json; charset=UTF-8',
'Content-Length': payload.length
}
},
res => {
res.setEncoding('utf8');
let responseBody = '';
res.on('data', chunk => {
responseBody += chunk;
});
res.on('end', () => {
if (res.statusCode < 200 || res.statusCode >= 300) {
if (ignore404 && res.statusCode === 404) {
return cb(null, responseBody);
}
return cb(
new Error(
`Failed to send data to agent via POST ${path}. ` +
`Got status code ${res.statusCode}. Response: ${responseBody}`
),
responseBody
);
}
cb(null, responseBody);
});
}
);

req.setTimeout(agentOpts.requestTimeout, function onTimeout() {
if (req.destroyed) {
return;
}

req.destroy(new Error(`Sending data to agent via POST ${path}. Request timeout.`));
});

req.on('error', err => {
if (req.destroyed) {
return;
}

cb(new Error(`Send OTLP data to agent via POST ${path}. Request failed: ${err.message}`));
});

req.write(payload);
req.end();
}

exports.isConnected = function () {
return isConnected;
};
Expand Down
Loading