@@ -21,11 +21,16 @@ import {
2121import { TdSpanAttributes } from "../../../core/types" ;
2222import { ConnectionHandler } from "./handlers/ConnectionHandler" ;
2323import { TdFakeFindCursor , TdFakeAggregationCursor , TdFakeChangeStream } from "./mocks/FakeCursor" ;
24+ import { TdFakeTopology } from "./mocks/FakeTopology" ;
2425import {
2526 sanitizeBsonValue ,
2627 reconstructBsonValue ,
2728 addOutputAttributesToSpan ,
2829 sanitizeOptions ,
30+ wrapCursorOutput ,
31+ unwrapCursorOutput ,
32+ wrapDirectOutput ,
33+ unwrapDirectOutput ,
2934} from "./utils/bsonConversion" ;
3035
3136/**
@@ -191,6 +196,18 @@ export class MongodbInstrumentation extends TdInstrumentationBase {
191196 ) ;
192197 }
193198
199+ // Patch Collection.prototype.initializeOrderedBulkOp / initializeUnorderedBulkOp
200+ // In replay mode, inject FakeTopology before calling original to prevent
201+ // "MongoClient must be connected" error from BulkOperationBase constructor.
202+ try {
203+ this . _patchBulkOpInitMethods ( actualExports ) ;
204+ } catch ( error ) {
205+ logger . error (
206+ `[${ this . INSTRUMENTATION_NAME } ] Error patching bulk op init methods, skipping:` ,
207+ error ,
208+ ) ;
209+ }
210+
194211 // Patch Db.prototype methods
195212 try {
196213 this . _patchDbMethods ( actualExports ) ;
@@ -345,7 +362,7 @@ export class MongodbInstrumentation extends TdInstrumentationBase {
345362 return resultPromise
346363 . then ( ( result : any ) => {
347364 try {
348- addOutputAttributesToSpan ( spanInfo , result ) ;
365+ addOutputAttributesToSpan ( spanInfo , wrapDirectOutput ( result ) ) ;
349366 SpanUtils . endSpan ( spanInfo . span , {
350367 code : SpanStatusCode . OK ,
351368 } ) ;
@@ -437,7 +454,9 @@ export class MongodbInstrumentation extends TdInstrumentationBase {
437454 throw new Error ( errorMsg ) ;
438455 }
439456
440- const result = reconstructBsonValue ( mockData . result , this . moduleExports ) ;
457+ const result = unwrapDirectOutput (
458+ reconstructBsonValue ( mockData . result , this . moduleExports ) ,
459+ ) ;
441460
442461 SpanUtils . endSpan ( spanInfo . span , { code : SpanStatusCode . OK } ) ;
443462 return result ;
@@ -597,7 +616,10 @@ export class MongodbInstrumentation extends TdInstrumentationBase {
597616 if ( cursorState . recorded || ! cursorState . spanInfo ) return ;
598617 cursorState . recorded = true ;
599618 try {
600- addOutputAttributesToSpan ( cursorState . spanInfo , cursorState . collectedDocuments ) ;
619+ addOutputAttributesToSpan (
620+ cursorState . spanInfo ,
621+ wrapCursorOutput ( cursorState . collectedDocuments ) ,
622+ ) ;
601623 SpanUtils . endSpan ( cursorState . spanInfo . span , {
602624 code : SpanStatusCode . OK ,
603625 } ) ;
@@ -704,7 +726,7 @@ export class MongodbInstrumentation extends TdInstrumentationBase {
704726 return originalToArray ( )
705727 . then ( ( result : any [ ] ) => {
706728 try {
707- addOutputAttributesToSpan ( spanInfo , result ) ;
729+ addOutputAttributesToSpan ( spanInfo , wrapCursorOutput ( result ) ) ;
708730 SpanUtils . endSpan ( spanInfo . span , {
709731 code : SpanStatusCode . OK ,
710732 } ) ;
@@ -835,7 +857,7 @@ export class MongodbInstrumentation extends TdInstrumentationBase {
835857 return originalForEach ( wrappedIterator )
836858 . then ( ( ) => {
837859 try {
838- addOutputAttributesToSpan ( spanInfo , collectedDocs ) ;
860+ addOutputAttributesToSpan ( spanInfo , wrapCursorOutput ( collectedDocs ) ) ;
839861 SpanUtils . endSpan ( spanInfo . span , {
840862 code : SpanStatusCode . OK ,
841863 } ) ;
@@ -960,12 +982,13 @@ export class MongodbInstrumentation extends TdInstrumentationBase {
960982 throw new Error ( errorMsg ) ;
961983 }
962984
963- const documents = reconstructBsonValue ( mockData . result , self . moduleExports ) ;
985+ const reconstructed = reconstructBsonValue ( mockData . result , self . moduleExports ) ;
986+ const documents = unwrapCursorOutput ( reconstructed ) ;
964987
965988 SpanUtils . endSpan ( spanInfo . span , {
966989 code : SpanStatusCode . OK ,
967990 } ) ;
968- return Array . isArray ( documents ) ? documents : [ documents ] ;
991+ return documents ;
969992 } catch ( error : any ) {
970993 SpanUtils . endSpan ( spanInfo . span , {
971994 code : SpanStatusCode . ERROR ,
@@ -1401,7 +1424,7 @@ export class MongodbInstrumentation extends TdInstrumentationBase {
14011424 return resultPromise
14021425 . then ( ( result : any ) => {
14031426 try {
1404- addOutputAttributesToSpan ( spanInfo , result ) ;
1427+ addOutputAttributesToSpan ( spanInfo , wrapDirectOutput ( result ) ) ;
14051428 SpanUtils . endSpan ( spanInfo . span , {
14061429 code : SpanStatusCode . OK ,
14071430 } ) ;
@@ -1493,7 +1516,9 @@ export class MongodbInstrumentation extends TdInstrumentationBase {
14931516 throw new Error ( errorMsg ) ;
14941517 }
14951518
1496- const result = reconstructBsonValue ( mockData . result , this . moduleExports ) ;
1519+ const result = unwrapDirectOutput (
1520+ reconstructBsonValue ( mockData . result , this . moduleExports ) ,
1521+ ) ;
14971522
14981523 SpanUtils . endSpan ( spanInfo . span , { code : SpanStatusCode . OK } ) ;
14991524 return result ;
@@ -1787,12 +1812,13 @@ export class MongodbInstrumentation extends TdInstrumentationBase {
17871812 throw new Error ( errorMsg ) ;
17881813 }
17891814
1790- const documents = reconstructBsonValue ( mockData . result , self . moduleExports ) ;
1815+ const reconstructed = reconstructBsonValue ( mockData . result , self . moduleExports ) ;
1816+ const documents = unwrapCursorOutput ( reconstructed ) ;
17911817
17921818 SpanUtils . endSpan ( spanInfo . span , {
17931819 code : SpanStatusCode . OK ,
17941820 } ) ;
1795- return Array . isArray ( documents ) ? documents : [ documents ] ;
1821+ return documents ;
17961822 } catch ( error : any ) {
17971823 SpanUtils . endSpan ( spanInfo . span , {
17981824 code : SpanStatusCode . ERROR ,
@@ -2252,6 +2278,87 @@ export class MongodbInstrumentation extends TdInstrumentationBase {
22522278 // Bulk Operations (Ordered/Unordered)
22532279 // ---------------------------------------------------------------------------
22542280
2281+ /**
2282+ * Patch Collection.prototype.initializeOrderedBulkOp and
2283+ * Collection.prototype.initializeUnorderedBulkOp.
2284+ *
2285+ * In replay mode, BulkOperationBase's constructor calls getTopology(collection)
2286+ * which throws if no topology is connected. We inject a FakeTopology onto the
2287+ * collection (and its client) BEFORE calling the original, so the constructor
2288+ * finds a valid topology object and proceeds with default size limits.
2289+ */
2290+ private _patchBulkOpInitMethods ( actualExports : any ) : void {
2291+ const Collection = actualExports . Collection ;
2292+ if ( ! Collection || ! Collection . prototype ) {
2293+ logger . warn (
2294+ `[${ this . INSTRUMENTATION_NAME } ] Collection not found, skipping bulk op init patching` ,
2295+ ) ;
2296+ return ;
2297+ }
2298+
2299+ const self = this ;
2300+
2301+ // Patch initializeOrderedBulkOp
2302+ if ( typeof Collection . prototype . initializeOrderedBulkOp === "function" ) {
2303+ this . _wrap (
2304+ Collection . prototype ,
2305+ "initializeOrderedBulkOp" ,
2306+ ( original : Function ) => {
2307+ return function ( this : any , ...args : any [ ] ) {
2308+ if ( self . mode === TuskDriftMode . REPLAY ) {
2309+ self . _injectFakeTopology ( this ) ;
2310+ }
2311+ return original . apply ( this , args ) ;
2312+ } ;
2313+ } ,
2314+ ) ;
2315+ logger . debug (
2316+ `[${ this . INSTRUMENTATION_NAME } ] Wrapped Collection.prototype.initializeOrderedBulkOp` ,
2317+ ) ;
2318+ }
2319+
2320+ // Patch initializeUnorderedBulkOp
2321+ if ( typeof Collection . prototype . initializeUnorderedBulkOp === "function" ) {
2322+ this . _wrap (
2323+ Collection . prototype ,
2324+ "initializeUnorderedBulkOp" ,
2325+ ( original : Function ) => {
2326+ return function ( this : any , ...args : any [ ] ) {
2327+ if ( self . mode === TuskDriftMode . REPLAY ) {
2328+ self . _injectFakeTopology ( this ) ;
2329+ }
2330+ return original . apply ( this , args ) ;
2331+ } ;
2332+ } ,
2333+ ) ;
2334+ logger . debug (
2335+ `[${ this . INSTRUMENTATION_NAME } ] Wrapped Collection.prototype.initializeUnorderedBulkOp` ,
2336+ ) ;
2337+ }
2338+ }
2339+
2340+ /**
2341+ * Inject a FakeTopology onto a collection and its client for replay mode.
2342+ *
2343+ * getTopology() in the MongoDB driver checks:
2344+ * 1. provider.topology (direct property on collection)
2345+ * 2. provider.client.topology (via the MongoClient)
2346+ * We set both to ensure the topology lookup succeeds.
2347+ */
2348+ private _injectFakeTopology ( collection : any ) : void {
2349+ const fakeTopology = new TdFakeTopology ( ) ;
2350+
2351+ // Set on the client (satisfies getTopology's client.topology check)
2352+ if ( collection . client && ! collection . client . topology ) {
2353+ collection . client . topology = fakeTopology ;
2354+ }
2355+
2356+ // Set on collection directly as fallback
2357+ if ( ! collection . topology ) {
2358+ collection . topology = fakeTopology ;
2359+ }
2360+ }
2361+
22552362 /**
22562363 * Patch OrderedBulkOperation.prototype.execute from mongodb/lib/bulk/ordered.js.
22572364 */
@@ -2560,7 +2667,7 @@ export class MongodbInstrumentation extends TdInstrumentationBase {
25602667 return resultPromise
25612668 . then ( ( result : any ) => {
25622669 try {
2563- addOutputAttributesToSpan ( spanInfo , result ) ;
2670+ addOutputAttributesToSpan ( spanInfo , wrapDirectOutput ( result ) ) ;
25642671 SpanUtils . endSpan ( spanInfo . span , {
25652672 code : SpanStatusCode . OK ,
25662673 } ) ;
@@ -2660,7 +2767,9 @@ export class MongodbInstrumentation extends TdInstrumentationBase {
26602767 throw new Error ( errorMsg ) ;
26612768 }
26622769
2663- const result = reconstructBsonValue ( mockData . result , this . moduleExports ) ;
2770+ const result = unwrapDirectOutput (
2771+ reconstructBsonValue ( mockData . result , this . moduleExports ) ,
2772+ ) ;
26642773
26652774 SpanUtils . endSpan ( spanInfo . span , { code : SpanStatusCode . OK } ) ;
26662775 return result ;
0 commit comments