Skip to content

Commit f996404

Browse files
author
Andrew L
committed
adjust public transforming & improve cord links population with batch stream
1 parent 6d443eb commit f996404

6 files changed

Lines changed: 406 additions & 107 deletions

File tree

Lines changed: 62 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,81 @@
11
import mongoose from 'mongoose';
22
import _get from 'lodash/get';
33
import _set from 'lodash/set';
4+
import _keyBy from 'lodash/keyBy';
5+
import _uniq from 'lodash/uniq';
46

5-
import { mapStream } from 'utils/stream';
7+
import { mapStream, trasnformToBatchStream, streamCombiner } from 'utils/stream';
8+
9+
export default function mongooseStreamPopulate(
10+
modelName,
11+
keyPath,
12+
select
13+
) {
14+
let batchSize = 10;
15+
let read, lean;
16+
17+
if (arguments[0] && typeof arguments[0] === 'object') {
18+
modelName = arguments[0].modelName;
19+
keyPath = arguments[0].keyPath;
20+
select = arguments[0].select;
21+
batchSize = arguments[0].batchSize || batchSize;
22+
read = arguments[0].read;
23+
lean = arguments[0].lean;
24+
}
625

7-
export default function mongooseStreamPopulate(modelName, keyPath, select) {
826
const Model = mongoose.model(modelName);
9-
const modelCacheMap = {};
27+
const cacheMap = {};
1028

11-
const fetchResource = async (id, select) => {
12-
if (modelCacheMap[id] !== undefined) {
13-
return modelCacheMap[id];
14-
}
29+
const fetchResources = async(resourceIds) => {
30+
resourceIds = resourceIds.filter(id => cacheMap[id] === undefined);
1531

16-
const query = Model.findById(id).lean();
32+
if (!resourceIds.length) {
33+
return;
34+
}
1735

18-
if (select) {
19-
query.select(select);
36+
for (let resourceId of resourceIds) {
37+
cacheMap[resourceId] = null;
2038
}
2139

22-
const obj = await query;
23-
const result = obj || null;
40+
const query = Model.find({ _id: { $in: resourceIds } });
41+
42+
if (lean !== false) query.lean();
43+
if (read) query.read(read);
44+
if (select) query.select(select);
45+
46+
const list = await query;
47+
const resourceMap = _keyBy(list, '_id');
2448

25-
modelCacheMap[id] = result;
26-
return result;
49+
Object.assign(cacheMap, resourceMap);
2750
};
2851

29-
return mapStream(async obj => {
30-
const resourceId = _get(obj, keyPath);
52+
const streamBatch = trasnformToBatchStream({ size: batchSize });
53+
const streamMap = mapStream(async batch => {
54+
const resourceIds = _uniq(
55+
batch.map(obj => _get(obj, keyPath)).flat()
56+
);
57+
58+
while(resourceIds.length > 0) {
59+
const ids = resourceIds.splice(0, batchSize);
60+
61+
await fetchResources(ids);
62+
}
63+
64+
for (let obj of batch) {
65+
let resourceValue = _get(obj, keyPath);
66+
let result = null;
67+
68+
if (Array.isArray(resourceValue)) {
69+
result = resourceValue.map(id => cacheMap[id] || null);
70+
} else {
71+
result = cacheMap[resourceValue] || null;
72+
}
3173

32-
if (!resourceId) {
33-
_set(obj, keyPath, null);
34-
return obj;
74+
_set(obj, keyPath, result || null);
3575
}
3676

37-
const resource = await fetchResource(resourceId, select);
38-
_set(obj, keyPath, resource);
77+
return batch;
78+
}, { flatMode: true });
3979

40-
return obj;
41-
});
80+
return streamCombiner(streamBatch, streamMap);
4281
}

app/services/sheets/cord-row-population.js

Lines changed: 56 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,78 @@
1-
import Promise from 'bluebird';
2-
import { mapStream } from 'utils/stream';
1+
import _get from 'lodash/get';
2+
import { mapStream, trasnformToBatchStream, streamCombiner } from 'utils/stream';
33

44
import mongoose from 'lib/mongoose';
55

66
const DB_CORD19 = mongoose.__openConnection('cord19');
77

88
export default function cordRowPopulation() {
9-
const cordRefBuffer = {};
9+
const cacheMap = {};
10+
const batchSize = 10;
1011

11-
const cordRefGetter = async(val) => {
12-
if (cordRefBuffer[val] !== undefined) {
13-
return cordRefBuffer[val];
14-
}
12+
const fetchResources = async(ids) => {
13+
ids = ids.filter(id => cacheMap[id] === undefined);
1514

16-
const refDoc = await DB_CORD19.collection('v19').findOne({
17-
cord_uid: val
18-
});
15+
if (!ids.length) {
16+
return;
17+
}
1918

20-
if (refDoc) {
21-
cordRefBuffer[val] = resolveHttpProtocol(refDoc['doi']);
22-
} else {
23-
cordRefBuffer[val] = null;
19+
for (let resourceId of ids) {
20+
cacheMap[resourceId] = null;
2421
}
2522

26-
return cordRefBuffer[val];
23+
const list = await DB_CORD19.collection('v19').find({
24+
cord_uid: { $in: ids }
25+
}, { doi: 1 }).toArray();
26+
27+
list.forEach(v => {
28+
const link = resolveHttpProtocol(v.doi);
29+
cacheMap[v['cord_uid']] = link;
30+
});
2731
};
2832

29-
const transformStream = mapStream(async data => {
30-
const row = data.row || data;
33+
const streamBatch = trasnformToBatchStream({ size: batchSize });
34+
const streamMap = mapStream(async batch => {
35+
const resourceIdsMap = {};
36+
const resourceAssignMap = {};
37+
const rows = [];
38+
39+
for (let rowIdx = 0; rowIdx < batch.length; rowIdx++) {
40+
const row = batch[rowIdx].row || batch[rowIdx];
41+
const cells = _get(row, 'cells', []);
42+
43+
rows.push(row);
44+
45+
for (let cellIdx = 0; cellIdx < cells.length; cellIdx++) {
46+
const cell = cells[cellIdx];
3147

32-
row.cells = await Promise.map(row.cells, async cell => {
33-
if (cell.t === 'cord_ref') {
34-
cell.link = await cordRefGetter(cell.v);
48+
if (cell.t === 'cord_ref') {
49+
resourceIdsMap[cell.v] = true;
50+
resourceAssignMap[`${rowIdx}:${cellIdx}`] = cell.v;
51+
}
3552
}
53+
}
54+
55+
const resourceIds = Object.keys(resourceIdsMap);
56+
57+
while(resourceIds.length > 0) {
58+
const ids = resourceIds.splice(0, batchSize);
59+
await fetchResources(ids);
60+
}
3661

37-
return cell;
38-
}, { concurrency: 5 });
62+
for(let pattern of Object.keys(resourceAssignMap)) {
63+
const cordUid = resourceAssignMap[pattern];
64+
const [rowIdx, cellIdx] = pattern.split(':').map(v => parseInt(v));
65+
66+
const resourceValue = cacheMap[cordUid] || null;
67+
68+
rows[rowIdx].cells[cellIdx].link = resourceValue;
69+
}
3970

40-
return row;
41-
});
71+
return rows;
72+
}, { flatMode: true });
4273

4374
return {
44-
stream: transformStream
75+
stream: streamCombiner(streamBatch, streamMap)
4576
};
4677
}
4778

app/services/sheets/public-row-transform.js

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,23 @@ export default function publicRownTrasnform(sheet) {
6767

6868
const severeInfo = [
6969
// if Critical Only == “Y” --> put a double cross at the beginning of the Severe column
70-
_get(cell('Critical only'), 'v', '')
71-
.replace(/^Y$/i, '‡')
72-
,
70+
pickStr(
71+
_get(cell('Critical only'), 'v', '')
72+
.replace(/^Y$/i, '‡'),
73+
['‡']
74+
),
7375
// if Severe Adjusted == “Not Adjusted” --> put a double S at the beginning of the Severe column
74-
_get(cell('Severe Adjusted'), 'v', '')
75-
.replace(/^Not Adjusted$/i, '§')
76-
,
76+
pickStr(
77+
_get(cell('Severe Adjusted'), 'v', '')
78+
.replace(/^Not Adjusted$/i, '§'),
79+
['§']
80+
),
7781
// if “Severe Calculated” == “Calculated” --> put a single cross at the beginning the Severe column
78-
_get(cell('Severe Calculated'), 'v', '')
79-
.replace(/^Calculated$/i, '†')
80-
,
82+
pickStr(
83+
_get(cell('Severe Calculated'), 'v', '')
84+
.replace(/^Calculated$/i, '†'),
85+
['†']
86+
),
8187
_get(cell('Severe'), 'v')
8288
].filter(v => !isEmpty(v)).join(' ').trim();
8389

@@ -128,19 +134,22 @@ export default function publicRownTrasnform(sheet) {
128134

129135
const fatalityInfo = [
130136
// if Fatality Adjusted == “Not Adjusted” --> put a double S at the beginning of the Fatality column
131-
_get(cell('Fatality Adjusted'), 'v', '')
132-
.replace(/^Not Adjusted$/i, '§')
133-
,
137+
pickStr(
138+
_get(cell('Fatality Adjusted'), 'v', '')
139+
.replace(/^Not Adjusted$/i, '§'),
140+
['§']
141+
),
134142
// if “Fatality Calculated” == “Calcluated” --> put a single cross at the beginning of the Fatality column
135-
_get(cell('Fatality Calculated'), 'v', '')
136-
.replace(/^Calcluated$/i, '†')
137-
,
143+
pickStr(
144+
_get(cell('Fatality Calculated'), 'v', '')
145+
.replace(/^Calcluated$/i, '†'),
146+
['†']
147+
),
138148
// if “Discharged vs. death?” == “Y” --> put a little square at the beginning of the Fatality column
139-
(
149+
pickStr((
140150
_get(cell('Discharged vs. death?'), 'v', '') ||
141151
_get(cell('Discharge vs. death?'), 'v', '')
142-
)
143-
.replace(/^Y$/i, '□')
152+
).replace(/^Y$/i, '□'), ['□'])
144153
,
145154
_get(cell('Fatality'), 'v')
146155
].filter(v => !isEmpty(v)).join(' ').trim();
@@ -222,3 +231,7 @@ export default function publicRownTrasnform(sheet) {
222231
function isEmpty(v) {
223232
return v === undefined || v === null || v === '' || v === 'N';
224233
}
234+
235+
function pickStr(str, values, thenValue = '') {
236+
return values.indexOf(str) > -1 ? str : thenValue;
237+
}

0 commit comments

Comments
 (0)