-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathglobal_traffic_hourly.js
More file actions
135 lines (100 loc) · 2.96 KB
/
global_traffic_hourly.js
File metadata and controls
135 lines (100 loc) · 2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/*
* Map Reduce procedure for the traffic collection hourly
*/
var mapf=function () {
var date = new Date();
date.setTime(this._id.sample_time);
var sample_hour = new Date(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate(), date.getUTCHours(), 0, 0, 0);
emit({
"sample_time": sample_hour,
"zone": this._id.zone
}, {
"qps": {},
"counters": this.qps,
"count": 1,
"created_time": this.created_time
});
};
var reducef=function (key, values) {
var r = {
"qps": {},
"counters": {},
"count": 0,
"created_time": 0
};
values.forEach(function(v) {
r.counters = hash_add(v.counters, r.counters);
r.count++;
r.created_time = v.created_time > r.created_time ? v.created_time:r.created_time;
});
return r;
};
var finalizef=function (key, value) {
// for hourly we divide the counters by 12 (5 minutes per hour)
var r = {
"qps": hash_divide(value.counters, 12),
"counters": value.counters,
"count": value.count,
"created_time": value.created_time
};
return r;
};
// pull the last sample_time from the DB
var last_processed_cur = db.mr_global_traffic_hourly_log.find({}, {
last_processed_time: 1
}).sort({
last_processed_time: -1
}).limit(1);
// this is where we store the mapreduce output
var mr_output;
var last_processed_time;
// if we have a previous set processed
if (last_processed_cur.hasNext()) {
var last_processed = last_processed_cur.next();
last_processed_time = last_processed.last_processed_time;
print("Running mapreduce with: $gt: " + last_processed_time + "\n");
// Run mapReduce with the previous value
mr_output=db.runCommand( { "mapreduce":"traffic",
"map": mapf,
"reduce": reducef,
"query":{ "created_time": { $gt: last_processed_time }},
"out": { reduce: "global_traffic_hourly" },
"finalize":finalizef
});
print("Done!");
} else {
print("Running mapreduce for the first time!\n");
// This is the first time running
mr_output=db.runCommand( { "mapreduce":"traffic",
"map": mapf,
"reduce": reducef,
"out": "global_traffic_hourly",
"finalize":finalizef
});
// Create index
db.global_traffic_hourly.ensureIndex({
"_id.sample_time": 1,
"_id.pubservhost": 1,
"_id.zone": 1
});
// Index the created time field
db.global_traffic_hourly.ensureIndex({
"value.created_time": 1
});
}
print("Checking for mapreduce result...");
if (mr_output.ok) {
print("OK\n");
var last_processed_cur=db.global_traffic_hourly.find({}).sort({"value.created_time":-1}).limit(1);
if(last_processed_cur.hasNext()){
var lp = last_processed_cur.next();
print("Last created_time in global_traffic_hourly: " + lp.value.created_time + "\n");
db.mr_global_traffic_hourly_log.insert({
"last_processed_time": lp.value.created_time,
"result": mr_output
});
}
}
else{
print("An error occurred processing the set:\n");
}