|
43 | 43 | from bson.int64 import Int64 |
44 | 44 | from bson.raw_bson import RawBSONDocument |
45 | 45 | from bson.son import SON |
| 46 | +from pymongo import MongoClient |
46 | 47 | from pymongo.errors import ( |
47 | 48 | AutoReconnect, |
48 | 49 | ConnectionFailure, |
49 | | - OperationFailure, |
| 50 | + NotPrimaryError, |
50 | 51 | ServerSelectionTimeoutError, |
51 | 52 | WriteConcernError, |
52 | 53 | ) |
53 | 54 | from pymongo.monitoring import ( |
| 55 | + CommandFailedEvent, |
54 | 56 | CommandSucceededEvent, |
55 | 57 | ConnectionCheckedOutEvent, |
56 | 58 | ConnectionCheckOutFailedEvent, |
@@ -601,5 +603,192 @@ def raise_connection_err_select_server(*args, **kwargs): |
601 | 603 | self.assertEqual(sent_txn_id, final_txn_id, msg) |
602 | 604 |
|
603 | 605 |
|
| 606 | +class TestErrorPropagationAfterEncounteringMultipleErrors(AsyncIntegrationTest): |
| 607 | + # Only run against replica sets as mongos does not propagate the NoWritesPerformed label to the drivers. |
| 608 | + @async_client_context.require_replica_set |
| 609 | + # Run against server versions 6.0 and above. |
| 610 | + @async_client_context.require_version_min(6, 0) # type: ignore[untyped-decorator] |
| 611 | + async def asyncSetUp(self) -> None: |
| 612 | + await super().asyncSetUp() |
| 613 | + self.setup_client = MongoClient(**async_client_context.default_client_options) |
| 614 | + self.addCleanup(self.setup_client.close) |
| 615 | + |
| 616 | + # TODO: After PYTHON-4595 we can use async event handlers and remove this workaround. |
| 617 | + def configure_fail_point_sync(self, command_args, off=False) -> None: |
| 618 | + cmd = {"configureFailPoint": "failCommand"} |
| 619 | + cmd.update(command_args) |
| 620 | + if off: |
| 621 | + cmd["mode"] = "off" |
| 622 | + cmd.pop("data", None) |
| 623 | + self.setup_client.admin.command(cmd) |
| 624 | + |
| 625 | + async def test_01_drivers_return_the_correct_error_when_receiving_only_errors_without_NoWritesPerformed( |
| 626 | + self |
| 627 | + ) -> None: |
| 628 | + # Create a client with retryWrites=true. |
| 629 | + listener = OvertCommandListener() |
| 630 | + |
| 631 | + # Configure a fail point with error code 91 (ShutdownInProgress) with the RetryableError and SystemOverloadedError error labels. |
| 632 | + command_args = { |
| 633 | + "configureFailPoint": "failCommand", |
| 634 | + "mode": {"times": 1}, |
| 635 | + "data": { |
| 636 | + "failCommands": ["insert"], |
| 637 | + "errorLabels": ["RetryableError", "SystemOverloadedError"], |
| 638 | + "errorCode": 91, |
| 639 | + }, |
| 640 | + } |
| 641 | + |
| 642 | + # Via the command monitoring CommandFailedEvent, configure a fail point with error code 10107 (NotWritablePrimary). |
| 643 | + command_args_inner = { |
| 644 | + "configureFailPoint": "failCommand", |
| 645 | + "mode": "alwaysOn", |
| 646 | + "data": { |
| 647 | + "failCommands": ["insert"], |
| 648 | + "errorCode": 10107, |
| 649 | + "errorLabels": ["RetryableError", "SystemOverloadedError"], |
| 650 | + }, |
| 651 | + } |
| 652 | + |
| 653 | + def failed(event: CommandFailedEvent) -> None: |
| 654 | + # Configure the 10107 fail point command only if the the failed event is for the 91 error configured in step 2. |
| 655 | + if listener.failed_events: |
| 656 | + return |
| 657 | + assert event.failure["code"] == 91 |
| 658 | + self.configure_fail_point_sync(command_args_inner) |
| 659 | + listener.failed_events.append(event) |
| 660 | + |
| 661 | + listener.failed = failed |
| 662 | + |
| 663 | + client = await self.async_rs_client(retryWrites=True, event_listeners=[listener]) |
| 664 | + |
| 665 | + self.configure_fail_point_sync(command_args) |
| 666 | + |
| 667 | + # Attempt an insertOne operation on any record for any database and collection. |
| 668 | + # Expect the insertOne to fail with a server error. |
| 669 | + with self.assertRaises(NotPrimaryError) as exc: |
| 670 | + await client.test.test.insert_one({}) |
| 671 | + |
| 672 | + # Assert that the error code of the server error is 10107. |
| 673 | + assert exc.exception.errors["code"] == 10107 # type:ignore[call-overload] |
| 674 | + |
| 675 | + # Disable the fail point. |
| 676 | + self.configure_fail_point_sync({}, off=True) |
| 677 | + |
| 678 | + async def test_02_drivers_return_the_correct_error_when_receiving_only_errors_with_NoWritesPerformed( |
| 679 | + self |
| 680 | + ) -> None: |
| 681 | + # Create a client with retryWrites=true. |
| 682 | + listener = OvertCommandListener() |
| 683 | + |
| 684 | + # Configure a fail point with error code 91 (ShutdownInProgress) with the RetryableError and SystemOverloadedError error labels. |
| 685 | + command_args = { |
| 686 | + "configureFailPoint": "failCommand", |
| 687 | + "mode": {"times": 1}, |
| 688 | + "data": { |
| 689 | + "failCommands": ["insert"], |
| 690 | + "errorLabels": ["RetryableError", "SystemOverloadedError", "NoWritesPerformed"], |
| 691 | + "errorCode": 91, |
| 692 | + }, |
| 693 | + } |
| 694 | + |
| 695 | + # Via the command monitoring CommandFailedEvent, configure a fail point with error code `10107` (NotWritablePrimary) |
| 696 | + # and a NoWritesPerformed label. |
| 697 | + command_args_inner = { |
| 698 | + "configureFailPoint": "failCommand", |
| 699 | + "mode": "alwaysOn", |
| 700 | + "data": { |
| 701 | + "failCommands": ["insert"], |
| 702 | + "errorCode": 10107, |
| 703 | + "errorLabels": ["RetryableError", "SystemOverloadedError", "NoWritesPerformed"], |
| 704 | + }, |
| 705 | + } |
| 706 | + |
| 707 | + def failed(event: CommandFailedEvent) -> None: |
| 708 | + if listener.failed_events: |
| 709 | + return |
| 710 | + # Configure the 10107 fail point command only if the the failed event is for the 91 error configured in step 2. |
| 711 | + assert event.failure["code"] == 91 |
| 712 | + self.configure_fail_point_sync(command_args_inner) |
| 713 | + listener.failed_events.append(event) |
| 714 | + |
| 715 | + listener.failed = failed |
| 716 | + |
| 717 | + client = await self.async_rs_client(retryWrites=True, event_listeners=[listener]) |
| 718 | + |
| 719 | + self.configure_fail_point_sync(command_args) |
| 720 | + |
| 721 | + # Attempt an insertOne operation on any record for any database and collection. |
| 722 | + # Expect the insertOne to fail with a server error. |
| 723 | + with self.assertRaises(NotPrimaryError) as exc: |
| 724 | + await client.test.test.insert_one({}) |
| 725 | + |
| 726 | + # Assert that the error code of the server error is 91. |
| 727 | + assert exc.exception.errors["code"] == 91 # type:ignore[call-overload] |
| 728 | + |
| 729 | + # Disable the fail point. |
| 730 | + self.configure_fail_point_sync({}, off=True) |
| 731 | + |
| 732 | + async def test_03_drivers_return_the_correct_error_when_receiving_some_errors_with_NoWritesPerformed_and_some_without_NoWritesPerformed( |
| 733 | + self |
| 734 | + ) -> None: |
| 735 | + # TODO: read the expected behavior and add breakpoint() to the retry loop |
| 736 | + # Create a client with retryWrites=true. |
| 737 | + listener = OvertCommandListener() |
| 738 | + |
| 739 | + # Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error |
| 740 | + # code `91` (NotWritablePrimary) and the `NoWritesPerformed`, `RetryableError` and `SystemOverloadedError` labels. |
| 741 | + command_args_inner = { |
| 742 | + "configureFailPoint": "failCommand", |
| 743 | + "mode": "alwaysOn", |
| 744 | + "data": { |
| 745 | + "failCommands": ["insert"], |
| 746 | + "errorLabels": ["RetryableError", "SystemOverloadedError", "NoWritesPerformed"], |
| 747 | + "errorCode": 91, |
| 748 | + }, |
| 749 | + } |
| 750 | + |
| 751 | + # Configure a fail point with error code `91` (ShutdownInProgress) with the `RetryableError` and |
| 752 | + # `SystemOverloadedError` error labels but without the `NoWritesPerformed` error label. |
| 753 | + command_args = { |
| 754 | + "configureFailPoint": "failCommand", |
| 755 | + "mode": {"times": 1}, |
| 756 | + "data": { |
| 757 | + "failCommands": ["insert"], |
| 758 | + "errorCode": 91, |
| 759 | + "errorLabels": ["RetryableError", "SystemOverloadedError"], |
| 760 | + }, |
| 761 | + } |
| 762 | + |
| 763 | + def failed(event: CommandFailedEvent) -> None: |
| 764 | + # Configure the fail point command only if the the failed event is for the 91 error configured in step 2. |
| 765 | + if listener.failed_events: |
| 766 | + return |
| 767 | + assert event.failure["code"] == 91 |
| 768 | + self.configure_fail_point_sync(command_args_inner) |
| 769 | + listener.failed_events.append(event) |
| 770 | + |
| 771 | + listener.failed = failed |
| 772 | + |
| 773 | + client = await self.async_rs_client(retryWrites=True, event_listeners=[listener]) |
| 774 | + |
| 775 | + self.configure_fail_point_sync(command_args) |
| 776 | + |
| 777 | + # Attempt an insertOne operation on any record for any database and collection. |
| 778 | + # Expect the insertOne to fail with a server error. |
| 779 | + from pymongo.errors import OperationFailure |
| 780 | + |
| 781 | + with self.assertRaises(Exception) as exc: |
| 782 | + await client.test.test.insert_one({}) |
| 783 | + |
| 784 | + # Assert that the error code of the server error is 91. |
| 785 | + assert exc.exception.errors["code"] == 91 |
| 786 | + # Assert that the error does not contain the error label `NoWritesPerformed`. |
| 787 | + assert "NoWritesPerformed" not in exc.exception.errors["errorLabels"] |
| 788 | + |
| 789 | + # Disable the fail point. |
| 790 | + self.configure_fail_point_sync({}, off=True) |
| 791 | + |
| 792 | + |
604 | 793 | if __name__ == "__main__": |
605 | 794 | unittest.main() |
0 commit comments