Skip to content

Commit 6d841ef

Browse files
committed
feat(instrumentations/amqp): add basic amqp isntrumentation
1 parent acee2eb commit 6d841ef

3 files changed

Lines changed: 170 additions & 18 deletions

File tree

lib/instrumentations/amqplib.js

Lines changed: 52 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,63 @@
11
var shimmer = require('../utils/shimmer')
22

3-
function connectionWrapper (Connection, agent) {
4-
shimmer.wrap(Connection.Connection.prototype, 'sendMessage', function (original) {
5-
return function (channel, Method, fields, Properties, props, content) {
6-
return original.apply(this, arguments)
3+
var defaults = require('lodash.defaults')
4+
var get = require('lodash.get')
5+
var microtime = require('../optionalDependencies/microtime')
6+
var debug = require('debug')('risingstack/trace:instrumentations:amqplib')
7+
var format = require('util').format
8+
9+
function consumerWrapper (agent, original) {
10+
return function (queue, callback, options, cb0) {
11+
var orig = callback
12+
var wrapped = function (msg) {
13+
var originTimestamp = parseInt(get(msg, 'properties.headers[\'x-client-send\']'), 10)
14+
var parentServiceKey = parseInt(get(msg, 'properties.headers[\'x-parent\']'), 10)
15+
var timestamp = microtime.now()
16+
var transportDelay = isNaN(originTimestamp) ? undefined : timestamp - originTimestamp
17+
debug('#consumerWrapper', format('incomingEdge [%s %s %s]', 'amqp', parentServiceKey, transportDelay))
18+
agent.incomingEdgeMetrics.report({
19+
serviceKey: parentServiceKey,
20+
protocol: 'amqp',
21+
transportDelay: transportDelay
22+
})
23+
if (typeof orig === 'function') {
24+
return orig.apply(this, arguments)
25+
}
26+
}
27+
var args = Array.prototype.slice.apply(arguments)
28+
args[1] = wrapped
29+
return original.apply(this, args)
30+
}
31+
}
32+
33+
function sendMessageWrapper (agent, original) {
34+
return function (channel, Method, fields, Properties, props, content) {
35+
var traceHeaders = {
36+
'x-request-id': agent.getRequestId() || agent.generateRequestId(),
37+
'x-span-id': agent.generateCommId(),
38+
'x-client-send': String(microtime.now()),
39+
'x-parent': String(agent.getServiceKey())
740
}
8-
})
41+
debug('#sendMessageWrapper', 'sending instrumented message...')
42+
43+
defaults(fields.headers, traceHeaders)
44+
45+
return original.apply(this, arguments)
46+
}
47+
}
48+
49+
function connectionWrapper (Connection, agent) {
50+
shimmer.wrap(Connection.Connection.prototype, 'sendMessage', sendMessageWrapper.bind(null, agent))
951
return Connection
1052
}
1153

1254
function callbackModelWrapper (CallbackModel, agent) {
13-
shimmer.wrap(CallbackModel.Channel.prototype, 'consume', function (original) {
14-
return function (queue, callback, options, cb0) {
15-
return original.apply(this, arguments)
16-
}
17-
})
55+
shimmer.wrap(CallbackModel.Channel.prototype, 'consume', consumerWrapper.bind(null, agent))
1856
return CallbackModel
1957
}
2058

2159
function channelModelWrapper (ChannelModel, agent) {
22-
shimmer.wrap(ChannelModel.Channel.prototype, 'consume', function (original) {
23-
return function (queue, callback, options) {
24-
return original.apply(this, arguments)
25-
}
26-
})
60+
shimmer.wrap(ChannelModel.Channel.prototype, 'consume', consumerWrapper.bind(null, agent))
2761
return ChannelModel
2862
}
2963

@@ -40,19 +74,19 @@ module.exports = {
4074
instrumentations: [{
4175
path: 'amqplib',
4276
pre: function () {
43-
require('amqplib/lib/channel_model')
77+
require('amqplib/channel_api')
4478
return Array.prototype.slice.call(arguments, 2)
4579
}
4680
}, {
4781
path: 'amqplib/channel_api',
4882
pre: function () {
49-
require('amqplib/lib/connection')
83+
require('amqplib/lib/channel_model')
5084
return Array.prototype.slice.call(arguments, 2)
5185
}
5286
}, {
5387
path: 'amqplib/callback_api',
5488
pre: function () {
55-
require('amqplib/lib/connection')
89+
require('amqplib/lib/callback_model')
5690
return Array.prototype.slice.call(arguments, 2)
5791
}
5892
}, {

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
"lodash.defaults": "4.2.0",
5050
"lodash.find": "4.6.0",
5151
"lodash.flatmap": "4.5.0",
52+
"lodash.get": "4.4.2",
5253
"lodash.isnumber": "3.0.3",
5354
"node-uuid": "1.4.7",
5455
"qs": "6.2.1",
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
'use strict'
2+
3+
var expect = require('chai').expect
4+
var wrapper = require('@risingstack/trace/lib/instrumentations/amqplib')
5+
var microtime = require('@risingstack/trace/lib/optionalDependencies/microtime')
6+
7+
describe('amqplib module wrapper', function () {
8+
beforeEach(function () {
9+
delete require.cache[require.resolve('amqplib/lib/connection')]
10+
delete require.cache[require.resolve('amqplib/lib/callback_model')]
11+
delete require.cache[require.resolve('amqplib/lib/channel_model')]
12+
delete require.cache[require.resolve('amqplib/callback_api')]
13+
delete require.cache[require.resolve('amqplib/channel_api')]
14+
delete require.cache[require.resolve('amqplib')]
15+
})
16+
17+
describe('channel api', function () {
18+
beforeEach(function () {
19+
delete require.cache[require.resolve('amqplib/lib/connection')]
20+
delete require.cache[require.resolve('amqplib/lib/channel_model')]
21+
delete require.cache[require.resolve('amqplib/channel_api')]
22+
delete require.cache[require.resolve('amqplib')]
23+
})
24+
25+
it('instruments publish - consume', function (done) {
26+
this.sandbox.stub(microtime, 'now').returns(42)
27+
var fakeAgent = {
28+
incomingEdgeMetrics: {
29+
report: this.sandbox.spy()
30+
},
31+
getRequestId: this.sandbox.spy(),
32+
generateRequestId: this.sandbox.stub().returns('42'),
33+
generateCommId: this.sandbox.stub().returns('52'),
34+
getServiceKey: this.sandbox.stub().returns('62')
35+
}
36+
wrapper.instrumentations[5].post(require('amqplib/lib/connection'), fakeAgent)
37+
wrapper.instrumentations[3].post(require('amqplib/lib/channel_model'), fakeAgent)
38+
39+
var open = require('amqplib').connect('amqp://localhost')
40+
41+
// Publisher
42+
open.then(function (conn) {
43+
return conn.createChannel()
44+
}).then(function (ch) {
45+
ch.assertQueue('test').then(function (ok) {
46+
return ch.sendToQueue('test', new Buffer('something'))
47+
}).catch(function (err) {
48+
done(err)
49+
}).then(function () {
50+
ch.consume('test', function (msg) {
51+
try {
52+
ch.ack(msg)
53+
expect(fakeAgent.incomingEdgeMetrics.report).to.have.been.calledWith({
54+
protocol: 'amqp',
55+
serviceKey: 62,
56+
transportDelay: 0
57+
})
58+
expect(msg.content.toString()).to.eql('something')
59+
done()
60+
} catch (err) {
61+
done(err)
62+
}
63+
})
64+
})
65+
})
66+
})
67+
})
68+
69+
describe('callback api', function () {
70+
beforeEach(function () {
71+
delete require.cache[require.resolve('amqplib/lib/connection')]
72+
delete require.cache[require.resolve('amqplib/lib/callback_model')]
73+
delete require.cache[require.resolve('amqplib/callback_api')]
74+
delete require.cache[require.resolve('amqplib')]
75+
})
76+
77+
it('instruments publish - consume', function (done) {
78+
this.sandbox.stub(microtime, 'now').returns(42)
79+
var fakeAgent = {
80+
incomingEdgeMetrics: {
81+
report: this.sandbox.spy()
82+
},
83+
getRequestId: this.sandbox.spy(),
84+
generateRequestId: this.sandbox.stub().returns('42'),
85+
generateCommId: this.sandbox.stub().returns('52'),
86+
getServiceKey: this.sandbox.stub().returns('62')
87+
}
88+
wrapper.instrumentations[5].post(require('amqplib/lib/connection'), fakeAgent)
89+
wrapper.instrumentations[4].post(require('amqplib/lib/callback_model'), fakeAgent)
90+
91+
require('amqplib/callback_api')
92+
.connect('amqp://localhost', function (err, conn) {
93+
if (err != null) done(err)
94+
conn.createChannel(onOpen)
95+
function onOpen (err, ch) {
96+
if (err != null) done(err)
97+
ch.assertQueue('test-cb')
98+
ch.consume('test-cb', function (msg) {
99+
try {
100+
ch.ack(msg)
101+
expect(fakeAgent.incomingEdgeMetrics.report).to.have.been.calledWith({
102+
protocol: 'amqp',
103+
serviceKey: 62,
104+
transportDelay: 0
105+
})
106+
expect(msg.content.toString()).to.eql('callback')
107+
done()
108+
} catch (err) {
109+
done(err)
110+
}
111+
})
112+
ch.sendToQueue('test-cb', new Buffer('callback'))
113+
}
114+
})
115+
})
116+
})
117+
})

0 commit comments

Comments
 (0)