-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaggregator.js
More file actions
79 lines (72 loc) · 2.85 KB
/
aggregator.js
File metadata and controls
79 lines (72 loc) · 2.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
'use strict';
const _ = require('underscore');
const cronJob = require('cron').CronJob;
const changeCase = require('change-case');
const OutputPlugin = require('./output-plugins/output-plugin');
const MysqlOutputPlugin = require('./output-plugins/mysql');
const StatsDOutputPlugin = require('./output-plugins/statsd');
const HBaseOutputPlugin = require('./output-plugins/hbase-plugin');
const debug = require('debug')('node-stats-aggregator:aggregator');
/**
* @class
*/
class StatsAggregator {
constructor(name, keyFields, valueFields, options) {
this.name = name;
this.data = {};
this.plugins = [];
this.keyFields = keyFields;
debug('keyFields = ', keyFields);
this.valueFields = valueFields;
debug('valueFields = ', valueFields);
this.valueFieldsAliases = (options && options.valueFieldsAliases) || {};
debug('aliases map:', this.valueFieldsAliases);
this.timeFields = (options && options.timeFields) || {};
debug('timeFields:', this.timeFields);
_.each(this.valueFields, vfield => {
let methodName = changeCase.camel('add ' + (this.valueFieldsAliases[vfield] || vfield));
debug(this.name, ' is generating inc method named:', methodName);
this[methodName] = (keyValues,value=1) => {
const stat = this.getOrCreateStat(keyValues);
stat[vfield]+=value;
}
});
}
getOrCreateStat(keyValues) {
const dataMapKey = _.map(this.keyFields, kf => keyValues[kf]).join('.');
const dataForKey = this.data[dataMapKey];
if (dataForKey)
return dataForKey;
else {
this.data[dataMapKey] = {};
_.each(this.keyFields, kfield => this.data[dataMapKey][kfield] = keyValues[kfield]);
_.each(this.valueFields, vfield => this.data[dataMapKey][vfield] = 0);
_.each(this.timeFields, (timeFunc, tfieldName) => this.data[dataMapKey][tfieldName] = timeFunc());
const stat = this.data[dataMapKey];
debug('createdStat:', stat);
return stat;
}
}
save() {
if (_.isEmpty(this.data)) {
return;
}
_.each(this.plugins, plugin => plugin.save(this.data));
this.data = {};
}
}
module.exports = {
createStatsAggregator: function (name, keyFields, valueFields, options) {
var agg = new StatsAggregator(name, keyFields, valueFields, options);
new cronJob({
cronTime: (options && options.cronTime) || (Math.floor(Math.random() * 60) + ' */3 * * * *'),
onTick: function () {
debug(`Running ${name} stat Job`);
agg.save();
},
start: true
});
return agg;
},
StatsAggregator, OutputPlugin, MysqlOutputPlugin, StatsDOutputPlugin , HBaseOutputPlugin
};