Skip to content
This repository was archived by the owner on Aug 12, 2023. It is now read-only.

Commit 20b0288

Browse files
authored
Start indexing fill value in Elasticsearch (#373)
* Start indexing fill value * Ensure value is indexed when reindexing fills
1 parent 6bffd50 commit 20b0288

8 files changed

Lines changed: 174 additions & 15 deletions

File tree

src/constants.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ module.exports = {
3434
FETCH_FILL_STATUS: 'fetch-fill-status',
3535
INDEX_FILL: 'index-fill',
3636
INDEX_FILL_STATUS: 'index-fill-status',
37+
INDEX_FILL_VALUE: 'index-fill-value',
3738
},
3839
QUEUE: {
3940
BULK_INDEXING: 'bulk-indexing',

src/consumers/index-fill-value.js

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
const _ = require('lodash');
2+
const mongoose = require('mongoose');
3+
const signale = require('signale');
4+
5+
const { JOB, QUEUE } = require('../constants');
6+
const elasticsearch = require('../util/elasticsearch');
7+
8+
const logger = signale.scope('index fill value');
9+
10+
const indexFillValue = async job => {
11+
const { fillId, value } = job.data;
12+
13+
if (!mongoose.Types.ObjectId.isValid(fillId)) {
14+
throw new Error(`Invalid fillId: ${fillId}`);
15+
}
16+
17+
if (!_.isFinite(value)) {
18+
throw new Error(`Invalid value: ${value}`);
19+
}
20+
21+
const exists = await elasticsearch
22+
.getClient()
23+
.exists({ id: fillId, index: 'fills', _source: false });
24+
const indexed = exists.body;
25+
26+
if (!indexed) {
27+
throw new Error(`Could not update value of fill: ${fillId}`);
28+
}
29+
30+
await elasticsearch.getClient().update({
31+
id: fillId,
32+
index: 'fills',
33+
body: { doc: { value } },
34+
});
35+
36+
logger.success(`indexed fill value: ${fillId}`);
37+
};
38+
39+
module.exports = {
40+
fn: indexFillValue,
41+
jobName: JOB.INDEX_FILL_VALUE,
42+
queueName: QUEUE.FILL_INDEXING,
43+
};

src/consumers/index.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,15 @@ const bulkIndexFills = require('./bulk-index-fills');
44
const fetchFillStatus = require('./fetch-fill-status');
55
const indexFill = require('./index-fill');
66
const indexFillStatus = require('./index-fill-status');
7+
const indexFillValue = require('./index-fill-value');
78

8-
const consumers = [bulkIndexFills, fetchFillStatus, indexFill, indexFillStatus];
9+
const consumers = [
10+
bulkIndexFills,
11+
fetchFillStatus,
12+
indexFill,
13+
indexFillStatus,
14+
indexFillValue,
15+
];
916

1017
const initQueueConsumers = config => {
1118
const queues = getQueues();

src/fixtures/fills/v2.js

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
const V2_FILL = {
2+
_id: '5e01056923573c61d846f51d',
3+
conversions: { USD: { makerFee: 0, takerFee: 0, amount: 658.8957691929245 } },
4+
hasValue: true,
5+
immeasurable: false,
6+
status: 1,
7+
assets: [
8+
{
9+
tokenResolved: true,
10+
_id: '5e01056923573c61d846f51f',
11+
amount: 660879036587289100000,
12+
tokenAddress: '0x6b175474e89094c44da98b954eedeac495271d0f',
13+
actor: 0,
14+
price: { USD: 0.9969990462935456 },
15+
value: { USD: 658.8957691929245 },
16+
},
17+
{
18+
tokenResolved: true,
19+
_id: '5e01056923573c61d846f51e',
20+
amount: 5063754758629915000,
21+
tokenAddress: '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
22+
actor: 1,
23+
price: { USD: 130.12 },
24+
value: { USD: 658.8957691929245 },
25+
},
26+
],
27+
blockHash:
28+
'0x7c19236809f502da3d481761009377877171ccd8757df293f7537c974674c81b',
29+
blockNumber: 9151911,
30+
date: '2019-12-23T18:15:16.000Z',
31+
eventId: '5e01053a3e349627a0408d51',
32+
fees: [],
33+
feeRecipient: '0xa258b39954cef5cb142fd567a46cddb31a670124',
34+
logIndex: 38,
35+
maker: '0x6924a03bb710eaf199ab6ac9f2bb148215ae9b5d',
36+
makerFee: 0,
37+
orderHash:
38+
'0xf186241e774ed0ecb6b6c101482f9a52d77caaf2db3165c9052f814e4c844f5e',
39+
protocolVersion: 2,
40+
relayerId: 7,
41+
senderAddress: '0x8018280076d7fa2caa1147e441352e8a89e1ddbe',
42+
taker: '0x8018280076d7fa2caa1147e441352e8a89e1ddbe',
43+
takerFee: 0,
44+
transactionHash:
45+
'0x36a35ae64def8beab677435be4762bf63977000172d47096230a94922383c856',
46+
pricingStatus: 0,
47+
};
48+
49+
module.exports = V2_FILL;
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Jest Snapshot v1, https://goo.gl/fbAQLP
2+
3+
exports[`should create Elasticsearch document for fill 1`] = `
4+
Object {
5+
"assets": Array [
6+
Object {
7+
"tokenAddress": "0x6b175474e89094c44da98b954eedeac495271d0f",
8+
},
9+
Object {
10+
"tokenAddress": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
11+
},
12+
],
13+
"date": "2019-12-23T18:15:16.000Z",
14+
"feeRecipient": "0xa258b39954cef5cb142fd567a46cddb31a670124",
15+
"fees": Array [],
16+
"maker": "0x6924a03bb710eaf199ab6ac9f2bb148215ae9b5d",
17+
"orderHash": "0xf186241e774ed0ecb6b6c101482f9a52d77caaf2db3165c9052f814e4c844f5e",
18+
"protocolVersion": 2,
19+
"relayerId": 7,
20+
"senderAddress": "0x8018280076d7fa2caa1147e441352e8a89e1ddbe",
21+
"status": 1,
22+
"taker": "0x8018280076d7fa2caa1147e441352e8a89e1ddbe",
23+
"transactionHash": "0x36a35ae64def8beab677435be4762bf63977000172d47096230a94922383c856",
24+
"value": 658.8957691929245,
25+
}
26+
`;

src/index/fills/create-document.js

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
1-
const createDocument = fill => ({
2-
assets: fill.assets.map(asset => ({ tokenAddress: asset.tokenAddress })),
3-
date: fill.date,
4-
fees: fill.fees.map(fee => ({ tokenAddress: fee.tokenAddress })),
5-
feeRecipient: fill.feeRecipient,
6-
maker: fill.maker,
7-
orderHash: fill.orderHash,
8-
protocolVersion: fill.protocolVersion,
9-
relayerId: fill.relayerId,
10-
senderAddress: fill.senderAddress,
11-
status: fill.status,
12-
taker: fill.taker,
13-
transactionHash: fill.transactionHash,
14-
});
1+
const _ = require('lodash');
2+
3+
const createDocument = fill => {
4+
const value = _.get(fill, 'conversions.USD.amount');
5+
6+
return {
7+
assets: fill.assets.map(asset => ({ tokenAddress: asset.tokenAddress })),
8+
date: fill.date,
9+
fees: fill.fees.map(fee => ({ tokenAddress: fee.tokenAddress })),
10+
feeRecipient: fill.feeRecipient,
11+
maker: fill.maker,
12+
orderHash: fill.orderHash,
13+
protocolVersion: fill.protocolVersion,
14+
relayerId: fill.relayerId,
15+
senderAddress: fill.senderAddress,
16+
status: fill.status,
17+
taker: fill.taker,
18+
transactionHash: fill.transactionHash,
19+
value: value === null ? undefined : value,
20+
};
21+
};
1522

1623
module.exports = createDocument;
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
const createDocument = require('./create-document');
2+
const V2_FILL = require('../../fixtures/fills/v2');
3+
4+
it('should create Elasticsearch document for fill', () => {
5+
const doc = createDocument(V2_FILL);
6+
expect(doc).toMatchSnapshot();
7+
});
8+
9+
it('should exclude value property when fill is unmeasured', () => {
10+
const fill = {
11+
...V2_FILL,
12+
conversions: {
13+
...V2_FILL.conversions,
14+
USD: { ...V2_FILL.conversions.USD, amount: null },
15+
},
16+
};
17+
const doc = createDocument(fill);
18+
19+
expect(doc.value).toBeUndefined();
20+
});

src/jobs/measure-fills/measure-fill.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
const bluebird = require('bluebird');
22
const signale = require('signale');
33

4+
const { JOB, QUEUE } = require('../../constants');
45
const { getToken } = require('../../tokens/token-cache');
6+
const { publishJob } = require('../../queues');
57
const formatTokenAmount = require('../../tokens/format-token-amount');
68
const getConversionRate = require('../../rates/get-conversion-rate');
79
const getMeasurableActor = require('./get-measurable-actor');
@@ -62,6 +64,10 @@ const measureFill = async fill => {
6264
await withTransaction(async session => {
6365
await fill.save({ session });
6466
await persistTokenPrices(tokenPrices, fill, session);
67+
await publishJob(QUEUE.FILL_INDEXING, JOB.INDEX_FILL_VALUE, {
68+
fillId: fill._id,
69+
value: totalValue,
70+
});
6571
});
6672

6773
logger.debug(`set value of fill ${fill._id} to ${totalValue}`);

0 commit comments

Comments
 (0)