Skip to content
This repository was archived by the owner on Aug 12, 2023. It is now read-only.

Commit 6bffd50

Browse files
authored
Improve indexing and status fetching jobs (#372)
* Better handle MongoDB eventual consistency * Assign a batch id to bulk index jobs * Tweak error message
1 parent 44a5120 commit 6bffd50

7 files changed

Lines changed: 65 additions & 17 deletions

File tree

src/constants.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ module.exports = {
3333
BULK_INDEX_FILLS: 'bulk-index-fills',
3434
FETCH_FILL_STATUS: 'fetch-fill-status',
3535
INDEX_FILL: 'index-fill',
36+
INDEX_FILL_STATUS: 'index-fill-status',
3637
},
3738
QUEUE: {
3839
BULK_INDEXING: 'bulk-indexing',

src/consumers/bulk-index-fills.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const bulkIndexFills = async job => {
3434
}
3535

3636
const lastFillId = nextBatch[nextBatch.length - 1]._id;
37+
const batchId = job.data.batchId || Date.now();
3738

3839
// Get on with processing the next batch whilst this one is being processed
3940
// to improve batch indexing throughput.
@@ -42,10 +43,11 @@ const bulkIndexFills = async job => {
4243
QUEUE.BULK_INDEXING,
4344
JOB.BULK_INDEX_FILLS,
4445
{
46+
batchId,
4547
batchSize,
4648
lastFillId,
4749
},
48-
{ jobId: `bulk-index-${lastFillId}` },
50+
{ jobId: `bulk-index-${batchId}-${lastFillId}`, removeOnComplete: false },
4951
);
5052
logger.success(
5153
`scheduled indexing of next ${batchSize} fills after ${lastFillId}`,

src/consumers/fetch-fill-status.js

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
const _ = require('lodash');
22
const mongoose = require('mongoose');
3-
const ms = require('ms');
43
const signale = require('signale');
54

65
const { FILL_STATUS, JOB, QUEUE } = require('../constants');
@@ -30,19 +29,16 @@ const fetchFillStatus = async job => {
3029
const status =
3130
receipt.status === 0 ? FILL_STATUS.FAILED : FILL_STATUS.SUCCESSFUL;
3231
const statusText = _.findKey(FILL_STATUS, value => value === status);
32+
const result = await getModel('Fill').updateOne({ _id: fillId }, { status });
3333

34-
await getModel('Fill').updateOne({ _id: fillId }, { status });
35-
await publishJob(
36-
QUEUE.FILL_INDEXING,
37-
JOB.INDEX_FILL,
38-
{
39-
fillId,
40-
},
41-
{
42-
delay: ms('30 seconds'),
43-
removeOnComplete: true,
44-
},
45-
);
34+
if (result.modifiedCount === 0) {
35+
throw new Error(`No fill found with the id: ${fillId}`);
36+
}
37+
38+
await publishJob(QUEUE.FILL_INDEXING, JOB.INDEX_FILL_STATUS, {
39+
fillId,
40+
status,
41+
});
4642

4743
logger.success(`set status of fill ${fillId} to ${statusText}`);
4844
};

src/consumers/index-fill-status.js

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
const mongoose = require('mongoose');
2+
const signale = require('signale');
3+
4+
const { FILL_STATUS, JOB, QUEUE } = require('../constants');
5+
const elasticsearch = require('../util/elasticsearch');
6+
7+
const logger = signale.scope('index fill status');
8+
9+
const indexFillStatus = async job => {
10+
const { fillId, status } = job.data;
11+
12+
if (!mongoose.Types.ObjectId.isValid(fillId)) {
13+
throw new Error(`Invalid fillId: ${fillId}`);
14+
}
15+
16+
if (status !== FILL_STATUS.FAILED && status !== FILL_STATUS.SUCCESSFUL) {
17+
throw new Error(`Invalid status: ${status}`);
18+
}
19+
20+
const exists = await elasticsearch
21+
.getClient()
22+
.exists({ id: fillId, index: 'fills', _source: false });
23+
const indexed = exists.body;
24+
25+
if (!indexed) {
26+
throw new Error(`Could not update status of fill: ${fillId}`);
27+
}
28+
29+
await elasticsearch.getClient().update({
30+
id: fillId,
31+
index: 'fills',
32+
body: { doc: { status } },
33+
});
34+
35+
logger.success(`indexed fill status: ${fillId}`);
36+
};
37+
38+
module.exports = {
39+
fn: indexFillStatus,
40+
jobName: JOB.INDEX_FILL_STATUS,
41+
queueName: QUEUE.FILL_INDEXING,
42+
};

src/consumers/index.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ const { getQueues } = require('../queues');
33
const bulkIndexFills = require('./bulk-index-fills');
44
const fetchFillStatus = require('./fetch-fill-status');
55
const indexFill = require('./index-fill');
6+
const indexFillStatus = require('./index-fill-status');
67

7-
const consumers = [bulkIndexFills, fetchFillStatus, indexFill];
8+
const consumers = [bulkIndexFills, fetchFillStatus, indexFill, indexFillStatus];
89

910
const initQueueConsumers = config => {
1011
const queues = getQueues();

src/jobs/create-fills/index.js

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
const bluebird = require('bluebird');
2+
const ms = require('ms');
23
const signale = require('signale');
34

45
const {
@@ -53,7 +54,9 @@ const createFills = async ({ batchSize }) => {
5354
fillId: newFill._id,
5455
transactionHash: newFill.transactionHash,
5556
},
56-
{ removeOnComplete: true },
57+
{
58+
delay: ms('5 seconds'), // Delay status fetching to ensure MongoDB changes have propagated
59+
},
5760
);
5861

5962
await publishJob(
@@ -62,7 +65,9 @@ const createFills = async ({ batchSize }) => {
6265
{
6366
fillId: newFill._id,
6467
},
65-
{ removeOnComplete: true },
68+
{
69+
delay: ms('5 seconds'), // Delay indexing to ensure MongoDB changes have propagated
70+
},
6671
);
6772
});
6873

src/queues.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const publishJob = async (queueName, jobName, jobData, options = {}) => {
4242
delay: ms('10 seconds'),
4343
type: 'exponential',
4444
},
45+
removeOnComplete: true,
4546
};
4647
const queue = getQueue(queueName);
4748

0 commit comments

Comments
 (0)