-
Notifications
You must be signed in to change notification settings - Fork 57
fix(mongodb): add pymongo error handling to classify user vs platform errors #677
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
aballman
wants to merge
1
commit into
main
Choose a base branch
from
aballman/mongodb-error-handling
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,128 @@ | ||
| from unittest.mock import MagicMock | ||
|
|
||
| import pytest | ||
| from pymongo.errors import ( | ||
| AutoReconnect, | ||
| BulkWriteError, | ||
| OperationFailure, | ||
| ServerSelectionTimeoutError, | ||
| ) | ||
|
|
||
| from unstructured_ingest.error import ( | ||
| DestinationConnectionError, | ||
| QuotaError, | ||
| TimeoutError, | ||
| WriteError, | ||
| ) | ||
| from unstructured_ingest.processes.connectors.mongodb import ( | ||
| MongoDBConnectionConfig, | ||
| MongoDBUploader, | ||
| MongoDBUploaderConfig, | ||
| ) | ||
|
|
||
|
|
||
| def _make_uploader(): | ||
| connection_config = MagicMock(spec=MongoDBConnectionConfig) | ||
| connection_config.host = "test_host" | ||
| upload_config = MagicMock(spec=MongoDBUploaderConfig) | ||
| upload_config.record_id_key = "record_id" | ||
| upload_config.database = "test_db" | ||
| upload_config.collection = "test_collection" | ||
| upload_config.batch_size = 100 | ||
| return MongoDBUploader( | ||
| connection_config=connection_config, | ||
| upload_config=upload_config, | ||
| ) | ||
|
|
||
|
|
||
| def _mock_client(uploader, collection_side_effect=None): | ||
| mock_client = MagicMock() | ||
| uploader.connection_config.get_client.return_value.__enter__ = MagicMock( | ||
| return_value=mock_client | ||
| ) | ||
| uploader.connection_config.get_client.return_value.__exit__ = MagicMock(return_value=False) | ||
| mock_collection = mock_client.__getitem__("test_db").__getitem__("test_collection") | ||
| # Make can_delete return False so we skip delete_by_record_id and go straight to insert | ||
| mock_collection.list_indexes.return_value = [] | ||
| if collection_side_effect: | ||
| mock_collection.insert_many.side_effect = collection_side_effect | ||
| return mock_collection | ||
|
|
||
|
|
||
| class TestRunDataErrorHandling: | ||
| def test_operation_failure_quota_raises_quota_error(self): | ||
| uploader = _make_uploader() | ||
| file_data = MagicMock() | ||
| file_data.identifier = "test_id" | ||
| _mock_client(uploader, OperationFailure("quota exceeded for writes")) | ||
|
|
||
| with pytest.raises(QuotaError): | ||
| uploader.run_data(data=[{"key": "value"}], file_data=file_data) | ||
|
|
||
| def test_operation_failure_other_raises_destination_error(self): | ||
| uploader = _make_uploader() | ||
| file_data = MagicMock() | ||
| file_data.identifier = "test_id" | ||
| _mock_client(uploader, OperationFailure("some other failure")) | ||
|
|
||
| with pytest.raises(DestinationConnectionError): | ||
| uploader.run_data(data=[{"key": "value"}], file_data=file_data) | ||
|
|
||
| def test_server_selection_timeout_raises_timeout_error(self): | ||
| uploader = _make_uploader() | ||
| file_data = MagicMock() | ||
| file_data.identifier = "test_id" | ||
| _mock_client(uploader, ServerSelectionTimeoutError("timeout")) | ||
|
|
||
| with pytest.raises(TimeoutError): | ||
| uploader.run_data(data=[{"key": "value"}], file_data=file_data) | ||
|
|
||
| def test_bulk_write_error_raises_write_error(self): | ||
| uploader = _make_uploader() | ||
| file_data = MagicMock() | ||
| file_data.identifier = "test_id" | ||
| _mock_client(uploader, BulkWriteError({"writeErrors": [{"errmsg": "fail"}]})) | ||
|
|
||
| with pytest.raises(WriteError): | ||
| uploader.run_data(data=[{"key": "value"}], file_data=file_data) | ||
|
|
||
| def test_auto_reconnect_raises_destination_error(self): | ||
| uploader = _make_uploader() | ||
| file_data = MagicMock() | ||
| file_data.identifier = "test_id" | ||
| _mock_client(uploader, AutoReconnect("connection lost")) | ||
|
|
||
| with pytest.raises(DestinationConnectionError): | ||
| uploader.run_data(data=[{"key": "value"}], file_data=file_data) | ||
|
|
||
|
|
||
| class TestDeleteByRecordIdErrorHandling: | ||
| def test_operation_failure_quota_raises_quota_error(self): | ||
| uploader = _make_uploader() | ||
| file_data = MagicMock() | ||
| file_data.identifier = "test_id" | ||
| collection = MagicMock() | ||
| collection.delete_many.side_effect = OperationFailure("quota exceeded") | ||
|
|
||
| with pytest.raises(QuotaError): | ||
| uploader.delete_by_record_id(collection=collection, file_data=file_data) | ||
|
|
||
| def test_server_selection_timeout_raises_timeout_error(self): | ||
| uploader = _make_uploader() | ||
| file_data = MagicMock() | ||
| file_data.identifier = "test_id" | ||
| collection = MagicMock() | ||
| collection.delete_many.side_effect = ServerSelectionTimeoutError("timeout") | ||
|
|
||
| with pytest.raises(TimeoutError): | ||
| uploader.delete_by_record_id(collection=collection, file_data=file_data) | ||
|
|
||
| def test_auto_reconnect_raises_destination_error(self): | ||
| uploader = _make_uploader() | ||
| file_data = MagicMock() | ||
| file_data.identifier = "test_id" | ||
| collection = MagicMock() | ||
| collection.delete_many.side_effect = AutoReconnect("connection lost") | ||
|
|
||
| with pytest.raises(DestinationConnectionError): | ||
| uploader.delete_by_record_id(collection=collection, file_data=file_data) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Catch-all swallows correctly classified errors from delete_by_record_id
High Severity
The
except Exceptioncatch-all inrun_datare-wraps already-classified exceptions fromdelete_by_record_idasDestinationConnectionError. Whendelete_by_record_idraisesQuotaErrororTimeoutError, those propagate intorun_data's try block, skip all the pymongo-specific except handlers, and get caught by the genericexcept Exception— converting them back toDestinationConnectionError. This directly undermines the PR's goal of correct error classification. The catch-all needs to re-raiseUnstructuredIngestErrorsubclasses before falling through to the generic handler.Additional Locations (1)
unstructured_ingest/processes/connectors/mongodb.py#L395-L396