Skip to content

Commit f44d130

Browse files
committed
feat: rework mongo bulkWrite typings, to be an abstracted subset
This matches the flow of other operations, and ensures the types match our mock
1 parent 52d4b67 commit f44d130

22 files changed

Lines changed: 92 additions & 54 deletions

File tree

meteor/server/api/integration/expectedPackages.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { protectString } from '@sofie-automation/corelib/dist/protectedString'
99
import { getCurrentTime } from '../../lib/lib'
1010
import { getPackageContainerPackageId } from '@sofie-automation/corelib/dist/dataModel/PackageContainerPackageStatus'
1111
import { getPackageInfoId, PackageInfoDB } from '@sofie-automation/corelib/dist/dataModel/PackageInfos'
12-
import type { AnyBulkWriteOperation } from 'mongodb'
12+
import type { MongoBulkWriteOperation } from '@sofie-automation/corelib/dist/mongo'
1313
import { onUpdatedPackageInfo } from '../ingest/packageInfo'
1414
import { getPackageContainerId } from '@sofie-automation/corelib/dist/dataModel/PackageContainerStatus'
1515
import {
@@ -58,7 +58,7 @@ export namespace PackageManagerIntegration {
5858
if (!peripheralDevice.studioAndConfigId)
5959
throw new Meteor.Error(400, 'Device "' + peripheralDevice._id + '" has no studio')
6060

61-
const bulkChanges: AnyBulkWriteOperation<ExpectedPackageWorkStatus>[] = []
61+
const bulkChanges: MongoBulkWriteOperation<ExpectedPackageWorkStatus>[] = []
6262
const removedIds: ExpectedPackageWorkStatusId[] = []
6363

6464
const ps: Promise<void>[] = []

meteor/server/collections/collection.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1-
import { FindOptions, MongoModifier, MongoQuery, ObserveChangesOptions } from '@sofie-automation/corelib/dist/mongo'
1+
import {
2+
FindOptions,
3+
MongoBulkWriteOperation,
4+
MongoModifier,
5+
MongoQuery,
6+
ObserveChangesOptions,
7+
} from '@sofie-automation/corelib/dist/mongo'
28
import { ProtectedString } from '@sofie-automation/corelib/dist/protectedString'
39
import { Meteor } from 'meteor/meteor'
410
import { Mongo } from 'meteor/mongo'
511
import { NpmModuleMongodb } from 'meteor/npm-mongo'
612
import { PromisifyCallbacks } from '@sofie-automation/shared-lib/dist/lib/types'
7-
import type { AnyBulkWriteOperation, Collection as RawCollection } from 'mongodb'
13+
import type { Collection as RawCollection } from 'mongodb'
814
import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections'
915
import { registerCollection } from './lib'
1016
import { WrappedMockCollection } from './implementations/mock'
@@ -160,7 +166,7 @@ export interface AsyncOnlyMongoCollection<
160166
* This should be used instead of Promise.all(...) when doing multiple updates, as it is more performant
161167
* @param ops Operations to perform
162168
*/
163-
bulkWriteAsync(ops: Array<AnyBulkWriteOperation<DBInterface>>): Promise<void>
169+
bulkWriteAsync(ops: Array<MongoBulkWriteOperation<DBInterface>>): Promise<void>
164170
}
165171

166172
/**

meteor/server/collections/implementations/asyncCollection.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
import { MongoModifier, MongoQuery, ObserveChangesOptions } from '@sofie-automation/corelib/dist/mongo'
1+
import {
2+
MongoBulkWriteOperation,
3+
MongoModifier,
4+
MongoQuery,
5+
ObserveChangesOptions,
6+
} from '@sofie-automation/corelib/dist/mongo'
27
import { ProtectedString, protectString, unprotectString } from '@sofie-automation/corelib/dist/protectedString'
38
import { Meteor } from 'meteor/meteor'
49
import { Mongo } from 'meteor/mongo'
@@ -287,7 +292,7 @@ export class WrappedAsyncMongoCollection<
287292
}
288293
}
289294

290-
async bulkWriteAsync(ops: Array<AnyBulkWriteOperation<DBInterface>>): Promise<void> {
295+
async bulkWriteAsync(ops: Array<MongoBulkWriteOperation<DBInterface>>): Promise<void> {
291296
const span = profiler.startSpan(`MongoCollection.${this.name}.bulkWrite`)
292297
if (span) {
293298
span.addLabels({
@@ -298,7 +303,7 @@ export class WrappedAsyncMongoCollection<
298303

299304
if (ops.length > 0) {
300305
const rawCollection = this.rawCollection()
301-
const bulkWriteResult = await rawCollection.bulkWrite(ops, {
306+
const bulkWriteResult = await rawCollection.bulkWrite(ops as AnyBulkWriteOperation<DBInterface>[], {
302307
ordered: false,
303308
})
304309

meteor/server/collections/implementations/mock.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { MongoQuery } from '@sofie-automation/corelib/dist/mongo'
1+
import { MongoBulkWriteOperation, MongoQuery } from '@sofie-automation/corelib/dist/mongo'
22
import { ProtectedString } from '@sofie-automation/corelib/dist/protectedString'
33
import { Meteor } from 'meteor/meteor'
44
import { FindOptions, MongoCursor } from '@sofie-automation/meteor-lib/dist/collections/lib'
@@ -37,10 +37,10 @@ export class WrappedMockCollection<DBInterface extends { _id: ProtectedString<an
3737
throw new Error('findWithCursor not supported in tests')
3838
}
3939

40-
override async bulkWriteAsync(ops: Array<AnyBulkWriteOperation<DBInterface>>): Promise<void> {
40+
override async bulkWriteAsync(ops: Array<MongoBulkWriteOperation<DBInterface>>): Promise<void> {
4141
if (ops.length > 0) {
4242
const rawCollection = this.rawCollection()
43-
const bulkWriteResult = await rawCollection.bulkWrite(ops, {
43+
const bulkWriteResult = await rawCollection.bulkWrite(ops as AnyBulkWriteOperation<DBInterface>[], {
4444
ordered: false,
4545
})
4646
if (bulkWriteResult && bulkWriteResult.hasWriteErrors()) {

meteor/server/lib/database.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import { Meteor } from 'meteor/meteor'
2-
import type { AnyBulkWriteOperation } from 'mongodb'
32
import _ from 'underscore'
43
import { normalizeArrayToMap, deleteAllUndefinedProperties } from '@sofie-automation/corelib/dist/lib'
54
import { ProtectedString } from '@sofie-automation/corelib/dist/protectedString'
6-
import { MongoQuery } from '@sofie-automation/corelib/dist/mongo'
5+
import { MongoBulkWriteOperation, MongoQuery } from '@sofie-automation/corelib/dist/mongo'
76
import { profiler } from '../api/profiler'
87
import { AsyncOnlyMongoCollection } from '../collections/collection'
98

@@ -118,7 +117,7 @@ async function savePreparedChanges<DBInterface extends DBObj>(
118117
newObjIds.add(id)
119118
}
120119

121-
const updates: AnyBulkWriteOperation<DBInterface>[] = []
120+
const updates: MongoBulkWriteOperation<DBInterface>[] = []
122121
const removedDocs: DBInterface['_id'][] = []
123122

124123
_.each(preparedChanges.changed || [], (oUpdate) => {

meteor/server/migration/upgrades/lib.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import type { ShowStyleBaseId, TriggeredActionId } from '@sofie-automation/corel
22
import { TriggeredActions } from '../../collections'
33
import { Complete, getRandomId, literal, normalizeArrayToMap } from '@sofie-automation/corelib/dist/lib'
44
import type { DBTriggeredActions } from '@sofie-automation/meteor-lib/dist/collections/TriggeredActions'
5-
import type { AnyBulkWriteOperation } from 'mongodb'
5+
import type { MongoBulkWriteOperation } from '@sofie-automation/corelib/dist/mongo'
66
import { wrapDefaultObject } from '@sofie-automation/corelib/dist/settings/objectWithOverrides'
77
import type { IBlueprintTriggeredActions } from '@sofie-automation/blueprints-integration'
88

@@ -17,7 +17,7 @@ export async function updateTriggeredActionsForShowStyleBaseId(
1717
const oldTriggeredActions = normalizeArrayToMap(oldTriggeredActionsArray, 'blueprintUniqueId')
1818

1919
const newDocIds: TriggeredActionId[] = []
20-
const bulkOps: AnyBulkWriteOperation<DBTriggeredActions>[] = []
20+
const bulkOps: MongoBulkWriteOperation<DBTriggeredActions>[] = []
2121

2222
for (const newTriggeredAction of triggeredActions) {
2323
const oldValue = oldTriggeredActions.get(newTriggeredAction._id)

packages/corelib/src/mongo.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,19 @@ import { ProtectedString } from './protectedString.js'
33
import * as objectPath from 'object-path'
44
import type {
55
Condition,
6+
DeleteManyModel,
7+
DeleteOneModel,
8+
Document,
69
Filter,
10+
InsertOneModel,
711
MatchKeysAndValues,
812
OnlyFieldsOfType,
913
PullOperator,
1014
PushOperator,
15+
ReplaceOneModel,
1116
SetFields,
17+
UpdateManyModel,
18+
UpdateOneModel,
1219
} from 'mongodb'
1320
import { clone } from './lib.js'
1421

@@ -90,6 +97,24 @@ export type MongoModifier<TDoc> = {
9097
$rename?: Record<string, string>
9198
}
9299

100+
/**
101+
* A bulkWrite operation, hand-rolled to match exactly the arms our in-memory mocks implement (see the
102+
* `bulkWrite` handlers in the meteor and job-worker collection mocks). Like {@link MongoModifier}, this is
103+
* intentionally NOT mongodb's `AnyBulkWriteOperation`:
104+
* - the `update` of the `updateOne`/`updateMany` arms is constrained to {@link MongoModifier}, so a whole
105+
* plain document cannot sneak in as a "modifier" via the bulkWrite path (the heaviest write path). Use
106+
* the `replaceOne` arm for a genuine full-document replacement.
107+
* - aggregation-pipeline updates (the `Document[]` form) are deliberately omitted, as `mongoModify` does
108+
* not implement them.
109+
*/
110+
export type MongoBulkWriteOperation<TDoc extends Document> =
111+
| { insertOne: InsertOneModel<TDoc> }
112+
| { replaceOne: ReplaceOneModel<TDoc> }
113+
| { updateOne: Omit<UpdateOneModel<TDoc>, 'update'> & { update: MongoModifier<TDoc> } }
114+
| { updateMany: Omit<UpdateManyModel<TDoc>, 'update'> & { update: MongoModifier<TDoc> } }
115+
| { deleteOne: DeleteOneModel<TDoc> }
116+
| { deleteMany: DeleteManyModel<TDoc> }
117+
93118
/** End of hacks */
94119

95120
export function mongoWhereFilter<T, R extends Record<string, any>>(items: R[], selector: MongoQuery<T>): R[] {

packages/job-worker/src/__mocks__/collection.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,13 @@ import { TimelineComplete } from '@sofie-automation/corelib/dist/dataModel/Timel
2727
import { clone, literal } from '@sofie-automation/corelib/dist/lib'
2828
import {
2929
FindOptions as CacheFindOptions,
30+
MongoBulkWriteOperation,
3031
mongoFindOptions,
3132
mongoModify,
3233
mongoWhere,
3334
} from '@sofie-automation/corelib/dist/mongo'
3435
import { ProtectedString } from '@sofie-automation/corelib/dist/protectedString'
35-
import { AnyBulkWriteOperation, Collection, CountOptions, FindOptions } from 'mongodb'
36+
import { Collection, CountOptions, FindOptions } from 'mongodb'
3637
import { ReadonlyDeep } from 'type-fest'
3738
import {
3839
IChangeStream,
@@ -231,7 +232,7 @@ export class MockMongoCollection<TDoc extends { _id: ProtectedString<any> }> imp
231232
this.#documents.set(doc._id, clone(doc))
232233
return exists
233234
}
234-
async bulkWrite(ops: AnyBulkWriteOperation<TDoc>[]): Promise<unknown> {
235+
async bulkWrite(ops: MongoBulkWriteOperation<TDoc>[]): Promise<unknown> {
235236
this.#ops.push({ type: 'bulkWrite', args: [ops.length] })
236237

237238
for (const op of ops) {

packages/job-worker/src/db/changes.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { ProtectedString } from '@sofie-automation/corelib/dist/protectedString'
2-
import { AnyBulkWriteOperation } from 'mongodb'
3-
import { ICollection, MongoQuery } from './collections.js'
2+
import { ICollection, MongoBulkWriteOperation, MongoQuery } from './collections.js'
43
import _ from 'underscore'
54
import { deleteAllUndefinedProperties, normalizeArrayToMap } from '@sofie-automation/corelib/dist/lib'
65
import { JobContext } from '../jobs/index.js'
@@ -123,7 +122,7 @@ async function savePreparedChanges<TDoc extends { _id: ProtectedString<any> }>(
123122
newObjIds.add(id)
124123
}
125124

126-
const updates: AnyBulkWriteOperation<TDoc>[] = []
125+
const updates: MongoBulkWriteOperation<TDoc>[] = []
127126
const removedDocs: TDoc['_id'][] = []
128127

129128
_.each(preparedChanges.changed || [], (oUpdate) => {

packages/job-worker/src/db/collection.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { EventEmitter } from 'events'
33
import { AnyBulkWriteOperation, ChangeStream, Collection as MongoCollection, FindOptions, CountOptions } from 'mongodb'
44
import { IChangeStreamEvents } from './index.js'
55
import { startSpanManual } from '../profiler.js'
6-
import { IChangeStream, ICollection, MongoModifier, MongoQuery } from './collections.js'
6+
import { IChangeStream, ICollection, MongoBulkWriteOperation, MongoModifier, MongoQuery } from './collections.js'
77

88
/** Wrap some APM and better error small query modifications around a Mongo.Collection */
99
class WrappedCollection<TDoc extends { _id: ProtectedString<any> }> implements ICollection<TDoc> {
@@ -155,7 +155,7 @@ class WrappedCollection<TDoc extends { _id: ProtectedString<any> }> implements I
155155
return res.deletedCount
156156
}
157157

158-
async bulkWrite(ops: Array<AnyBulkWriteOperation<TDoc>>): Promise<void> {
158+
async bulkWrite(ops: Array<MongoBulkWriteOperation<TDoc>>): Promise<void> {
159159
const span = startSpanManual('WrappedCollection.bulkWrite')
160160
if (span) {
161161
span.addLabels({
@@ -165,7 +165,7 @@ class WrappedCollection<TDoc extends { _id: ProtectedString<any> }> implements I
165165
}
166166

167167
if (ops.length > 0) {
168-
const bulkWriteResult = await this.#collection.bulkWrite(ops, {
168+
const bulkWriteResult = await this.#collection.bulkWrite(ops as AnyBulkWriteOperation<TDoc>[], {
169169
ordered: false,
170170
})
171171
if (bulkWriteResult && bulkWriteResult.hasWriteErrors()) {

0 commit comments

Comments
 (0)