Skip to content

Commit 3e207c2

Browse files
committed
imp(): change migration to use bulkWrite instead of cursor
1 parent 97721a2 commit 3e207c2

1 file changed

Lines changed: 68 additions & 32 deletions

File tree

migrations/20250707160120-move-timestamp-to-event-level.js

Lines changed: 68 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,51 +26,87 @@ module.exports = {
2626
[
2727
{
2828
$set: {
29-
timestamp: '$payload.timestamp',
29+
timestamp: { $toDouble: "$payload.timestamp" },
3030
},
3131
},
3232
{
33-
$unset: 'payload.timestamp',
33+
$unset: "payload.timestamp",
3434
},
3535
]
3636
);
3737
}
3838

3939
// Step 2: Process repetition collections
4040
for (const collectionName of repetitionCollections) {
41+
const projectId = collectionName.split(':')[1];
4142
const collection = db.collection(collectionName);
42-
const cursor = collection.find({});
4343

44-
while (await cursor.hasNext()) {
45-
const doc = await cursor.next();
46-
47-
if (doc.payload?.timestamp) {
48-
// Move timestamp from payload to root
49-
await collection.updateOne(
50-
{ _id: doc._id },
51-
{
52-
$set: { timestamp: doc.payload.timestamp },
53-
$unset: { 'payload.timestamp': '' },
54-
}
55-
);
56-
} else if (doc.groupHash) {
57-
// Attempt to find matching event
58-
let eventDoc = null;
59-
60-
const projectId = collectionName.split(':')[1];
61-
const eventsCollectionName = `events:${projectId}`;
62-
63-
eventDoc = await db.collection(eventsCollectionName).findOne({ groupHash: doc.groupHash });
44+
// Step 2.1: First, handle documents where payload.timestamp exists
45+
await collection.updateMany(
46+
{ 'payload.timestamp': { $exists: true } },
47+
[
48+
{
49+
$set: {
50+
timestamp: { $toDouble: "$payload.timestamp" }, // Convert payload.timestamp to number
51+
},
52+
},
53+
{
54+
$unset: "payload.timestamp", // Remove payload.timestamp
55+
},
56+
]
57+
);
58+
59+
const pipeline = [
60+
{
61+
$match: {
62+
$or : [
63+
{ "payload.timestamp": { $exists: false } },
64+
{ payload: { $exists: false } },
65+
],
66+
timestamp: { $exists: false },
67+
groupHash: { $exists: true }
68+
}
69+
},
70+
{
71+
$lookup: {
72+
from: `events:${projectId}`, // dynamically referencing the events collection
73+
localField: "groupHash", // field from repetitions collection
74+
foreignField: "groupHash", // field in the events collection
75+
as: "eventData" // alias for the matched data
76+
}
77+
},
78+
{
79+
$unwind: {
80+
path: "$eventData", // we expect only one match per groupHash
81+
preserveNullAndEmptyArrays: true // allow documents with no matching event
82+
}
83+
},
84+
{
85+
$match: {
86+
"eventData.timestamp": { $exists: true } // only proceed if event.timestamp exists
87+
}
88+
},
89+
{
90+
$project: {
91+
_id: 1,
92+
eventTimestamp: { $toDouble: "$eventData.timestamp"}
93+
}
94+
}
95+
];
96+
97+
const matchedDocs = await collection.aggregate(pipeline).toArray();
6498

65-
if (eventDoc?.timestamp) {
66-
await collection.updateOne(
67-
{ _id: doc._id },
68-
{
69-
$set: { timestamp: eventDoc.timestamp },
70-
}
71-
);
99+
const bulkOps = matchedDocs.map(doc => ({
100+
updateOne: {
101+
filter: { _id: doc._id },
102+
update: {
103+
$set: { timestamp: doc.eventTimestamp } // Set the timestamp from the event
72104
}
73105
}
106+
}));
107+
108+
if (bulkOps.length > 0) {
109+
await collection.bulkWrite(bulkOps);
74110
}
75111
}
76112
},
@@ -100,7 +136,7 @@ module.exports = {
100136
},
101137
},
102138
{
103-
$unset: 'timestamp',
139+
$unset: { 'timestamp': "" },
104140
},
105141
]
106142
);
@@ -117,7 +153,7 @@ module.exports = {
117153
},
118154
},
119155
{
120-
$unset: 'timestamp',
156+
$unset: { 'timestamp': "" },
121157
},
122158
]
123159
);

0 commit comments

Comments
 (0)