"""Final round of drive_service tests covering remaining gaps:
download_file_by_path, upload_single_item edge cases (empty target path,
unreadable file, invalid conflict mode), download_file with creation_time
warning, search recursion edge cases.
"""
from pathlib import Path
from unittest.mock import patch

import pytest

from services.drive import drive_service
| 11 | + |
| 12 | + |
@pytest.fixture(autouse=True)
def _reset():
    """Reset drive_service module state (content cache and reserved-memory
    counter) before and after every test in this module."""
    def _clear_state():
        drive_service.folder_content_cache.clear()
        drive_service._mem_reserved = 0

    _clear_state()
    yield
    _clear_state()
| 20 | + |


# ---------- download_file_by_path ----------

def test_download_file_by_path_resolves_then_calls_download_file():
    """Resolving a file path must delegate to download_file and pass its
    return value straight through."""
    resolved = {
        'type': 'file',
        'uuid': 'fid',
        'metadata': {},
        'path': '/Documents/x.pdf',
    }
    with patch.object(drive_service, 'resolve_path', return_value=resolved), \
            patch.object(drive_service, 'download_file',
                         return_value='/tmp/x.pdf') as mock_dl:
        result = drive_service.download_file_by_path('/Documents/x.pdf')
    mock_dl.assert_called_once()
    assert result == '/tmp/x.pdf'
| 33 | + |
| 34 | + |
def test_download_file_by_path_default_destination_uses_filename():
    """If no destination specified, use ./<filename>."""
    resolved = {
        'type': 'file',
        'uuid': 'fid',
        'metadata': {},
        'path': '/Documents/report.pdf',
    }
    seen = {}

    def record_dest(uuid, dest, **kwargs):
        # Capture the destination download_file was asked to write to.
        seen['dest'] = dest
        return dest

    with patch.object(drive_service, 'resolve_path', return_value=resolved), \
            patch.object(drive_service, 'download_file',
                         side_effect=record_dest):
        drive_service.download_file_by_path('/Documents/report.pdf')
    assert seen['dest'].endswith('report.pdf')
| 47 | + |
| 48 | + |
def test_download_file_by_path_rejects_folder():
    """A path that resolves to a folder must raise — only files download."""
    resolved = {'type': 'folder', 'uuid': 'fid', 'metadata': {}, 'path': '/D'}
    with patch.object(drive_service, 'resolve_path', return_value=resolved), \
            pytest.raises(ValueError, match="folder, not a file"):
        drive_service.download_file_by_path('/D')
| 54 | + |


# ---------- upload_single_item edge cases ----------

def test_upload_single_item_unreadable_file_returns_error(tmp_path):
    """If stat() raises (e.g., permission denied), return 'error' not crash."""
    target = tmp_path / "broken.txt"
    target.write_bytes(b"x")

    # Patch Path methods only around the call under test; tmp_path setup
    # above must run against the real filesystem first.
    is_file_patch = patch('pathlib.Path.is_file', return_value=True)
    stat_patch = patch('pathlib.Path.stat',
                       side_effect=PermissionError("denied"))
    with is_file_patch, stat_patch:
        outcome = drive_service.upload_single_item_with_conflict_handling(
            target, '/Docs', 'parent-uuid', on_conflict='skip',
        )
    assert outcome == "error"
| 69 | + |
| 70 | + |
def test_upload_single_item_target_path_normalization(tmp_path):
    """The full target remote path must always start with exactly one '/'."""
    local = tmp_path / "doc.txt"
    local.write_bytes(b"x")

    looked_up = []

    def always_missing(path):
        # Record every remote path the service tries to resolve, then
        # report it as absent so the upload path is taken.
        looked_up.append(path)
        raise FileNotFoundError(path)

    with patch.object(drive_service, 'resolve_path',
                      side_effect=always_missing), \
            patch.object(drive_service, 'upload_file_to_folder',
                         return_value={'uuid': 'x'}):
        # Pass a parent path with NO leading slash → should normalize
        drive_service.upload_single_item_with_conflict_handling(
            local, 'Docs/Sub', 'parent-uuid', on_conflict='skip',
        )

    # The looked-up path must always have exactly one leading slash
    for remote in looked_up:
        assert remote.startswith('/')
        assert not remote.startswith('//')
| 92 | + |
| 93 | + |
def test_upload_single_item_resolve_unexpected_error_continues(tmp_path):
    """If resolve_path raises something other than FileNotFoundError,
    log warning and proceed with upload (not skip)."""
    local = tmp_path / "doc.txt"
    local.write_bytes(b"x")

    resolve_patch = patch.object(drive_service, 'resolve_path',
                                 side_effect=ConnectionError("flaky API"))
    upload_patch = patch.object(drive_service, 'upload_file_to_folder',
                                return_value={'uuid': 'x'})
    with resolve_patch, upload_patch as mock_up:
        outcome = drive_service.upload_single_item_with_conflict_handling(
            local, '/Docs', 'parent-uuid', on_conflict='skip',
        )
    # Despite the resolve error, upload still proceeded
    assert outcome == "uploaded"
    mock_up.assert_called_once()
| 110 | + |


# ---------- download_file metadata-only branches ----------

def test_download_file_creation_time_warning_branch(tmp_path):
    """If preserve_timestamps=True and only creation_time is present (no
    modification_time), the function still completes — it just warns.

    Fix: `from pathlib import Path` was a function-scope import buried at
    the bottom of the test; it now lives with the module-level imports.
    """
    fake_creds = {
        'user': {
            'bucket': '00' * 12,
            'mnemonic': ('abandon abandon abandon abandon abandon abandon '
                         'abandon abandon abandon abandon abandon about'),
            'bridgeUser': 'u@example.com',
            'userId': 'u',
        },
    }
    payload = b"content for ctime test"
    # Encrypt with the real protocol so download_file can decrypt end-to-end.
    enc, idx_hex = drive_service.crypto.encrypt_stream_internxt_protocol(
        payload, fake_creds['user']['mnemonic'],
        fake_creds['user']['bucket'])

    metadata = {
        'uuid': 'fid', 'bucket': fake_creds['user']['bucket'],
        'fileId': 'nid', 'size': len(payload),
        'plainName': 'doc', 'type': 'txt',
        'creationTime': '2024-01-15T12:00:00Z',
        # No modificationTime — this is what triggers the warning branch.
    }

    out_dir = tmp_path / "out"
    out_dir.mkdir()

    with patch.object(drive_service.auth, 'get_auth_details',
                      return_value=fake_creds), \
            patch.object(drive_service.api, 'get_file_metadata',
                         return_value=metadata), \
            patch.object(drive_service.api, 'get_download_links',
                         return_value={'shards': [{'url': 'u'}],
                                       'index': idx_hex}), \
            patch.object(drive_service.api, 'download_chunk',
                         return_value=enc):
        out_path = drive_service.download_file('fid', str(out_dir),
                                               preserve_timestamps=True)
    # File downloaded successfully despite no modification time.
    assert Path(out_path).exists()
    assert Path(out_path).read_bytes() == payload
| 155 | + |


# ---------- find_files at deeper depths ----------

def test_find_files_max_depth_2_goes_one_level_deeper():
    """max_depth=2 → search start folder and one level of subfolders.

    Fix: dropped the redundant folder_content_cache.clear() — the autouse
    _reset fixture in this module already clears it before every test.
    """
    tree = {
        'root-uuid': {
            'folders': [{'uuid': 'a', 'plainName': 'A'}],
            'files': [{'uuid': 'top', 'plainName': 'top', 'type': 'pdf',
                       'size': 1}],
        },
        'a': {
            'folders': [{'uuid': 'b', 'plainName': 'B'}],
            'files': [{'uuid': 'mid', 'plainName': 'mid', 'type': 'pdf',
                       'size': 1}],
        },
        'b': {
            'folders': [],
            'files': [{'uuid': 'deep', 'plainName': 'deep', 'type': 'pdf',
                       'size': 1}],
        },
    }
    fake_creds = {'user': {'rootFolderId': 'root-uuid'}}

    def fake_get_content(uuid):
        # Unknown uuids yield an empty listing rather than raising.
        return tree.get(uuid, {'folders': [], 'files': []})

    with patch.object(drive_service.auth, 'get_auth_details',
                      return_value=fake_creds), \
            patch.object(drive_service, 'get_folder_content',
                         side_effect=fake_get_content):
        results = drive_service.find_files('*.pdf', '/', max_depth=2)

    # Top + mid found, deep filtered out
    names = sorted(r['display_name'] for r in results)
    assert names == ['mid.pdf', 'top.pdf']
| 190 | + |
| 191 | + |
def test_find_files_handles_listing_error_in_subfolder():
    """If one subfolder fails to list, others must still be searched."""
    fake_creds = {'user': {'rootFolderId': 'root-uuid'}}

    calls = {'n': 0}

    def fake_list(path):
        calls['n'] += 1
        if path == '/':
            return {
                'folders': [
                    {'uuid': 'good', 'plainName': 'Good',
                     'path': '/Good', 'display_name': 'Good'},
                    {'uuid': 'bad', 'plainName': 'Bad',
                     'path': '/Bad', 'display_name': 'Bad'},
                ],
                'files': [],
            }
        if path == '/Bad':
            raise ConnectionError("listing failed")
        # Anything else is /Good, which holds one matching pdf.
        return {
            'folders': [],
            'files': [{'uuid': 'g', 'plainName': 'g', 'type': 'pdf',
                       'size': 1, 'display_name': 'g.pdf'}],
        }

    with patch.object(drive_service.auth, 'get_auth_details',
                      return_value=fake_creds), \
            patch.object(drive_service, 'list_folder_with_paths',
                         side_effect=fake_list):
        results = drive_service.find_files('*.pdf', '/')

    # Found the file in /Good even though /Bad errored
    found = {r['display_name'] for r in results}
    assert 'g.pdf' in found