-
Notifications
You must be signed in to change notification settings - Fork 228
Expand file tree
/
Copy pathbee.js
More file actions
87 lines (72 loc) · 2.09 KB
/
bee.js
File metadata and controls
87 lines (72 loc) · 2.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
const Queue = require('./queue');
const Job = require('./job');
const JobData = require('./jobData');
class BeeJob extends Job {
async remove() {
await this._job.remove();
}
async getStatus() {
return this._job.status;
}
async toJSON() {
const {id, progress, data, options: {timestamp, stacktraces: stacktrace, delay}} = this._job;
return new JobData({id, progress, data, timestamp, stacktrace, delay});
}
}
const VALID_STATES = ['waiting', 'active', 'succeeded', 'failed', 'delayed'];
const SUPPORTED_ACTIONS = ['remove'];
module.exports = class BeeQueue extends Queue {
constructor(queueConfig) {
const {name} = queueConfig;
const options = BeeQueue.parseConfig(queueConfig);
const queue = new BeeQueue(name, options);
super(queue);
}
static parseConfig(queueConfig) {
const options = {
redis: this.parseRedisConfig(queueConfig),
isWorker: false,
getEvents: false,
sendEvents: false,
storeJobs: false,
};
const {prefix} = queueConfig;
if (prefix) options.prefix = prefix;
return options;
}
async getJob(id) {
const job = this._queue.getJob(id);
return new BeeJob(job);
}
async getJobCounts() {
const jobCounts = this._queue.checkHealth();
delete jobCounts.newestJob;
return jobCounts;
}
async getJobs(state, start, size) {
const page = {};
if (['failed', 'succeeded'].includes(state)) {
page.size = size;
} else {
page.start = start;
page.end = start + size - 1;
}
let jobs = await this._queue.getJobs(state, page);
// Filter out Bee jobs that have already been removed by the time the promise resolves
jobs = jobs.filter((job) => job);
return jobs.map((j) => new BeeJob(j));
}
async addJob(data) {
const job = await this._queue.createJob(data).save();
return new BeeJob(job);
}
isValidState(state) {
return VALID_STATES.includes(state);
}
isActionSupported(action) {
return SUPPORTED_ACTIONS.includes(action);
}
isPaginationSupported(state) {
return state !== 'succeeded' && state !== 'failed';
}
};