Skip to content

Commit adb9015

Browse files
committed
PYTHON-5763 Improve async test coverage
1 parent 13085ff commit adb9015

File tree

10 files changed

+2568
-0
lines changed

10 files changed

+2568
-0
lines changed

test/asynchronous/test_bulk.py

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,5 +1118,235 @@ async def test_write_concern_failure_unordered(self):
11181118
self.assertTrue(upserts[0].get("_id"))
11191119

11201120

1121+
class TestBulkWriteCoverage(AsyncBulkTestBase):
1122+
"""Additional tests to improve code coverage for async bulk operations."""
1123+
1124+
async def test_bulk_write_with_comment(self):
1125+
"""Test bulk write operations with comment parameter."""
1126+
requests = [
1127+
InsertOne({"x": 1}),
1128+
UpdateOne({"x": 1}, {"$set": {"y": 1}}),
1129+
DeleteOne({"x": 1}),
1130+
]
1131+
result = await self.coll.bulk_write(requests, comment="bulk_comment")
1132+
self.assertEqual(1, result.inserted_count)
1133+
self.assertEqual(1, result.modified_count)
1134+
self.assertEqual(1, result.deleted_count)
1135+
1136+
async def test_bulk_write_with_let(self):
1137+
"""Test bulk write operations with let parameter."""
1138+
if not async_client_context.version.at_least(5, 0):
1139+
self.skipTest("let parameter requires MongoDB 5.0+")
1140+
1141+
await self.coll.insert_one({"x": 1})
1142+
requests = [
1143+
UpdateOne({"$expr": {"$eq": ["$x", "$$targetVal"]}}, {"$set": {"updated": True}}),
1144+
]
1145+
result = await self.coll.bulk_write(requests, let={"targetVal": 1})
1146+
self.assertEqual(1, result.modified_count)
1147+
1148+
async def test_bulk_write_all_operation_types(self):
1149+
"""Test bulk write with all operation types combined."""
1150+
await self.coll.insert_many([{"x": i} for i in range(5)])
1151+
1152+
requests = [
1153+
InsertOne({"x": 100}),
1154+
UpdateOne({"x": 0}, {"$set": {"updated": True}}),
1155+
UpdateMany({"x": {"$lte": 2}}, {"$set": {"batch_updated": True}}),
1156+
ReplaceOne({"x": 3}, {"x": 3, "replaced": True}),
1157+
DeleteOne({"x": 4}),
1158+
DeleteMany({"x": {"$gt": 50}}),
1159+
]
1160+
result = await self.coll.bulk_write(requests)
1161+
1162+
self.assertEqual(1, result.inserted_count)
1163+
self.assertGreaterEqual(result.modified_count, 1)
1164+
self.assertGreaterEqual(result.deleted_count, 1)
1165+
1166+
async def test_bulk_write_unordered(self):
1167+
"""Test unordered bulk write continues after error."""
1168+
await self.coll.create_index([("x", 1)], unique=True)
1169+
self.addAsyncCleanup(self.coll.drop_index, [("x", 1)])
1170+
1171+
requests = [
1172+
InsertOne({"x": 1}),
1173+
InsertOne({"x": 1}), # Duplicate - will error
1174+
InsertOne({"x": 2}),
1175+
InsertOne({"x": 3}),
1176+
]
1177+
1178+
with self.assertRaises(BulkWriteError) as ctx:
1179+
await self.coll.bulk_write(requests, ordered=False)
1180+
1181+
# With unordered, should have inserted 3 documents
1182+
self.assertEqual(3, ctx.exception.details["nInserted"])
1183+
1184+
async def test_bulk_write_ordered(self):
1185+
"""Test ordered bulk write stops on first error."""
1186+
await self.coll.create_index([("x", 1)], unique=True)
1187+
self.addAsyncCleanup(self.coll.drop_index, [("x", 1)])
1188+
1189+
requests = [
1190+
InsertOne({"x": 1}),
1191+
InsertOne({"x": 1}), # Duplicate - will error
1192+
InsertOne({"x": 2}),
1193+
InsertOne({"x": 3}),
1194+
]
1195+
1196+
with self.assertRaises(BulkWriteError) as ctx:
1197+
await self.coll.bulk_write(requests, ordered=True)
1198+
1199+
# With ordered, should have inserted only 1 document
1200+
self.assertEqual(1, ctx.exception.details["nInserted"])
1201+
1202+
async def test_bulk_write_bypass_document_validation(self):
1203+
"""Test bulk write with bypass_document_validation."""
1204+
if not async_client_context.version.at_least(3, 2):
1205+
self.skipTest("bypass_document_validation requires MongoDB 3.2+")
1206+
1207+
# Create collection with validator
1208+
await self.coll.drop()
1209+
await self.db.create_collection(
1210+
self.coll.name, validator={"$jsonSchema": {"required": ["name"]}}
1211+
)
1212+
1213+
# Without bypass, should fail
1214+
with self.assertRaises(BulkWriteError):
1215+
await self.coll.bulk_write([InsertOne({"x": 1})])
1216+
1217+
# With bypass, should succeed
1218+
result = await self.coll.bulk_write([InsertOne({"x": 1})], bypass_document_validation=True)
1219+
self.assertEqual(1, result.inserted_count)
1220+
1221+
async def test_bulk_write_empty_requests(self):
1222+
"""Test bulk write with empty requests list."""
1223+
with self.assertRaises(InvalidOperation):
1224+
await self.coll.bulk_write([])
1225+
1226+
async def test_bulk_write_result_properties(self):
1227+
"""Test all BulkWriteResult properties."""
1228+
await self.coll.insert_one({"x": 1})
1229+
1230+
requests = [
1231+
InsertOne({"x": 2}),
1232+
UpdateOne({"x": 1}, {"$set": {"updated": True}}),
1233+
ReplaceOne({"x": 2}, {"x": 2, "replaced": True}, upsert=True),
1234+
DeleteOne({"x": 1}),
1235+
]
1236+
result = await self.coll.bulk_write(requests)
1237+
1238+
# Check all properties
1239+
self.assertTrue(result.acknowledged)
1240+
self.assertEqual(1, result.inserted_count)
1241+
self.assertGreaterEqual(result.matched_count, 0)
1242+
self.assertGreaterEqual(result.modified_count, 0)
1243+
self.assertEqual(1, result.deleted_count)
1244+
self.assertIsInstance(result.upserted_count, int)
1245+
self.assertIsInstance(result.upserted_ids, dict)
1246+
1247+
async def test_bulk_write_with_upsert(self):
1248+
"""Test bulk write upsert operations."""
1249+
requests = [
1250+
UpdateOne({"x": 1}, {"$set": {"y": 1}}, upsert=True),
1251+
UpdateOne({"x": 2}, {"$set": {"y": 2}}, upsert=True),
1252+
ReplaceOne({"x": 3}, {"x": 3, "y": 3}, upsert=True),
1253+
]
1254+
result = await self.coll.bulk_write(requests)
1255+
1256+
self.assertEqual(3, result.upserted_count)
1257+
self.assertEqual(3, len(result.upserted_ids))
1258+
1259+
async def test_update_one_with_hint(self):
1260+
"""Test UpdateOne with hint parameter."""
1261+
await self.coll.create_index([("x", 1)])
1262+
self.addAsyncCleanup(self.coll.drop_index, [("x", 1)])
1263+
1264+
await self.coll.insert_one({"x": 1})
1265+
1266+
requests = [UpdateOne({"x": 1}, {"$set": {"y": 1}}, hint=[("x", 1)])]
1267+
result = await self.coll.bulk_write(requests)
1268+
self.assertEqual(1, result.modified_count)
1269+
1270+
async def test_update_many_with_hint(self):
1271+
"""Test UpdateMany with hint parameter."""
1272+
await self.coll.create_index([("x", 1)])
1273+
self.addAsyncCleanup(self.coll.drop_index, [("x", 1)])
1274+
1275+
await self.coll.insert_many([{"x": 1}, {"x": 1}])
1276+
1277+
requests = [UpdateMany({"x": 1}, {"$set": {"y": 1}}, hint=[("x", 1)])]
1278+
result = await self.coll.bulk_write(requests)
1279+
self.assertEqual(2, result.modified_count)
1280+
1281+
async def test_delete_one_with_hint(self):
1282+
"""Test DeleteOne with hint parameter."""
1283+
await self.coll.create_index([("x", 1)])
1284+
self.addAsyncCleanup(self.coll.drop_index, [("x", 1)])
1285+
1286+
await self.coll.insert_one({"x": 1})
1287+
1288+
requests = [DeleteOne({"x": 1}, hint=[("x", 1)])]
1289+
result = await self.coll.bulk_write(requests)
1290+
self.assertEqual(1, result.deleted_count)
1291+
1292+
async def test_delete_many_with_hint(self):
1293+
"""Test DeleteMany with hint parameter."""
1294+
await self.coll.create_index([("x", 1)])
1295+
self.addAsyncCleanup(self.coll.drop_index, [("x", 1)])
1296+
1297+
await self.coll.insert_many([{"x": 1}, {"x": 1}])
1298+
1299+
requests = [DeleteMany({"x": 1}, hint=[("x", 1)])]
1300+
result = await self.coll.bulk_write(requests)
1301+
self.assertEqual(2, result.deleted_count)
1302+
1303+
async def test_update_one_with_array_filters(self):
1304+
"""Test UpdateOne with array_filters parameter."""
1305+
await self.coll.insert_one({"x": [{"y": 1}, {"y": 2}, {"y": 3}]})
1306+
1307+
requests = [
1308+
UpdateOne({}, {"$set": {"x.$[elem].z": 1}}, array_filters=[{"elem.y": {"$gt": 1}}])
1309+
]
1310+
result = await self.coll.bulk_write(requests)
1311+
self.assertEqual(1, result.modified_count)
1312+
1313+
doc = await self.coll.find_one()
1314+
# Elements with y > 1 should have z = 1
1315+
for elem in doc["x"]:
1316+
if elem["y"] > 1:
1317+
self.assertEqual(1, elem.get("z"))
1318+
1319+
async def test_replace_one_with_hint(self):
1320+
"""Test ReplaceOne with hint parameter."""
1321+
await self.coll.create_index([("x", 1)])
1322+
self.addAsyncCleanup(self.coll.drop_index, [("x", 1)])
1323+
1324+
await self.coll.insert_one({"x": 1})
1325+
1326+
requests = [ReplaceOne({"x": 1}, {"x": 1, "replaced": True}, hint=[("x", 1)])]
1327+
result = await self.coll.bulk_write(requests)
1328+
self.assertEqual(1, result.modified_count)
1329+
1330+
async def test_update_with_collation(self):
1331+
"""Test update operations with collation."""
1332+
await self.coll.insert_many(
1333+
[
1334+
{"name": "cafe"},
1335+
{"name": "Cafe"},
1336+
]
1337+
)
1338+
1339+
requests = [
1340+
UpdateMany(
1341+
{"name": "cafe"},
1342+
{"$set": {"updated": True}},
1343+
collation={"locale": "en", "strength": 2},
1344+
)
1345+
]
1346+
result = await self.coll.bulk_write(requests)
1347+
# With case-insensitive collation, both docs should match
1348+
self.assertEqual(2, result.modified_count)
1349+
1350+
11211351
if __name__ == "__main__":
11221352
unittest.main()

test/asynchronous/test_change_stream.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1152,5 +1152,122 @@ def asyncTearDown(self):
11521152
)
11531153

11541154

1155+
class TestChangeStreamCoverage(TestAsyncCollectionAsyncChangeStream):
1156+
"""Additional tests to improve code coverage for AsyncChangeStream."""
1157+
1158+
async def test_change_stream_alive_property(self):
1159+
"""Test alive property state transitions."""
1160+
async with await self.change_stream() as cs:
1161+
self.assertTrue(cs.alive)
1162+
# After context exit, should be closed
1163+
self.assertFalse(cs.alive)
1164+
1165+
async def test_change_stream_idempotent_close(self):
1166+
"""Test that close() can be called multiple times safely."""
1167+
cs = await self.change_stream()
1168+
await cs.close()
1169+
# Second close should not raise
1170+
await cs.close()
1171+
self.assertFalse(cs.alive)
1172+
1173+
async def test_change_stream_resume_token_deepcopy(self):
1174+
"""Test that resume_token returns a deep copy."""
1175+
coll = self.watched_collection()
1176+
async with await self.change_stream() as cs:
1177+
await coll.insert_one({"x": 1})
1178+
await anext(cs) # Consume the change event
1179+
token1 = cs.resume_token
1180+
token2 = cs.resume_token
1181+
# Should be equal but different objects
1182+
self.assertEqual(token1, token2)
1183+
self.assertIsNot(token1, token2)
1184+
1185+
async def test_change_stream_with_comment(self):
1186+
"""Test change stream with comment parameter."""
1187+
client, listener = await self.client_with_listener("aggregate")
1188+
try:
1189+
async with await self.change_stream_with_client(client, comment="test_comment"):
1190+
pass
1191+
finally:
1192+
await client.close()
1193+
1194+
# Check that comment was in the aggregate command
1195+
self.assertGreater(len(listener.started_events), 0)
1196+
cmd = listener.started_events[0].command
1197+
self.assertEqual("test_comment", cmd.get("comment"))
1198+
1199+
async def test_change_stream_with_show_expanded_events(self):
1200+
"""Test change stream with show_expanded_events parameter."""
1201+
if not async_client_context.version.at_least(6, 0):
1202+
self.skipTest("show_expanded_events requires MongoDB 6.0+")
1203+
1204+
async with await self.change_stream(show_expanded_events=True) as cs:
1205+
# Just verify it doesn't error
1206+
self.assertTrue(cs.alive)
1207+
1208+
@async_client_context.require_version_min(6, 0)
1209+
async def test_change_stream_with_full_document_before_change(self):
1210+
"""Test change stream with full_document_before_change parameter."""
1211+
coll = self.watched_collection()
1212+
# Need to ensure collection exists with changeStreamPreAndPostImages enabled
1213+
await coll.drop()
1214+
await self.db.create_collection(coll.name, changeStreamPreAndPostImages={"enabled": True})
1215+
await coll.insert_one({"x": 1})
1216+
1217+
async with await self.change_stream(full_document_before_change="whenAvailable") as cs:
1218+
await coll.update_one({"x": 1}, {"$set": {"x": 2}})
1219+
change = await anext(cs)
1220+
self.assertEqual("update", change["operationType"])
1221+
# fullDocumentBeforeChange should be present
1222+
self.assertIn("fullDocumentBeforeChange", change)
1223+
1224+
async def test_change_stream_next_after_close(self):
1225+
"""Test that next() on closed stream raises StopAsyncIteration."""
1226+
cs = await self.change_stream()
1227+
await cs.close()
1228+
with self.assertRaises(StopAsyncIteration):
1229+
await anext(cs)
1230+
1231+
async def test_change_stream_try_next_after_close(self):
1232+
"""Test that try_next() on closed stream raises StopAsyncIteration."""
1233+
cs = await self.change_stream()
1234+
await cs.close()
1235+
with self.assertRaises(StopAsyncIteration):
1236+
await cs.try_next()
1237+
1238+
async def test_change_stream_pipeline_construction(self):
1239+
"""Test change stream pipeline is properly constructed."""
1240+
pipeline = [{"$match": {"operationType": "insert"}}]
1241+
client, listener = await self.client_with_listener("aggregate")
1242+
try:
1243+
async with await self.change_stream_with_client(client, pipeline=pipeline):
1244+
pass
1245+
finally:
1246+
await client.close()
1247+
1248+
cmd = listener.started_events[0].command
1249+
agg_pipeline = cmd["pipeline"]
1250+
# First stage should be $changeStream
1251+
self.assertIn("$changeStream", agg_pipeline[0])
1252+
# Second stage should be our match
1253+
self.assertEqual({"$match": {"operationType": "insert"}}, agg_pipeline[1])
1254+
1255+
async def test_change_stream_empty_pipeline(self):
1256+
"""Test change stream with empty pipeline."""
1257+
async with await self.change_stream(pipeline=[]) as cs:
1258+
self.assertTrue(cs.alive)
1259+
1260+
async def test_change_stream_context_manager_exception(self):
1261+
"""Test change stream context manager closes on exception."""
1262+
cs = None
1263+
try:
1264+
async with await self.change_stream() as cs:
1265+
raise ValueError("test exception")
1266+
except ValueError:
1267+
pass
1268+
# Stream should be closed
1269+
self.assertFalse(cs.alive)
1270+
1271+
11551272
if __name__ == "__main__":
11561273
unittest.main()

0 commit comments

Comments
 (0)