From a2ba091dc2e99bf075380370fbf631cca63f557a Mon Sep 17 00:00:00 2001 From: Devin Slick Date: Thu, 6 Nov 2025 11:39:39 -0600 Subject: [PATCH 1/2] Improve tests and publishing workflow --- .github/workflows/publish.yml | 19 +- fmd_api/_version.py | 2 +- pyproject.toml | 2 +- tests/unit/test_coverage_improvements.py | 299 +++++++++++------------ 4 files changed, 161 insertions(+), 161 deletions(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 5e02674..e87b39d 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -1,11 +1,14 @@ name: Publish Python Package # Trigger on: -# - pushes to any branch (we'll publish to TestPyPI for non-main branches and to PyPI for main) +# - pull requests targeting main (publish to TestPyPI for validation before merge) +# - pushes to main (publish to PyPI after merge) # - published GitHub Releases (keep existing behavior for canonical releases) on: + pull_request: + branches: [main] push: - branches: ["**"] + branches: [main] release: types: [published] @@ -41,16 +44,13 @@ jobs: name: python-package-distributions path: dist/ - # Publish from pushes to non-main branches -> TestPyPI + # Publish from pull requests to main -> TestPyPI publish_testpypi: - name: Publish to TestPyPI (branches except main) + name: Publish to TestPyPI (PR to main) runs-on: ubuntu-latest needs: build_sdist_and_wheel - # Only run for pushes (not releases) and only for branches that are NOT main - if: | - github.event_name == 'push' && - startsWith(github.ref, 'refs/heads/') && - github.ref != 'refs/heads/main' + # Only run for pull request events targeting main + if: github.event_name == 'pull_request' && github.base_ref == 'main' environment: testpypi permissions: id-token: write @@ -68,6 +68,7 @@ jobs: with: # repository-url directs upload to TestPyPI repository-url: https://test.pypi.org/legacy/ + skip-existing: true # Publish from pushes to main OR when a Release is published -> Production PyPI publish_pypi: diff --git a/fmd_api/_version.py b/fmd_api/_version.py index 0309ae2..5fa9130 100644 --- a/fmd_api/_version.py +++ b/fmd_api/_version.py @@ -1 +1 @@ -__version__ = "2.0.2" +__version__ = "2.0.3" diff --git a/pyproject.toml b/pyproject.toml index b0f2a8a..5484843 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "fmd_api" -version = "2.0.2" +version = "2.0.3" authors = [{name = "devinslick"}] description = "A Python client for the FMD (Find My Device) server API" readme = "README.md" diff --git a/tests/unit/test_coverage_improvements.py b/tests/unit/test_coverage_improvements.py index 911c265..8f66baf 100644 --- a/tests/unit/test_coverage_improvements.py +++ b/tests/unit/test_coverage_improvements.py @@ -27,7 +27,7 @@ async def test_hash_password_internal(): """Test _hash_password generates correct format.""" client = FmdClient("https://fmd.example.com") result = client._hash_password("testpass", "dGVzdHNhbHQxMjM0NTY3OA") - + assert result.startswith("$argon2id$v=19$m=131072,t=1,p=4$") assert "$" in result parts = result.split("$") @@ -38,20 +38,20 @@ async def test_hash_password_internal(): async def test_load_private_key_from_pem(): """Test _load_private_key_from_bytes with PEM format.""" client = FmdClient("https://fmd.example.com") - + # Generate a test RSA key private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) - + pem_bytes = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) - + loaded_key = client._load_private_key_from_bytes(pem_bytes) assert loaded_key is not None @@ -60,20 +60,20 @@ async def test_load_private_key_from_pem(): async def test_load_private_key_from_der(): """Test _load_private_key_from_bytes with DER format (fallback path).""" client = FmdClient("https://fmd.example.com") - + # Generate a test RSA key private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) - + der_bytes = private_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) - + loaded_key = client._load_private_key_from_bytes(der_bytes) assert loaded_key is not None @@ -88,7 +88,7 @@ async def test_json_parse_error_fallback_to_text(): client = FmdClient("https://fmd.example.com") client.access_token = "token" await client._ensure_session() - + with aioresponses() as m: # Return text that will trigger JSONDecodeError m.put( @@ -96,7 +96,7 @@ async def test_json_parse_error_fallback_to_text(): body='"invalid json', # Missing closing quote content_type="application/json" ) - + try: # Should fall back to text and return the raw string result = await client._make_api_request("PUT", "/api/v1/salt", {"IDT": "test", "Data": ""}) @@ -111,7 +111,7 @@ async def test_json_missing_data_key_fallback(): client = FmdClient("https://fmd.example.com") client.access_token = "token" await client._ensure_session() - + with aioresponses() as m: # Return JSON without 'Data' key m.put( @@ -119,7 +119,7 @@ async def test_json_missing_data_key_fallback(): payload={"error": "something"}, content_type="application/json" ) - + try: # Should catch KeyError and fall back to text result = await client._make_api_request("PUT", "/api/v1/salt", {"IDT": "test", "Data": ""}) @@ -135,15 +135,15 @@ async def test_empty_text_response_warning(): client = FmdClient("https://fmd.example.com") client.access_token = "token" await client._ensure_session() - + with aioresponses() as m: - # Return JSON with Data key but with empty string value + # Return JSON with Data key but with empty string value m.put( "https://fmd.example.com/api/v1/salt", payload={"Data": ""}, content_type="application/json" ) - + try: result = await client._make_api_request("PUT", "/api/v1/salt", {"IDT": "test", "Data": ""}) # Empty response should return empty string @@ -158,14 +158,14 @@ async def test_expect_json_false_path(): client = FmdClient("https://fmd.example.com") client.access_token = "token" await client._ensure_session() - + with aioresponses() as m: m.post( "https://fmd.example.com/api/v1/command", body="Command received", status=200 ) - + try: result = await client._make_api_request( "POST", @@ -187,15 +187,16 @@ async def test_connection_error_retry_with_backoff(monkeypatch): """Test ClientConnectionError triggers retry with backoff.""" client = FmdClient("https://fmd.example.com", max_retries=2, backoff_base=0.1, jitter=False) client.access_token = "token" - + slept = [] + async def fake_sleep(seconds): slept.append(seconds) - + monkeypatch.setattr("asyncio.sleep", fake_sleep) - + await client._ensure_session() - + with aioresponses() as m: # First two attempts: connection error, third: success m.put( @@ -210,7 +211,7 @@ async def fake_sleep(seconds): "https://fmd.example.com/api/v1/locationDataSize", payload={"Data": "0"} ) - + try: result = await client.get_locations() assert result == [] @@ -227,15 +228,16 @@ async def test_connection_error_exhausted_retries(monkeypatch): """Test connection error raises FmdApiException when retries exhausted.""" client = FmdClient("https://fmd.example.com", max_retries=1, backoff_base=0.1, jitter=False) client.access_token = "token" - + slept = [] + async def fake_sleep(seconds): slept.append(seconds) - + monkeypatch.setattr("asyncio.sleep", fake_sleep) - + await client._ensure_session() - + with aioresponses() as m: # All attempts fail for _ in range(3): @@ -243,7 +245,7 @@ async def fake_sleep(seconds): "https://fmd.example.com/api/v1/locationDataSize", exception=aiohttp.ClientConnectionError("Connection failed") ) - + try: with pytest.raises(FmdApiException, match="API request failed"): await client.get_locations() @@ -258,7 +260,7 @@ async def test_connection_error_no_retry_for_unsafe_command(): """Test connection error doesn't retry for unsafe command POST.""" client = FmdClient("https://fmd.example.com", max_retries=3) client.access_token = "token" - + # Set up private key for send_command private_key = rsa.generate_private_key( public_exponent=65537, @@ -266,16 +268,16 @@ async def test_connection_error_no_retry_for_unsafe_command(): backend=default_backend() ) client.private_key = private_key - + await client._ensure_session() - + with aioresponses() as m: # Connection error on command endpoint m.post( "https://fmd.example.com/api/v1/command", exception=aiohttp.ClientConnectionError("Connection failed") ) - + try: with pytest.raises(FmdApiException, match="Failed to send command"): await client.send_command("ring") @@ -292,7 +294,7 @@ async def test_export_zip_png_detection(): """Test PNG magic byte detection in export_data_zip.""" client = FmdClient("https://fmd.example.com") client.access_token = "token" - + # Set up private key private_key = rsa.generate_private_key( public_exponent=65537, @@ -300,19 +302,19 @@ async def test_export_zip_png_detection(): backend=default_backend() ) client.private_key = private_key - + # Create PNG image bytes (PNG magic bytes + minimal data) png_data = b'\x89PNG\r\n\x1a\n' + b'\x00' * 100 png_b64 = base64.b64encode(png_data).decode('utf-8') - + # Double-encode as per FMD picture format session_key = b'\x00' * 32 aesgcm = AESGCM(session_key) iv = b'\x01' * 12 - + # Encrypt the base64 string ciphertext = aesgcm.encrypt(iv, png_b64.encode('utf-8'), None) - + # Build encrypted blob public_key = private_key.public_key() session_key_packet = public_key.encrypt( @@ -323,33 +325,33 @@ async def test_export_zip_png_detection(): label=None ) ) - + blob = session_key_packet + iv + ciphertext blob_b64 = base64.b64encode(blob).decode('utf-8') - + await client._ensure_session() - + with aioresponses() as m: # No locations m.put("https://fmd.example.com/api/v1/locationDataSize", payload={"Data": "0"}) # One PNG picture m.put("https://fmd.example.com/api/v1/pictures", payload={"Data": [blob_b64]}) - + try: import tempfile with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp: output_path = tmp.name - + result = await client.export_data_zip(output_path, include_pictures=True) assert result == output_path - + # Verify ZIP contains PNG import zipfile with zipfile.ZipFile(output_path, 'r') as zf: files = zf.namelist() assert 'pictures/manifest.json' in files assert any('picture_' in f and f.endswith('.png') for f in files) - + import os os.unlink(output_path) finally: @@ -361,7 +363,7 @@ async def test_export_zip_picture_decrypt_error(): """Test export handles picture decryption errors gracefully.""" client = FmdClient("https://fmd.example.com") client.access_token = "token" - + # Set up private key private_key = rsa.generate_private_key( public_exponent=65537, @@ -369,28 +371,28 @@ async def test_export_zip_picture_decrypt_error(): backend=default_backend() ) client.private_key = private_key - + await client._ensure_session() - + with aioresponses() as m: m.put("https://fmd.example.com/api/v1/locationDataSize", payload={"Data": "0"}) # Invalid picture blob (too small) m.put("https://fmd.example.com/api/v1/pictures", payload={"Data": ["invalid"]}) - + try: import tempfile with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp: output_path = tmp.name - - result = await client.export_data_zip(output_path, include_pictures=True) - + + await client.export_data_zip(output_path, include_pictures=True) + # Should complete despite error import zipfile with zipfile.ZipFile(output_path, 'r') as zf: manifest = json.loads(zf.read('pictures/manifest.json')) # Error should be recorded assert 'error' in manifest[0] - + import os os.unlink(output_path) finally: @@ -402,7 +404,7 @@ async def test_export_zip_location_decrypt_error(): """Test export handles location decryption errors gracefully.""" client = FmdClient("https://fmd.example.com") client.access_token = "token" - + # Set up private key private_key = rsa.generate_private_key( public_exponent=65537, @@ -410,30 +412,30 @@ async def test_export_zip_location_decrypt_error(): backend=default_backend() ) client.private_key = private_key - + await client._ensure_session() - + with aioresponses() as m: # One invalid location m.put("https://fmd.example.com/api/v1/locationDataSize", payload={"Data": "1"}) m.put("https://fmd.example.com/api/v1/location", payload={"Data": "tooshort"}) # No pictures m.put("https://fmd.example.com/api/v1/pictures", payload={"Data": []}) - + try: import tempfile with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp: output_path = tmp.name - - result = await client.export_data_zip(output_path, include_pictures=False) - + + await client.export_data_zip(output_path, include_pictures=False) + # Should complete with error recorded import zipfile with zipfile.ZipFile(output_path, 'r') as zf: locations = json.loads(zf.read('locations.json')) assert 'error' in locations[0] assert locations[0]['index'] == 0 - + import os os.unlink(output_path) finally: @@ -449,7 +451,7 @@ async def test_device_download_photo_decode_error(): """Test Device.download_photo handles decode errors (line 137-138).""" client = FmdClient("https://fmd.example.com") client.access_token = "token" - + # Set up private key with 3072-bit key to get 384-byte RSA packet private_key = rsa.generate_private_key( public_exponent=65537, @@ -457,17 +459,17 @@ async def test_device_download_photo_decode_error(): backend=default_backend() ) client.private_key = private_key - + device = Device(client, "test_device") - + # Create an invalid blob (will decrypt but not be valid base64) session_key = b'\x00' * 32 aesgcm = AESGCM(session_key) iv = b'\x01' * 12 - + # Invalid inner data (not valid base64) ciphertext = aesgcm.encrypt(iv, b'not-base64-data!!!', None) - + public_key = private_key.public_key() session_key_packet = public_key.encrypt( session_key, @@ -477,10 +479,10 @@ async def test_device_download_photo_decode_error(): label=None ) ) - + blob = session_key_packet + iv + ciphertext blob_b64 = base64.b64encode(blob).decode('utf-8') - + with pytest.raises(OperationError, match="Failed to decode picture blob"): await device.download_photo(blob_b64) @@ -490,7 +492,7 @@ async def test_device_get_history_decrypt_error(): """Test Device.get_history handles decrypt errors (line 99-101).""" client = FmdClient("https://fmd.example.com") client.access_token = "token" - + # Set up private key private_key = rsa.generate_private_key( public_exponent=65537, @@ -498,15 +500,15 @@ async def test_device_get_history_decrypt_error(): backend=default_backend() ) client.private_key = private_key - + device = Device(client, "test_device") - + await client._ensure_session() - + with aioresponses() as m: m.put("https://fmd.example.com/api/v1/locationDataSize", payload={"Data": "1"}) m.put("https://fmd.example.com/api/v1/location", payload={"Data": "invalid"}) - + try: with pytest.raises(OperationError, match="Failed to decrypt/parse location blob"): async for loc in device.get_history(limit=1): @@ -515,9 +517,6 @@ async def test_device_get_history_decrypt_error(): await client.close() - - - # ========================================== # Test helper functions indirectly through client behavior # ========================================== @@ -527,9 +526,9 @@ async def test_retry_after_header_parsing_indirectly(): """Test Retry-After header parsing through actual 429 response.""" client = FmdClient("https://fmd.example.com", max_retries=2) client.access_token = "token" - + await client._ensure_session() - + with aioresponses() as m: # Test with valid Retry-After number m.put( @@ -541,7 +540,7 @@ async def test_retry_after_header_parsing_indirectly(): "https://fmd.example.com/api/v1/locationDataSize", payload={"Data": "0"} ) - + try: await client.get_locations() # If it succeeds, Retry-After was parsed correctly @@ -558,7 +557,7 @@ async def test_decrypt_blob_with_missing_private_key(): """Test decrypt_data_blob raises when private_key is None.""" client = FmdClient("https://fmd.example.com") # Don't set private_key - + # Use a valid base64 string that's long enough dummy_blob = base64.b64encode(b'\x00' * 400).decode('utf-8') with pytest.raises(FmdApiException, match="Private key not loaded"): @@ -571,7 +570,7 @@ async def test_send_command_with_missing_private_key(): client = FmdClient("https://fmd.example.com") client.access_token = "token" await client._ensure_session() - + try: with pytest.raises(FmdApiException, match="Private key not loaded"): await client.send_command("ring") @@ -585,13 +584,13 @@ async def test_client_error_generic(): client = FmdClient("https://fmd.example.com") client.access_token = "token" await client._ensure_session() - + with aioresponses() as m: m.put( "https://fmd.example.com/api/v1/locationDataSize", exception=aiohttp.ClientError("Generic client error") ) - + try: with pytest.raises(FmdApiException, match="API request failed"): await client.get_locations() @@ -605,14 +604,14 @@ async def test_value_error_in_response_parsing(): client = FmdClient("https://fmd.example.com") client.access_token = "token" await client._ensure_session() - + with aioresponses() as m: # Return JSON that will cause ValueError when parsing int m.put( "https://fmd.example.com/api/v1/locationDataSize", payload={"Data": "not-a-number"} ) - + try: with pytest.raises(Exception): # int() will raise ValueError, caught and re-raised await client.get_locations() @@ -628,18 +627,18 @@ async def test_value_error_in_response_parsing(): async def test_authenticate_full_flow(): """Test complete authenticate flow including internal methods (lines 163-211).""" client = FmdClient("https://fmd.example.com") - + with aioresponses() as m: # Mock salt retrieval m.put("https://fmd.example.com/api/v1/salt", payload={"Data": base64.b64encode(b'\x00' * 16).decode()}) # Mock token request m.put("https://fmd.example.com/api/v1/requestAccess", payload={"Data": "test_token"}) - # Mock private key retrieval + # Mock private key retrieval # Create a simple encrypted key blob password = "testpass" salt = b'\x00' * 16 iv = b'\x01' * 12 - + # Create a dummy private key private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend()) privkey_pem = private_key.private_bytes( @@ -647,7 +646,7 @@ async def test_authenticate_full_flow(): format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) - + # Encrypt the private key password_bytes = (CONTEXT_STRING_ASYM_KEY_WRAP + password).encode("utf-8") aes_key = hash_secret_raw( @@ -656,9 +655,9 @@ async def test_authenticate_full_flow(): aesgcm = AESGCM(aes_key) ciphertext = aesgcm.encrypt(iv, privkey_pem, None) encrypted_blob = salt + iv + ciphertext - + m.put("https://fmd.example.com/api/v1/key", payload={"Data": base64.b64encode(encrypted_blob).decode()}) - + try: await client.authenticate("testid", password, 3600) assert client.access_token == "test_token" @@ -674,7 +673,7 @@ async def test_429_with_retry_after_header(): client.access_token = "token" client.max_retries = 1 await client._ensure_session() - + with aioresponses() as m: # First request returns 429 with Retry-After m.get( @@ -687,7 +686,7 @@ async def test_429_with_retry_after_header(): "https://fmd.example.com/api/v1/test", payload={"Data": "success"} ) - + try: result = await client._make_api_request("GET", "/api/v1/test", {"IDT": "test", "Data": ""}) assert result == "success" @@ -702,7 +701,7 @@ async def test_500_error_retry(): client.access_token = "token" client.max_retries = 1 await client._ensure_session() - + with aioresponses() as m: # First request returns 500 m.get( @@ -714,7 +713,7 @@ async def test_500_error_retry(): "https://fmd.example.com/api/v1/test", payload={"Data": "success"} ) - + try: result = await client._make_api_request("GET", "/api/v1/test", {"IDT": "test", "Data": ""}) assert result == "success" @@ -729,7 +728,7 @@ async def test_negative_retry_after_header(): client.access_token = "token" client.max_retries = 1 await client._ensure_session() - + with aioresponses() as m: # First request returns 429 with invalid negative Retry-After m.get( @@ -742,7 +741,7 @@ async def test_negative_retry_after_header(): "https://fmd.example.com/api/v1/test", payload={"Data": "success"} ) - + try: result = await client._make_api_request("GET", "/api/v1/test", {"IDT": "test", "Data": ""}) assert result == "success" @@ -757,7 +756,7 @@ async def test_http_date_retry_after(): client.access_token = "token" client.max_retries = 1 await client._ensure_session() - + with aioresponses() as m: # First request returns 429 with HTTP-date Retry-After m.get( @@ -770,7 +769,7 @@ async def test_http_date_retry_after(): "https://fmd.example.com/api/v1/test", payload={"Data": "success"} ) - + try: result = await client._make_api_request("GET", "/api/v1/test", {"IDT": "test", "Data": ""}) assert result == "success" @@ -785,7 +784,7 @@ async def test_backoff_without_jitter(): client.access_token = "token" client.max_retries = 2 await client._ensure_session() - + with aioresponses() as m: # First request returns 500 m.get("https://fmd.example.com/api/v1/test", status=500) @@ -793,7 +792,7 @@ async def test_backoff_without_jitter(): m.get("https://fmd.example.com/api/v1/test", status=500) # Third request succeeds m.get("https://fmd.example.com/api/v1/test", payload={"Data": "success"}) - + try: result = await client._make_api_request("GET", "/api/v1/test", {"IDT": "test", "Data": ""}) assert result == "success" @@ -805,7 +804,7 @@ async def test_backoff_without_jitter(): async def test_device_internal_parse_location_error(): """Test that _parse_location_blob raises RuntimeError (device.py line 23).""" from fmd_api.device import _parse_location_blob - + with pytest.raises(RuntimeError, match="should not be called directly"): _parse_location_blob("dummy_blob") @@ -816,11 +815,11 @@ async def test_get_pictures_with_specific_count(): client = FmdClient("https://fmd.example.com") client.access_token = "token" await client._ensure_session() - + # Set up private key for decryption private_key = rsa.generate_private_key(public_exponent=65537, key_size=3072, backend=default_backend()) client.private_key = private_key - + with aioresponses() as m: # Mock response with 10 pictures pictures_list = [f"picture{i}" for i in range(10)] @@ -832,7 +831,7 @@ async def test_get_pictures_with_specific_count(): "https://fmd.example.com/api/v1/pictures", payload={"Data": pictures_list} ) - + try: # Request only 3 pictures result = await client.get_pictures(num_to_get=3) @@ -848,12 +847,12 @@ async def test_exhausted_retries_on_500(): client.access_token = "token" client.max_retries = 2 await client._ensure_session() - + with aioresponses() as m: # All requests return 500 for _ in range(5): m.get("https://fmd.example.com/api/v1/test", status=500) - + try: with pytest.raises(FmdApiException, match="API request failed"): await client._make_api_request("GET", "/api/v1/test", {"IDT": "test", "Data": ""}) @@ -865,7 +864,7 @@ async def test_exhausted_retries_on_500(): async def test_compute_backoff_with_jitter(): """Test _compute_backoff with jitter enabled (line 715-718).""" from fmd_api.client import _compute_backoff - + # With jitter, result should be between 0 and calculated delay for attempt in range(3): delay = _compute_backoff(1.0, attempt, 10.0, True) @@ -880,13 +879,13 @@ async def test_429_exhausted_retries(): client.access_token = "token" client.max_retries = 0 # No retries await client._ensure_session() - + with aioresponses() as m: m.get( "https://fmd.example.com/api/v1/test", status=429 ) - + try: with pytest.raises(FmdApiException, match="Rate limited.*retries exhausted"): await client._make_api_request("GET", "/api/v1/test", {"IDT": "test", "Data": ""}) @@ -900,18 +899,18 @@ async def test_streaming_response(): client = FmdClient("https://fmd.example.com") client.access_token = "token" await client._ensure_session() - + with aioresponses() as m: m.get( "https://fmd.example.com/api/v1/test", body="streaming content", content_type="text/plain" ) - + try: result = await client._make_api_request( - "GET", "/api/v1/test", - {"IDT": "test", "Data": ""}, + "GET", "/api/v1/test", + {"IDT": "test", "Data": ""}, stream=True # Request streaming response ) # Should return the response object itself @@ -926,11 +925,11 @@ async def test_get_pictures_all_count(): client = FmdClient("https://fmd.example.com") client.access_token = "token" await client._ensure_session() - - # Set up private key + + # Set up private key private_key = rsa.generate_private_key(public_exponent=65537, key_size=3072, backend=default_backend()) client.private_key = private_key - + with aioresponses() as m: # Mock response with 5 pictures pictures_list = [f"picture{i}" for i in range(5)] @@ -942,7 +941,7 @@ async def test_get_pictures_all_count(): "https://fmd.example.com/api/v1/pictures", payload={"Data": pictures_list} ) - + try: # Request all pictures (num_to_get=-1) result = await client.get_pictures(num_to_get=-1) @@ -962,12 +961,12 @@ async def test_500_error_exhausted_retries_raises(): client.access_token = "token" client.max_retries = 1 await client._ensure_session() - + with aioresponses() as m: # All requests return 500 for _ in range(3): m.post("https://fmd.example.com/api/v1/test", status=500) - + try: with pytest.raises(FmdApiException, match="API request failed"): await client._make_api_request("POST", "/api/v1/test", {"IDT": "test", "Data": ""}) @@ -981,18 +980,18 @@ async def test_expect_json_false_returns_text(): client = FmdClient("https://fmd.example.com") client.access_token = "token" await client._ensure_session() - + with aioresponses() as m: m.post( "https://fmd.example.com/api/v1/command", body="Command executed", content_type="text/plain" ) - + try: result = await client._make_api_request( - "POST", "/api/v1/command", - {"IDT": "test", "Data": ""}, + "POST", "/api/v1/command", + {"IDT": "test", "Data": ""}, expect_json=False ) assert result == "Command executed" @@ -1006,7 +1005,7 @@ async def test_response_parsing_key_error(): client = FmdClient("https://fmd.example.com") client.access_token = "token" await client._ensure_session() - + with aioresponses() as m: # Return invalid response that will cause parsing error outside the JSON block m.put( @@ -1014,7 +1013,7 @@ async def test_response_parsing_key_error(): payload={"Data": {"nested": "value"}}, # Valid JSON but might cause issues downstream content_type="application/json" ) - + try: # This should work without errors result = await client._make_api_request("PUT", "/api/v1/test", {"IDT": "test", "Data": ""}) @@ -1029,11 +1028,11 @@ async def test_get_pictures_returns_list_when_all(): client = FmdClient("https://fmd.example.com") client.access_token = "token" await client._ensure_session() - - # Set up private key + + # Set up private key private_key = rsa.generate_private_key(public_exponent=65537, key_size=3072, backend=default_backend()) client.private_key = private_key - + with aioresponses() as m: # Mock response with 10 pictures pictures_list = [f"picture{i}" for i in range(10)] @@ -1045,7 +1044,7 @@ async def test_get_pictures_returns_list_when_all(): "https://fmd.example.com/api/v1/pictures", payload={"Data": pictures_list} ) - + try: # Request all pictures explicitly result = await client.get_pictures(num_to_get=-1) @@ -1061,23 +1060,23 @@ async def test_export_zip_default_jpg_extension(): client = FmdClient("https://fmd.example.com") client.access_token = "token" await client._ensure_session() - + # Set up private key for decryption private_key = rsa.generate_private_key(public_exponent=65537, key_size=3072, backend=default_backend()) client.private_key = private_key - + # Create an encrypted blob with unknown image format (not PNG or JPEG) session_key = b'\x00' * 32 aesgcm = AESGCM(session_key) iv = b'\x01' * 12 - + # Create image data that doesn't match PNG or JPEG magic bytes unknown_image = b'\x00\x00\x00\x00UNKNOWN' + b'\x00' * 20 image_b64 = base64.b64encode(unknown_image).decode('utf-8') - + # Encrypt it ciphertext = aesgcm.encrypt(iv, image_b64.encode('utf-8'), None) - + public_key = private_key.public_key() session_key_packet = public_key.encrypt( session_key, @@ -1087,29 +1086,29 @@ async def test_export_zip_default_jpg_extension(): label=None ) ) - + blob = session_key_packet + iv + ciphertext blob_b64 = base64.b64encode(blob).decode('utf-8') - + with aioresponses() as m: m.put("https://fmd.example.com/api/v1/locationDataSize", payload={"Data": "0"}) m.put("https://fmd.example.com/api/v1/pictureDataSize", payload={"Data": "1"}) m.put("https://fmd.example.com/api/v1/pictures", payload={"Data": [blob_b64]}) - + try: import tempfile import zipfile with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp: output_path = tmp.name - + await client.export_data_zip(output_path, include_pictures=True) - + # Verify the file was created and contains jpg file with zipfile.ZipFile(output_path, 'r') as zf: names = zf.namelist() # Should have defaulted to .jpg extension assert any('.jpg' in name for name in names) - + # Cleanup import os if os.path.exists(output_path): @@ -1122,7 +1121,7 @@ async def test_export_zip_default_jpg_extension(): async def test_mask_token_with_none(): """Test _mask_token with None input (line 685-686).""" from fmd_api.client import _mask_token - + result = _mask_token(None) assert result == "" @@ -1131,7 +1130,7 @@ async def test_mask_token_with_none(): async def test_mask_token_with_short(): """Test _mask_token with short token (line 687-688).""" from fmd_api.client import _mask_token - + result = _mask_token("ab", show_chars=5) assert result == "***" @@ -1140,7 +1139,7 @@ async def test_mask_token_with_short(): async def test_mask_token_with_long(): """Test _mask_token with long token (line 689).""" from fmd_api.client import _mask_token - + result = _mask_token("verylongtoken123456", show_chars=4) assert result.startswith("very") assert result.endswith("...***") @@ -1150,11 +1149,11 @@ async def test_mask_token_with_long(): async def test_parse_retry_after_with_invalid(): """Test _parse_retry_after with invalid input (line 703).""" from fmd_api.client import _parse_retry_after - + # None input result = _parse_retry_after(None) assert result is None - + # Invalid string result = _parse_retry_after("invalid") assert result is None @@ -1164,13 +1163,13 @@ async def test_parse_retry_after_with_invalid(): async def test_compute_backoff_with_jitter_randomness(): """Test _compute_backoff with jitter produces values in range (line 717-718).""" from fmd_api.client import _compute_backoff - + # With jitter=True, should return random value between 0 and delay delays = [_compute_backoff(1.0, 0, 10.0, True) for _ in range(10)] - + # All should be >= 0 and <= 1.0 (base * 2^0) assert all(0 <= d <= 1.0 for d in delays) - + # With enough samples, should have some variation (not all the same) # (This might fail in rare cases but is statistically very unlikely) assert len(set(delays)) > 1 or delays[0] == 0 # Allow all zeros as edge case @@ -1183,12 +1182,12 @@ async def test_502_error_with_exhausted_retries(): client.access_token = "token" client.max_retries = 1 await client._ensure_session() - + with aioresponses() as m: # All requests return 502 for _ in range(5): m.get("https://fmd.example.com/api/v1/test", status=502) - + try: with pytest.raises(FmdApiException, match="API request failed"): await client._make_api_request("GET", "/api/v1/test", {"IDT": "test", "Data": ""}) @@ -1202,14 +1201,14 @@ async def test_non_json_response_with_expect_json_false(): client = FmdClient("https://fmd.example.com") client.access_token = "token" await client._ensure_session() - + with aioresponses() as m: m.put( "https://fmd.example.com/api/v1/test", body="plain text response", content_type="text/plain" ) - + try: result = await client._make_api_request( "PUT", "/api/v1/test", From 76b4ee085ce5f9876be75ec206c192b2d568be68 Mon Sep 17 00:00:00 2001 From: Devin Slick Date: Thu, 6 Nov 2025 12:03:56 -0600 Subject: [PATCH 2/2] ci: add pre-commit hooks for local linting and refactor publish workflow to PR-based TestPyPI --- .github/workflows/publish.yml | 2 +- .gitignore | 2 +- .pre-commit-config.yaml | 48 ++++ README.md | 4 +- docs/HOME_ASSISTANT_REVIEW.md | 12 +- docs/MIGRATE_FROM_V1.md | 44 +-- docs/PROPOSAL.md | 6 +- docs/PROPOSED_BRANCH_AND_STRUCTURE.md | 2 +- fmd_api/client.py | 14 +- pyproject.toml | 3 +- tests/unit/test_coverage_improvements.py | 337 ++++++++--------------- 11 files changed, 197 insertions(+), 277 deletions(-) create mode 100644 .pre-commit-config.yaml diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index e87b39d..d4478cf 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -94,4 +94,4 @@ jobs: path: dist/ - name: Publish package distributions to PyPI - uses: pypa/gh-action-pypi-publish@release/v1 \ No newline at end of file + uses: pypa/gh-action-pypi-publish@release/v1 diff --git a/.gitignore b/.gitignore index 934a81c..74f525b 100644 --- a/.gitignore +++ b/.gitignore @@ -48,4 +48,4 @@ fmd-server/ fmd-android/ #credentials file -examples/tests/credentials.txt \ No newline at end of file +examples/tests/credentials.txt diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..ca8abac --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,48 @@ +# Pre-commit hooks configuration +# See https://pre-commit.com for more information +repos: + # General file cleanup + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v5.0.0 + hooks: + - id: trailing-whitespace + exclude: '^tests/functional/' + - id: end-of-file-fixer + exclude: '^tests/functional/' + - id: check-yaml + - id: check-toml + - id: check-json + - id: check-added-large-files + args: ['--maxkb=1000'] + - id: check-merge-conflict + - id: debug-statements + - id: mixed-line-ending + args: ['--fix=lf'] + + # Python code formatting with black + - repo: https://github.com/psf/black + rev: 24.10.0 + hooks: + - id: black + language_version: python3 + + # Python linting with flake8 + - repo: https://github.com/pycqa/flake8 + rev: 7.1.1 + hooks: + - id: flake8 + args: ['--count', '--show-source', '--statistics'] + exclude: '^tests/functional/' + + # Python type checking with mypy + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.13.0 + hooks: + - id: mypy + additional_dependencies: + - types-aiofiles + - aiohttp + - argon2-cffi + - cryptography + args: ['--install-types', '--non-interactive'] + exclude: '^tests/' diff --git a/README.md b/README.md index d06d5d5..5900342 100644 --- a/README.md +++ b/README.md @@ -114,7 +114,7 @@ Tips: - `set_ringer_mode("normal|vibrate|silent")` - `get_device_stats()` - + - Low‑level: `decrypt_data_blob(b64_blob)` - `Device` helper (per‑device convenience) @@ -171,4 +171,4 @@ This client targets the FMD ecosystem: - https://gitlab.com/fmd-foss - Public community instance: https://fmd.nulide.de/ -MIT © 2025 Devin Slick \ No newline at end of file +MIT © 2025 Devin Slick diff --git a/docs/HOME_ASSISTANT_REVIEW.md b/docs/HOME_ASSISTANT_REVIEW.md index 8e9cebc..20a6fc9 100644 --- a/docs/HOME_ASSISTANT_REVIEW.md +++ b/docs/HOME_ASSISTANT_REVIEW.md @@ -79,7 +79,7 @@ async def _make_api_request(self, ..., timeout: int = 30): - `pyproject.toml`: `2.0.0.dev8` (PEP 440 compliant) - `_version.py`: `2.0.0-dev8` (uses hyphen instead of dot) -**Location:** +**Location:** - `pyproject.toml` line 3 - `fmd_api/_version.py` line 1 @@ -143,7 +143,7 @@ if resp.status == 429: class FmdClient: async def __aenter__(self): return self - + async def __aexit__(self, exc_type, exc, tb): await self.close() ``` @@ -191,9 +191,9 @@ async with await FmdClient.create(...) as client: - Line ~88: Logs may include auth details - Line ~203: Logs full JSON responses which may contain tokens -**Fix:** +**Fix:** - Sanitize all log output -- Mask tokens: `log.debug(f"Token: {token[:8]}...")` +- Mask tokens: `log.debug(f"Token: {token[:8]}...")` - Add guards: `if log.isEnabledFor(logging.DEBUG):` **HA Rationale:** Security and privacy requirement for production systems. @@ -364,7 +364,7 @@ async def decrypt_data_blob_async(self, data_b64: str) -> bytes: async def get_locations(...) -> List[str]: """ ... - + Raises: AuthenticationError: If authentication fails FmdApiException: If server returns error @@ -384,7 +384,7 @@ async def get_locations(...) -> List[str]: **Location:** Test configuration -**Fix:** +**Fix:** - Add `pytest-cov` to dev dependencies - Configure coverage in `pyproject.toml` - Add coverage reporting to CI workflow diff --git a/docs/MIGRATE_FROM_V1.md b/docs/MIGRATE_FROM_V1.md index f880a17..cbe97da 100644 --- a/docs/MIGRATE_FROM_V1.md +++ b/docs/MIGRATE_FROM_V1.md @@ -175,16 +175,16 @@ from fmd_api import FmdApi async def main(): api = await FmdApi.create("https://fmd.example.com", "alice", "secret") - + # Request new location await api.request_location('gps') await asyncio.sleep(30) - + # Get locations blobs = await api.get_all_locations(1) location_json = api.decrypt_data_blob(blobs[0]) location = json.loads(location_json) - + print(f"Lat: {location['lat']}, Lon: {location['lon']}") await api.close() @@ -199,16 +199,16 @@ from fmd_api import FmdClient async def main(): client = await FmdClient.create("https://fmd.example.com", "alice", "secret") - + # Request new location await client.request_location('gps') await asyncio.sleep(30) - + # Get locations blobs = await client.get_locations(1) location_json = client.decrypt_data_blob(blobs[0]) location = json.loads(location_json) - + print(f"Lat: {location['lat']}, Lon: {location['lon']}") await client.close() @@ -223,14 +223,14 @@ from fmd_api import FmdClient, Device async def main(): client = await FmdClient.create("https://fmd.example.com", "alice", "secret") device = Device(client, "alice") - + # Request and get location (simplified) await client.request_location('gps') await asyncio.sleep(30) - + location = await device.get_location(force=True) print(f"Lat: {location.lat}, Lon: {location.lon}") - + await client.close() asyncio.run(main()) @@ -244,16 +244,16 @@ from fmd_api import FmdApi, FmdCommands async def control_device(): api = await FmdApi.create("https://fmd.example.com", "alice", "secret") - + # Using constants await api.send_command(FmdCommands.RING) await api.send_command(FmdCommands.BLUETOOTH_ON) - + # Using convenience methods await api.toggle_bluetooth(True) await api.toggle_do_not_disturb(True) await api.set_ringer_mode('vibrate') - + await api.close() ``` @@ -263,16 +263,16 @@ from fmd_api import FmdClient async def control_device(): client = await FmdClient.create("https://fmd.example.com", "alice", "secret") - + # Use strings directly (constants removed) await client.send_command('ring') await client.send_command('bluetooth on') - + # Using convenience methods (renamed from toggle_* to set_*) await client.set_bluetooth(True) await client.set_do_not_disturb(True) await client.set_ringer_mode('vibrate') - + await client.close() ``` @@ -283,15 +283,15 @@ from fmd_api import FmdClient, Device async def control_device(): client = await FmdClient.create("https://fmd.example.com", "alice", "secret") device = Device(client, "alice") - + # Use device methods for cleaner API await device.play_sound() - + # Settings still use client await client.set_bluetooth(True) await client.set_do_not_disturb(True) await client.set_ringer_mode('vibrate') - + await client.close() ``` @@ -304,13 +304,13 @@ from fmd_api import FmdApi async def get_history(): api = await FmdApi.create("https://fmd.example.com", "alice", "secret") - + blobs = await api.get_all_locations(10) for blob in blobs: location_json = api.decrypt_data_blob(blob) location = json.loads(location_json) print(f"Date: {location['date']}, Lat: {location['lat']}, Lon: {location['lon']}") - + await api.close() ``` @@ -321,11 +321,11 @@ from fmd_api import FmdClient, Device async def get_history(): client = await FmdClient.create("https://fmd.example.com", "alice", "secret") device = Device(client, "alice") - + # Async iterator with automatic decryption async for location in device.get_history(limit=10): print(f"Date: {location.date}, Lat: {location.lat}, Lon: {location.lon}") - + await client.close() ``` diff --git a/docs/PROPOSAL.md b/docs/PROPOSAL.md index d97904b..c75fe36 100644 --- a/docs/PROPOSAL.md +++ b/docs/PROPOSAL.md @@ -1,7 +1,7 @@ # Proposal: fmd_api v2 — Device-centric async interface -Status: Draft -Author: devinslick (proposal by Copilot Space) +Status: Draft +Author: devinslick (proposal by Copilot Space) Date: 2025-11-01 ## Goals @@ -286,4 +286,4 @@ This proposal updates the earlier draft to: - Include explicit methods for taking front and rear photos. - Drop an in-code legacy compatibility layer and instead provide a small migration README. -If you approve, I will create a branch and a PR that implements the core FmdClient and Device class with the initial methods (authenticate, get_devices, get_device, Device.refresh, Device.get_location, Device.play_sound, Device.take_front_photo, Device.take_rear_photo), plus tests and example usage. \ No newline at end of file +If you approve, I will create a branch and a PR that implements the core FmdClient and Device class with the initial methods (authenticate, get_devices, get_device, Device.refresh, Device.get_location, Device.play_sound, Device.take_front_photo, Device.take_rear_photo), plus tests and example usage. diff --git a/docs/PROPOSED_BRANCH_AND_STRUCTURE.md b/docs/PROPOSED_BRANCH_AND_STRUCTURE.md index fb06b53..75624f6 100644 --- a/docs/PROPOSED_BRANCH_AND_STRUCTURE.md +++ b/docs/PROPOSED_BRANCH_AND_STRUCTURE.md @@ -42,4 +42,4 @@ Next steps after branch creation: 5. Iterate on rate-limiter/cache and add streaming helpers for export_data_zip. If you'd like, I can now generate the initial skeleton files for this branch (client.py, device.py, types.py, exceptions.py, helpers.py, docs/MIGRATE_FROM_V1.md, examples/async_example.py, PROPOSAL.md). Which files would you like me to create first? -``` \ No newline at end of file +``` diff --git a/fmd_api/client.py b/fmd_api/client.py index 9829b73..1a0b9d8 100644 --- a/fmd_api/client.py +++ b/fmd_api/client.py @@ -317,13 +317,9 @@ async def _make_api_request( continue # Transient server errors -> retry (except for unsafe command POSTs) - if resp.status in (500, 502, 503, 504) and not ( - is_command and method.upper() == "POST" - ): + if resp.status in (500, 502, 503, 504) and not (is_command and method.upper() == "POST"): if attempts_left > 0: - delay = _compute_backoff( - self.backoff_base, backoff_attempt, self.backoff_max, self.jitter - ) + delay = _compute_backoff(self.backoff_base, backoff_attempt, self.backoff_max, self.jitter) log.warning( f"Server error {resp.status}. " f"Retrying in {delay:.2f}s ({attempts_left} retries left)..." @@ -350,11 +346,7 @@ async def _make_api_request( # Sanitize: don't log full JSON which may contain tokens/sensitive data if log.isEnabledFor(logging.DEBUG): # Log safe metadata only - keys = ( - list(json_data.keys()) - if isinstance(json_data, dict) - else "non-dict" - ) + keys = list(json_data.keys()) if isinstance(json_data, dict) else "non-dict" log.debug(f"{endpoint} JSON response received with keys: {keys}") return json_data["Data"] except (KeyError, ValueError, json.JSONDecodeError) as e: diff --git a/pyproject.toml b/pyproject.toml index 5484843..26d6760 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ dev = [ "black", "flake8", "mypy", + "pre-commit", ] # --- IMPORTANT CHANGE --- @@ -77,4 +78,4 @@ exclude_lines = [ "raise NotImplementedError", "if __name__ == .__main__.:", "if TYPE_CHECKING:", -] \ No newline at end of file +] diff --git a/tests/unit/test_coverage_improvements.py b/tests/unit/test_coverage_improvements.py index 8f66baf..960803f 100644 --- a/tests/unit/test_coverage_improvements.py +++ b/tests/unit/test_coverage_improvements.py @@ -2,6 +2,7 @@ Additional tests to improve code coverage to 95%+ Focuses on uncovered branches and edge cases in client.py and device.py """ + import json import base64 import pytest @@ -22,6 +23,7 @@ # Test authentication helper methods # ========================================== + @pytest.mark.asyncio async def test_hash_password_internal(): """Test _hash_password generates correct format.""" @@ -40,16 +42,12 @@ async def test_load_private_key_from_pem(): client = FmdClient("https://fmd.example.com") # Generate a test RSA key - private_key = rsa.generate_private_key( - public_exponent=65537, - key_size=2048, - backend=default_backend() - ) + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend()) pem_bytes = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() + encryption_algorithm=serialization.NoEncryption(), ) loaded_key = client._load_private_key_from_bytes(pem_bytes) @@ -62,16 +60,12 @@ async def test_load_private_key_from_der(): client = FmdClient("https://fmd.example.com") # Generate a test RSA key - private_key = rsa.generate_private_key( - public_exponent=65537, - key_size=2048, - backend=default_backend() - ) + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend()) der_bytes = private_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() + encryption_algorithm=serialization.NoEncryption(), ) loaded_key = client._load_private_key_from_bytes(der_bytes) @@ -82,6 +76,7 @@ async def test_load_private_key_from_der(): # Test JSON parsing fallback paths # ========================================== + @pytest.mark.asyncio async def test_json_parse_error_fallback_to_text(): """Test that JSONDecodeError triggers fallback to text response.""" @@ -94,7 +89,7 @@ async def test_json_parse_error_fallback_to_text(): m.put( "https://fmd.example.com/api/v1/salt", body='"invalid json', # Missing closing quote - content_type="application/json" + content_type="application/json", ) try: @@ -114,11 +109,7 @@ async def test_json_missing_data_key_fallback(): with aioresponses() as m: # Return JSON without 'Data' key - m.put( - "https://fmd.example.com/api/v1/salt", - payload={"error": "something"}, - content_type="application/json" - ) + m.put("https://fmd.example.com/api/v1/salt", payload={"error": "something"}, content_type="application/json") try: # Should catch KeyError and fall back to text @@ -138,11 +129,7 @@ async def test_empty_text_response_warning(): with aioresponses() as m: # Return JSON with Data key but with empty string value - m.put( - "https://fmd.example.com/api/v1/salt", - payload={"Data": ""}, - content_type="application/json" - ) + m.put("https://fmd.example.com/api/v1/salt", payload={"Data": ""}, content_type="application/json") try: result = await client._make_api_request("PUT", "/api/v1/salt", {"IDT": "test", "Data": ""}) @@ -160,18 +147,11 @@ async def test_expect_json_false_path(): await client._ensure_session() with aioresponses() as m: - m.post( - "https://fmd.example.com/api/v1/command", - body="Command received", - status=200 - ) + m.post("https://fmd.example.com/api/v1/command", body="Command received", status=200) try: result = await client._make_api_request( - "POST", - "/api/v1/command", - {"IDT": "token", "Data": "test"}, - expect_json=False + "POST", "/api/v1/command", {"IDT": "token", "Data": "test"}, expect_json=False ) assert result == "Command received" finally: @@ -182,6 +162,7 @@ async def test_expect_json_false_path(): # Test connection error retry logic # ========================================== + @pytest.mark.asyncio async def test_connection_error_retry_with_backoff(monkeypatch): """Test ClientConnectionError triggers retry with backoff.""" @@ -201,16 +182,13 @@ async def fake_sleep(seconds): # First two attempts: connection error, third: success m.put( "https://fmd.example.com/api/v1/locationDataSize", - exception=aiohttp.ClientConnectionError("Connection failed") + exception=aiohttp.ClientConnectionError("Connection failed"), ) m.put( "https://fmd.example.com/api/v1/locationDataSize", - exception=aiohttp.ClientConnectionError("Connection failed") - ) - m.put( - "https://fmd.example.com/api/v1/locationDataSize", - payload={"Data": "0"} + exception=aiohttp.ClientConnectionError("Connection failed"), ) + m.put("https://fmd.example.com/api/v1/locationDataSize", payload={"Data": "0"}) try: result = await client.get_locations() @@ -243,7 +221,7 @@ async def fake_sleep(seconds): for _ in range(3): m.put( "https://fmd.example.com/api/v1/locationDataSize", - exception=aiohttp.ClientConnectionError("Connection failed") + exception=aiohttp.ClientConnectionError("Connection failed"), ) try: @@ -262,21 +240,14 @@ async def test_connection_error_no_retry_for_unsafe_command(): client.access_token = "token" # Set up private key for send_command - private_key = rsa.generate_private_key( - public_exponent=65537, - key_size=2048, - backend=default_backend() - ) + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend()) client.private_key = private_key await client._ensure_session() with aioresponses() as m: # Connection error on command endpoint - m.post( - "https://fmd.example.com/api/v1/command", - exception=aiohttp.ClientConnectionError("Connection failed") - ) + m.post("https://fmd.example.com/api/v1/command", exception=aiohttp.ClientConnectionError("Connection failed")) try: with pytest.raises(FmdApiException, match="Failed to send command"): @@ -289,6 +260,7 @@ async def test_connection_error_no_retry_for_unsafe_command(): # Test export_data_zip edge cases # ========================================== + @pytest.mark.asyncio async def test_export_zip_png_detection(): """Test PNG magic byte detection in export_data_zip.""" @@ -296,38 +268,30 @@ async def test_export_zip_png_detection(): client.access_token = "token" # Set up private key - private_key = rsa.generate_private_key( - public_exponent=65537, - key_size=3072, - backend=default_backend() - ) + private_key = rsa.generate_private_key(public_exponent=65537, key_size=3072, backend=default_backend()) client.private_key = private_key # Create PNG image bytes (PNG magic bytes + minimal data) - png_data = b'\x89PNG\r\n\x1a\n' + b'\x00' * 100 - png_b64 = base64.b64encode(png_data).decode('utf-8') + png_data = b"\x89PNG\r\n\x1a\n" + b"\x00" * 100 + png_b64 = base64.b64encode(png_data).decode("utf-8") # Double-encode as per FMD picture format - session_key = b'\x00' * 32 + session_key = b"\x00" * 32 aesgcm = AESGCM(session_key) - iv = b'\x01' * 12 + iv = b"\x01" * 12 # Encrypt the base64 string - ciphertext = aesgcm.encrypt(iv, png_b64.encode('utf-8'), None) + ciphertext = aesgcm.encrypt(iv, png_b64.encode("utf-8"), None) # Build encrypted blob public_key = private_key.public_key() session_key_packet = public_key.encrypt( session_key, - asym_padding.OAEP( - mgf=asym_padding.MGF1(algorithm=hashes.SHA256()), - algorithm=hashes.SHA256(), - label=None - ) + asym_padding.OAEP(mgf=asym_padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None), ) blob = session_key_packet + iv + ciphertext - blob_b64 = base64.b64encode(blob).decode('utf-8') + blob_b64 = base64.b64encode(blob).decode("utf-8") await client._ensure_session() @@ -339,7 +303,8 @@ async def test_export_zip_png_detection(): try: import tempfile - with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp: + + with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp: output_path = tmp.name result = await client.export_data_zip(output_path, include_pictures=True) @@ -347,12 +312,14 @@ async def test_export_zip_png_detection(): # Verify ZIP contains PNG import zipfile - with zipfile.ZipFile(output_path, 'r') as zf: + + with zipfile.ZipFile(output_path, "r") as zf: files = zf.namelist() - assert 'pictures/manifest.json' in files - assert any('picture_' in f and f.endswith('.png') for f in files) + assert "pictures/manifest.json" in files + assert any("picture_" in f and f.endswith(".png") for f in files) import os + os.unlink(output_path) finally: await client.close() @@ -365,11 +332,7 @@ async def test_export_zip_picture_decrypt_error(): client.access_token = "token" # Set up private key - private_key = rsa.generate_private_key( - public_exponent=65537, - key_size=2048, - backend=default_backend() - ) + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend()) client.private_key = private_key await client._ensure_session() @@ -381,19 +344,22 @@ async def test_export_zip_picture_decrypt_error(): try: import tempfile - with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp: + + with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp: output_path = tmp.name await client.export_data_zip(output_path, include_pictures=True) # Should complete despite error import zipfile - with zipfile.ZipFile(output_path, 'r') as zf: - manifest = json.loads(zf.read('pictures/manifest.json')) + + with zipfile.ZipFile(output_path, "r") as zf: + manifest = json.loads(zf.read("pictures/manifest.json")) # Error should be recorded - assert 'error' in manifest[0] + assert "error" in manifest[0] import os + os.unlink(output_path) finally: await client.close() @@ -406,11 +372,7 @@ async def test_export_zip_location_decrypt_error(): client.access_token = "token" # Set up private key - private_key = rsa.generate_private_key( - public_exponent=65537, - key_size=2048, - backend=default_backend() - ) + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend()) client.private_key = private_key await client._ensure_session() @@ -424,19 +386,22 @@ async def test_export_zip_location_decrypt_error(): try: import tempfile - with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp: + + with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp: output_path = tmp.name await client.export_data_zip(output_path, include_pictures=False) # Should complete with error recorded import zipfile - with zipfile.ZipFile(output_path, 'r') as zf: - locations = json.loads(zf.read('locations.json')) - assert 'error' in locations[0] - assert locations[0]['index'] == 0 + + with zipfile.ZipFile(output_path, "r") as zf: + locations = json.loads(zf.read("locations.json")) + assert "error" in locations[0] + assert locations[0]["index"] == 0 import os + os.unlink(output_path) finally: await client.close() @@ -446,6 +411,7 @@ async def test_export_zip_location_decrypt_error(): # Test device.py missing lines # ========================================== + @pytest.mark.asyncio async def test_device_download_photo_decode_error(): """Test Device.download_photo handles decode errors (line 137-138).""" @@ -453,35 +419,27 @@ async def test_device_download_photo_decode_error(): client.access_token = "token" # Set up private key with 3072-bit key to get 384-byte RSA packet - private_key = rsa.generate_private_key( - public_exponent=65537, - key_size=3072, - backend=default_backend() - ) + private_key = rsa.generate_private_key(public_exponent=65537, key_size=3072, backend=default_backend()) client.private_key = private_key device = Device(client, "test_device") # Create an invalid blob (will decrypt but not be valid base64) - session_key = b'\x00' * 32 + session_key = b"\x00" * 32 aesgcm = AESGCM(session_key) - iv = b'\x01' * 12 + iv = b"\x01" * 12 # Invalid inner data (not valid base64) - ciphertext = aesgcm.encrypt(iv, b'not-base64-data!!!', None) + ciphertext = aesgcm.encrypt(iv, b"not-base64-data!!!", None) public_key = private_key.public_key() session_key_packet = public_key.encrypt( session_key, - asym_padding.OAEP( - mgf=asym_padding.MGF1(algorithm=hashes.SHA256()), - algorithm=hashes.SHA256(), - label=None - ) + asym_padding.OAEP(mgf=asym_padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None), ) blob = session_key_packet + iv + ciphertext - blob_b64 = base64.b64encode(blob).decode('utf-8') + blob_b64 = base64.b64encode(blob).decode("utf-8") with pytest.raises(OperationError, match="Failed to decode picture blob"): await device.download_photo(blob_b64) @@ -494,11 +452,7 @@ async def test_device_get_history_decrypt_error(): client.access_token = "token" # Set up private key - private_key = rsa.generate_private_key( - public_exponent=65537, - key_size=2048, - backend=default_backend() - ) + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend()) client.private_key = private_key device = Device(client, "test_device") @@ -521,6 +475,7 @@ async def test_device_get_history_decrypt_error(): # Test helper functions indirectly through client behavior # ========================================== + @pytest.mark.asyncio async def test_retry_after_header_parsing_indirectly(): """Test Retry-After header parsing through actual 429 response.""" @@ -531,15 +486,8 @@ async def test_retry_after_header_parsing_indirectly(): with aioresponses() as m: # Test with valid Retry-After number - m.put( - "https://fmd.example.com/api/v1/locationDataSize", - status=429, - headers={"Retry-After": "5"} - ) - m.put( - "https://fmd.example.com/api/v1/locationDataSize", - payload={"Data": "0"} - ) + m.put("https://fmd.example.com/api/v1/locationDataSize", status=429, headers={"Retry-After": "5"}) + m.put("https://fmd.example.com/api/v1/locationDataSize", payload={"Data": "0"}) try: await client.get_locations() @@ -552,6 +500,7 @@ async def test_retry_after_header_parsing_indirectly(): # Additional edge cases # ========================================== + @pytest.mark.asyncio async def test_decrypt_blob_with_missing_private_key(): """Test decrypt_data_blob raises when private_key is None.""" @@ -559,7 +508,7 @@ async def test_decrypt_blob_with_missing_private_key(): # Don't set private_key # Use a valid base64 string that's long enough - dummy_blob = base64.b64encode(b'\x00' * 400).decode('utf-8') + dummy_blob = base64.b64encode(b"\x00" * 400).decode("utf-8") with pytest.raises(FmdApiException, match="Private key not loaded"): client.decrypt_data_blob(dummy_blob) @@ -586,10 +535,7 @@ async def test_client_error_generic(): await client._ensure_session() with aioresponses() as m: - m.put( - "https://fmd.example.com/api/v1/locationDataSize", - exception=aiohttp.ClientError("Generic client error") - ) + m.put("https://fmd.example.com/api/v1/locationDataSize", exception=aiohttp.ClientError("Generic client error")) try: with pytest.raises(FmdApiException, match="API request failed"): @@ -607,10 +553,7 @@ async def test_value_error_in_response_parsing(): with aioresponses() as m: # Return JSON that will cause ValueError when parsing int - m.put( - "https://fmd.example.com/api/v1/locationDataSize", - payload={"Data": "not-a-number"} - ) + m.put("https://fmd.example.com/api/v1/locationDataSize", payload={"Data": "not-a-number"}) try: with pytest.raises(Exception): # int() will raise ValueError, caught and re-raised @@ -623,6 +566,7 @@ async def test_value_error_in_response_parsing(): # Additional tests to reach 95% coverage # ========================================== + @pytest.mark.asyncio async def test_authenticate_full_flow(): """Test complete authenticate flow including internal methods (lines 163-211).""" @@ -630,21 +574,21 @@ async def test_authenticate_full_flow(): with aioresponses() as m: # Mock salt retrieval - m.put("https://fmd.example.com/api/v1/salt", payload={"Data": base64.b64encode(b'\x00' * 16).decode()}) + m.put("https://fmd.example.com/api/v1/salt", payload={"Data": base64.b64encode(b"\x00" * 16).decode()}) # Mock token request m.put("https://fmd.example.com/api/v1/requestAccess", payload={"Data": "test_token"}) # Mock private key retrieval # Create a simple encrypted key blob password = "testpass" - salt = b'\x00' * 16 - iv = b'\x01' * 12 + salt = b"\x00" * 16 + iv = b"\x01" * 12 # Create a dummy private key private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend()) privkey_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() + encryption_algorithm=serialization.NoEncryption(), ) # Encrypt the private key @@ -676,16 +620,9 @@ async def test_429_with_retry_after_header(): with aioresponses() as m: # First request returns 429 with Retry-After - m.get( - "https://fmd.example.com/api/v1/test", - status=429, - headers={"Retry-After": "1"} - ) + m.get("https://fmd.example.com/api/v1/test", status=429, headers={"Retry-After": "1"}) # Second request succeeds - m.get( - "https://fmd.example.com/api/v1/test", - payload={"Data": "success"} - ) + m.get("https://fmd.example.com/api/v1/test", payload={"Data": "success"}) try: result = await client._make_api_request("GET", "/api/v1/test", {"IDT": "test", "Data": ""}) @@ -704,15 +641,9 @@ async def test_500_error_retry(): with aioresponses() as m: # First request returns 500 - m.get( - "https://fmd.example.com/api/v1/test", - status=500 - ) + m.get("https://fmd.example.com/api/v1/test", status=500) # Second request succeeds - m.get( - "https://fmd.example.com/api/v1/test", - payload={"Data": "success"} - ) + m.get("https://fmd.example.com/api/v1/test", payload={"Data": "success"}) try: result = await client._make_api_request("GET", "/api/v1/test", {"IDT": "test", "Data": ""}) @@ -731,16 +662,9 @@ async def test_negative_retry_after_header(): with aioresponses() as m: # First request returns 429 with invalid negative Retry-After - m.get( - "https://fmd.example.com/api/v1/test", - status=429, - headers={"Retry-After": "-5"} - ) + m.get("https://fmd.example.com/api/v1/test", status=429, headers={"Retry-After": "-5"}) # Second request succeeds - m.get( - "https://fmd.example.com/api/v1/test", - payload={"Data": "success"} - ) + m.get("https://fmd.example.com/api/v1/test", payload={"Data": "success"}) try: result = await client._make_api_request("GET", "/api/v1/test", {"IDT": "test", "Data": ""}) @@ -760,15 +684,10 @@ async def test_http_date_retry_after(): with aioresponses() as m: # First request returns 429 with HTTP-date Retry-After m.get( - "https://fmd.example.com/api/v1/test", - status=429, - headers={"Retry-After": "Wed, 21 Oct 2025 07:28:00 GMT"} + "https://fmd.example.com/api/v1/test", status=429, headers={"Retry-After": "Wed, 21 Oct 2025 07:28:00 GMT"} ) # Second request succeeds - m.get( - "https://fmd.example.com/api/v1/test", - payload={"Data": "success"} - ) + m.get("https://fmd.example.com/api/v1/test", payload={"Data": "success"}) try: result = await client._make_api_request("GET", "/api/v1/test", {"IDT": "test", "Data": ""}) @@ -823,14 +742,8 @@ async def test_get_pictures_with_specific_count(): with aioresponses() as m: # Mock response with 10 pictures pictures_list = [f"picture{i}" for i in range(10)] - m.put( - "https://fmd.example.com/api/v1/pictureDataSize", - payload={"Data": "10"} - ) - m.put( - "https://fmd.example.com/api/v1/pictures", - payload={"Data": pictures_list} - ) + m.put("https://fmd.example.com/api/v1/pictureDataSize", payload={"Data": "10"}) + m.put("https://fmd.example.com/api/v1/pictures", payload={"Data": pictures_list}) try: # Request only 3 pictures @@ -868,7 +781,7 @@ async def test_compute_backoff_with_jitter(): # With jitter, result should be between 0 and calculated delay for attempt in range(3): delay = _compute_backoff(1.0, attempt, 10.0, True) - expected_max = min(10.0, 1.0 * (2 ** attempt)) + expected_max = min(10.0, 1.0 * (2**attempt)) assert 0 <= delay <= expected_max @@ -881,10 +794,7 @@ async def test_429_exhausted_retries(): await client._ensure_session() with aioresponses() as m: - m.get( - "https://fmd.example.com/api/v1/test", - status=429 - ) + m.get("https://fmd.example.com/api/v1/test", status=429) try: with pytest.raises(FmdApiException, match="Rate limited.*retries exhausted"): @@ -901,17 +811,11 @@ async def test_streaming_response(): await client._ensure_session() with aioresponses() as m: - m.get( - "https://fmd.example.com/api/v1/test", - body="streaming content", - content_type="text/plain" - ) + m.get("https://fmd.example.com/api/v1/test", body="streaming content", content_type="text/plain") try: result = await client._make_api_request( - "GET", "/api/v1/test", - {"IDT": "test", "Data": ""}, - stream=True # Request streaming response + "GET", "/api/v1/test", {"IDT": "test", "Data": ""}, stream=True # Request streaming response ) # Should return the response object itself assert result is not None @@ -933,14 +837,8 @@ async def test_get_pictures_all_count(): with aioresponses() as m: # Mock response with 5 pictures pictures_list = [f"picture{i}" for i in range(5)] - m.put( - "https://fmd.example.com/api/v1/pictureDataSize", - payload={"Data": "5"} - ) - m.put( - "https://fmd.example.com/api/v1/pictures", - payload={"Data": pictures_list} - ) + m.put("https://fmd.example.com/api/v1/pictureDataSize", payload={"Data": "5"}) + m.put("https://fmd.example.com/api/v1/pictures", payload={"Data": pictures_list}) try: # Request all pictures (num_to_get=-1) @@ -954,6 +852,7 @@ async def test_get_pictures_all_count(): # Final push to 100% coverage # ========================================== + @pytest.mark.asyncio async def test_500_error_exhausted_retries_raises(): """Test 500 error with exhausted retries (lines 353-358).""" @@ -982,17 +881,11 @@ async def test_expect_json_false_returns_text(): await client._ensure_session() with aioresponses() as m: - m.post( - "https://fmd.example.com/api/v1/command", - body="Command executed", - content_type="text/plain" - ) + m.post("https://fmd.example.com/api/v1/command", body="Command executed", content_type="text/plain") try: result = await client._make_api_request( - "POST", "/api/v1/command", - {"IDT": "test", "Data": ""}, - expect_json=False + "POST", "/api/v1/command", {"IDT": "test", "Data": ""}, expect_json=False ) assert result == "Command executed" finally: @@ -1011,7 +904,7 @@ async def test_response_parsing_key_error(): m.put( "https://fmd.example.com/api/v1/test", payload={"Data": {"nested": "value"}}, # Valid JSON but might cause issues downstream - content_type="application/json" + content_type="application/json", ) try: @@ -1036,14 +929,8 @@ async def test_get_pictures_returns_list_when_all(): with aioresponses() as m: # Mock response with 10 pictures pictures_list = [f"picture{i}" for i in range(10)] - m.put( - "https://fmd.example.com/api/v1/pictureDataSize", - payload={"Data": "10"} - ) - m.put( - "https://fmd.example.com/api/v1/pictures", - payload={"Data": pictures_list} - ) + m.put("https://fmd.example.com/api/v1/pictureDataSize", payload={"Data": "10"}) + m.put("https://fmd.example.com/api/v1/pictures", payload={"Data": pictures_list}) try: # Request all pictures explicitly @@ -1066,29 +953,25 @@ async def test_export_zip_default_jpg_extension(): client.private_key = private_key # Create an encrypted blob with unknown image format (not PNG or JPEG) - session_key = b'\x00' * 32 + session_key = b"\x00" * 32 aesgcm = AESGCM(session_key) - iv = b'\x01' * 12 + iv = b"\x01" * 12 # Create image data that doesn't match PNG or JPEG magic bytes - unknown_image = b'\x00\x00\x00\x00UNKNOWN' + b'\x00' * 20 - image_b64 = base64.b64encode(unknown_image).decode('utf-8') + unknown_image = b"\x00\x00\x00\x00UNKNOWN" + b"\x00" * 20 + image_b64 = base64.b64encode(unknown_image).decode("utf-8") # Encrypt it - ciphertext = aesgcm.encrypt(iv, image_b64.encode('utf-8'), None) + ciphertext = aesgcm.encrypt(iv, image_b64.encode("utf-8"), None) public_key = private_key.public_key() session_key_packet = public_key.encrypt( session_key, - asym_padding.OAEP( - mgf=asym_padding.MGF1(algorithm=hashes.SHA256()), - algorithm=hashes.SHA256(), - label=None - ) + asym_padding.OAEP(mgf=asym_padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None), ) blob = session_key_packet + iv + ciphertext - blob_b64 = base64.b64encode(blob).decode('utf-8') + blob_b64 = base64.b64encode(blob).decode("utf-8") with aioresponses() as m: m.put("https://fmd.example.com/api/v1/locationDataSize", payload={"Data": "0"}) @@ -1098,19 +981,21 @@ async def test_export_zip_default_jpg_extension(): try: import tempfile import zipfile - with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp: + + with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp: output_path = tmp.name await client.export_data_zip(output_path, include_pictures=True) # Verify the file was created and contains jpg file - with zipfile.ZipFile(output_path, 'r') as zf: + with zipfile.ZipFile(output_path, "r") as zf: names = zf.namelist() # Should have defaulted to .jpg extension - assert any('.jpg' in name for name in names) + assert any(".jpg" in name for name in names) # Cleanup import os + if os.path.exists(output_path): os.unlink(output_path) finally: @@ -1203,17 +1088,11 @@ async def test_non_json_response_with_expect_json_false(): await client._ensure_session() with aioresponses() as m: - m.put( - "https://fmd.example.com/api/v1/test", - body="plain text response", - content_type="text/plain" - ) + m.put("https://fmd.example.com/api/v1/test", body="plain text response", content_type="text/plain") try: result = await client._make_api_request( - "PUT", "/api/v1/test", - {"IDT": "test", "Data": ""}, - expect_json=False + "PUT", "/api/v1/test", {"IDT": "test", "Data": ""}, expect_json=False ) assert result == "plain text response" finally: