@@ -37,12 +37,13 @@ public void ChangeStreamExample1()
3737 var inventory = database . GetCollection < BsonDocument > ( "inventory" ) ;
3838
3939 var document = new BsonDocument ( "x" , 1 ) ;
40- new Thread ( ( ) =>
40+ var thread = new Thread ( ( ) =>
4141 {
4242 Thread . Sleep ( TimeSpan . FromMilliseconds ( 100 ) ) ;
4343 inventory . InsertOne ( document ) ;
44- } )
45- . Start ( ) ;
44+ } ) ;
45+
46+ thread . Start ( ) ;
4647
4748 // Start Changestream Example 1
4849 var cursor = inventory . Watch ( ) ;
@@ -52,6 +53,9 @@ public void ChangeStreamExample1()
5253 // End Changestream Example 1
5354
5455 next . FullDocument . Should ( ) . Be ( document ) ;
56+
57+ // Make sure worker thread is finished before we let the next test run.
58+ thread . Join ( ) ;
5559 }
5660
5761 [ Fact ]
@@ -64,14 +68,15 @@ public void ChangeStreamExample2()
6468
6569 var document = new BsonDocument ( "x" , 1 ) ;
6670 inventory . InsertOne ( document ) ;
67- new Thread ( ( ) =>
71+ var thread = new Thread ( ( ) =>
6872 {
6973 Thread . Sleep ( TimeSpan . FromMilliseconds ( 100 ) ) ;
7074 var filter = new BsonDocument ( "_id" , document [ "_id" ] ) ;
7175 var update = "{ $set : { x : 2 } }" ;
7276 inventory . UpdateOne ( filter , update ) ;
73- } )
74- . Start ( ) ;
77+ } ) ;
78+
79+ thread . Start ( ) ;
7580
7681 // Start Changestream Example 2
7782 var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption . UpdateLookup } ;
@@ -83,6 +88,9 @@ public void ChangeStreamExample2()
8388
8489 var expectedFullDocument = document . Set ( "x" , 2 ) ;
8590 next . FullDocument . Should ( ) . Be ( expectedFullDocument ) ;
91+
92+ // Make sure worker thread is finished before we let the next test run.
93+ thread . Join ( ) ;
8694 }
8795
8896 [ Fact ]
@@ -101,15 +109,19 @@ public void ChangeStreamExample3()
101109
102110 IChangeStreamCursor < ChangeStreamDocument < BsonDocument > > previousCursor ;
103111 {
104- new Thread ( ( ) =>
112+ var thread = new Thread ( ( ) =>
105113 {
106114 Thread . Sleep ( TimeSpan . FromMilliseconds ( 100 ) ) ;
107115 inventory . InsertMany ( documents ) ;
108- } )
109- . Start ( ) ;
116+ } ) ;
117+
118+ thread . Start ( ) ;
110119
111120 previousCursor = inventory . Watch ( new ChangeStreamOptions { BatchSize = 1 } ) ;
112121 while ( previousCursor . MoveNext ( ) && previousCursor . Current . Count ( ) == 0 ) { } // keep calling MoveNext until we've read the first batch
122+
123+ // Make sure worker thread is finished before we let the next test run.
124+ thread . Join ( ) ;
113125 }
114126
115127 {
@@ -135,22 +147,24 @@ public void ChangestreamExample4()
135147 var database = client . GetDatabase ( "ChangeStreamExamples" ) ;
136148 database . DropCollection ( "inventory" ) ;
137149
138- using var cancelationTokenSource = new CancellationTokenSource ( ) ;
139- try
150+ var workerEnabled = true ;
151+ var document = new BsonDocument ( "username" , "alice" ) ;
152+
153+ var thread = new Thread ( ( ) =>
140154 {
141- var document = new BsonDocument ( "username" , "alice ") ;
155+ var inventoryCollection = database . GetCollection < BsonDocument > ( "inventory ") ;
142156
143- Task . Run ( ( ) =>
157+ while ( workerEnabled )
144158 {
145- var inventoryCollection = database . GetCollection < BsonDocument > ( "inventory" ) ;
159+ Thread . Sleep ( TimeSpan . FromMilliseconds ( 100 ) ) ;
160+ document [ "_id" ] = ObjectId . GenerateNewId ( ) ;
161+ inventoryCollection . InsertOne ( document ) ;
162+ }
163+ } ) ;
146164
147- while ( ! cancelationTokenSource . IsCancellationRequested )
148- {
149- Thread . Sleep ( TimeSpan . FromMilliseconds ( 100 ) ) ;
150- document [ "_id" ] = ObjectId . GenerateNewId ( ) ;
151- inventoryCollection . InsertOne ( document ) ;
152- }
153- } , cancelationTokenSource . Token ) ;
165+ try
166+ {
167+ thread . Start ( ) ;
154168
155169 // Start Changestream Example 4
156170 var pipeline = new EmptyPipelineDefinition < ChangeStreamDocument < BsonDocument > > ( )
@@ -170,7 +184,11 @@ public void ChangestreamExample4()
170184 }
171185 finally
172186 {
173- cancelationTokenSource . Cancel ( ) ;
187+ workerEnabled = false ;
188+
189+ // Make sure worker thread is finished before we let the next test run.
190+ // Especially important for this test case since the thread continually inserts documents.
191+ thread . Join ( ) ;
174192 }
175193 }
176194 }
0 commit comments