From dd3bea793e4fdb75a56cf0c4e4a63d0f1dbbd18e Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Tue, 8 Jul 2025 14:13:11 -0700 Subject: [PATCH 1/6] Wire up end to end call path (kafka -> monolith endpoints) --- src/launchpad/sentry_client.py | 42 +++++ src/launchpad/service.py | 235 +++++++++++++++++++++--- src/launchpad/size/analyzers/android.py | 22 ++- src/launchpad/size/runner.py | 46 ++++- tests/test_service.py | 42 +++-- 5 files changed, 334 insertions(+), 53 deletions(-) diff --git a/src/launchpad/sentry_client.py b/src/launchpad/sentry_client.py index 44e6a26c..9bd17efe 100644 --- a/src/launchpad/sentry_client.py +++ b/src/launchpad/sentry_client.py @@ -302,3 +302,45 @@ def _create_multipart_body(self, boundary: str, filename: str, data: bytes) -> b ] return b"".join(parts) + + +def categorize_http_error(error_result: Dict[str, Any]) -> tuple[str, str]: + """ + Categorize HTTP error results from SentryClient. + + Returns: + Tuple of (error_category, error_description) + Categories: "not_found", "server_error", "client_error", "unknown" + """ + # First try to get the structured status code + status_code = error_result.get("status_code") + if isinstance(status_code, int): + if status_code == 404: + return "not_found", f"Resource not found (HTTP {status_code})" + elif 500 <= status_code < 600: + return "server_error", f"Server error (HTTP {status_code})" + elif 400 <= status_code < 500: + return "client_error", f"Client error (HTTP {status_code})" + else: + return "unknown", f"Unexpected HTTP status {status_code}" + + # Fallback to parsing the error message string + error_msg = error_result.get("error", "") + if isinstance(error_msg, str): + # Extract HTTP status code from error message like "HTTP 404" + match = re.search(r"HTTP (\d+)", error_msg) + if match: + try: + status_code = int(match.group(1)) + if status_code == 404: + return "not_found", f"Resource not found (HTTP {status_code})" + elif 500 <= status_code < 600: + return "server_error", f"Server error (HTTP {status_code})" + elif 400 <= status_code < 500: + return "client_error", f"Client error (HTTP {status_code})" + else: + return "unknown", f"Unexpected HTTP status {status_code}" + except ValueError: + pass + + return "unknown", f"Unknown error: {error_result}" diff --git a/src/launchpad/service.py b/src/launchpad/service.py index d3cb2a7b..9641d53a 100644 --- a/src/launchpad/service.py +++ b/src/launchpad/service.py @@ -3,10 +3,13 @@ from __future__ import annotations import asyncio +import json import os import signal +import tempfile import time +from pathlib import Path from typing import Any, Dict, cast from arroyo.backends.kafka import KafkaPayload @@ -15,6 +18,15 @@ PreprodArtifactEvents, ) +from launchpad.artifacts.android.aab import AAB +from launchpad.artifacts.android.zipped_aab import ZippedAAB +from launchpad.artifacts.artifact_factory import ArtifactFactory +from launchpad.sentry_client import SentryClient, categorize_http_error +from launchpad.size.analyzers.android import AndroidAnalyzer +from launchpad.size.analyzers.apple import AppleAppAnalyzer +from launchpad.size.models.android import AndroidAppInfo +from launchpad.size.models.apple import AppleAppInfo +from launchpad.size.runner import do_preprocess, do_size from launchpad.utils.logging import get_logger from launchpad.utils.statsd import DogStatsd, get_statsd @@ -26,6 +38,11 @@ # Health check threshold - consider unhealthy if file not touched in 60 seconds HEALTHCHECK_MAX_AGE_SECONDS = 60.0 +# Artifact type constants +ARTIFACT_TYPE_XCARCHIVE = 0 +ARTIFACT_TYPE_AAB = 1 +ARTIFACT_TYPE_APK = 2 + class LaunchpadService: """Main service that orchestrates HTTP server and Kafka consumer.""" @@ -37,11 +54,15 @@ def __init__(self) -> None: self._kafka_task: asyncio.Future[Any] | None = None self._statsd: DogStatsd | None = None self._healthcheck_file: str | None = None + self._service_config: Dict[str, Any] | None = None async def setup(self) -> None: """Set up the service components.""" - service_config = get_service_config() - self._statsd = get_statsd(host=service_config["statsd_host"], port=service_config["statsd_port"]) + self._service_config = get_service_config() + self._statsd = get_statsd( + host=self._service_config["statsd_host"], + port=self._service_config["statsd_port"], + ) # Setup HTTP server with health check callback server_config = get_server_config() @@ -64,6 +85,29 @@ async def setup(self) -> None: logger.info("Service components initialized") + async def start(self) -> None: + """Start all service components.""" + if not self.server or not self.kafka_processor: + raise RuntimeError("Service not properly initialized. Call setup() first.") + + logger.info("Starting Launchpad service...") + + # Set up signal handlers for graceful shutdown + self._setup_signal_handlers() + + # Start Kafka processor in a background thread + loop = asyncio.get_event_loop() + self._kafka_task = loop.run_in_executor(None, self.kafka_processor.run) + + # Start HTTP server as a background task + server_task = asyncio.create_task(self.server.start()) + logger.info("Launchpad service started successfully") + + try: + await self._shutdown_event.wait() + finally: + await self._cleanup(server_task) + def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None: """ Handle incoming Kafka messages. @@ -78,58 +122,187 @@ def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None: if self._statsd: self._statsd.increment("launchpad.artifact.processing.started") - # TODO: Implement actual analysis logic - # This will need to: - # 1. Fetch the artifact using artifact_id from storage/API - # 2. Determine platform by examining the artifact - # 3. Run appropriate analyzer (iOS/Android) - # 4. Store results + # Perform the actual artifact analysis + self.process_artifact_analysis(artifact_id, project_id, organization_id) - # For now, just log - logger.info(f"Analysis completed for artifact {artifact_id} (stub)") + logger.info(f"Analysis completed for artifact {artifact_id}") if self._statsd: self._statsd.increment("launchpad.artifact.processing.completed") except Exception as e: - logger.error(f"Analysis failed for artifact {artifact_id}: {e}", exc_info=True) + # Log the full error for debugging + logger.error( + f"Failed to process artifact {artifact_id} (project: {project_id}, org: {organization_id}): {e}", + exc_info=True, + ) + if self._statsd: self._statsd.increment("launchpad.artifact.processing.failed") - # Re-raise to let Arroyo handle the error (can be configured for DLQ) - raise - async def start(self) -> None: - """Start all service components.""" - if not self.server or not self.kafka_processor: + def process_artifact_analysis(self, artifact_id: str, project_id: str, organization_id: str) -> None: + """ + Download artifact and perform size analysis. + """ + if not self._service_config: raise RuntimeError("Service not properly initialized. Call setup() first.") - logger.info("Starting Launchpad service...") + sentry_client = SentryClient(base_url=self._service_config["sentry_base_url"]) - # Set up signal handlers for graceful shutdown - self._setup_signal_handlers() + file_content, _ = self._download_artifact(sentry_client, artifact_id, project_id, organization_id) + temp_file = self._save_to_temp_file(file_content, artifact_id) - # Start Kafka processor in a background thread - loop = asyncio.get_event_loop() - self._kafka_task = loop.run_in_executor(None, self.kafka_processor.run) + try: + artifact = ArtifactFactory.from_path(Path(temp_file)) + logger.info(f"Running preprocessing on {temp_file}...") + app_info = do_preprocess(Path(temp_file)) + logger.info(f"Preprocessing completed for artifact {artifact_id}") + update_data = self._prepare_update_data(app_info, artifact) + + logger.info(f"Sending preprocessed info to Sentry for artifact {artifact_id}...") + + update_result = sentry_client.update_artifact( + org=organization_id, + project=project_id, + artifact_id=artifact_id, + data=update_data, + ) + + if "error" in update_result: + logger.error(f"Failed to send preprocessed info: {update_result['error']}") + else: + logger.info(f"Successfully sent preprocessed info for artifact {artifact_id}") + + analyzer = self._create_analyzer(app_info) + logger.info(f"Running full analysis on {temp_file}...") + results = do_size(Path(temp_file), analyzer=analyzer) + logger.info(f"Size analysis completed for artifact {artifact_id}") + self._upload_results(sentry_client, results, artifact_id, project_id, organization_id) - # Start HTTP server as a background task - server_task = asyncio.create_task(self.server.start()) + finally: + self._safe_cleanup(temp_file, "temporary file") + + def _download_artifact( + self, + sentry_client: SentryClient, + artifact_id: str, + project_id: str, + organization_id: str, + ) -> tuple[bytes, int]: + """Download artifact from Sentry and validate response.""" + logger.info(f"Downloading artifact {artifact_id}...") + download_result = sentry_client.download_artifact( + org=organization_id, project=project_id, artifact_id=artifact_id + ) - logger.info("Launchpad service started successfully") + if "error" in download_result: + error_category, error_description = categorize_http_error(download_result) + raise RuntimeError(f"Failed to download artifact ({error_category}): {error_description}") + + if not download_result.get("success"): + raise RuntimeError(f"Download was not successful: {download_result}") + + file_content = download_result["file_content"] + file_size = download_result["file_size_bytes"] + + logger.info(f"Downloaded artifact {artifact_id}: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)") + return file_content, file_size + + def _save_to_temp_file(self, file_content: bytes, artifact_id: str) -> str: + """Save file content to temporary file and return path.""" + with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tf: + tf.write(file_content) + tf.flush() + temp_file = tf.name + + logger.info(f"Saved artifact to temporary file: {temp_file}") + return temp_file + + def _prepare_update_data(self, app_info: AppleAppInfo | AndroidAppInfo, artifact: Any) -> Dict[str, Any]: + """Prepare update data based on app platform and artifact type.""" + if isinstance(app_info, AppleAppInfo): + return { + "build_version": app_info.version, + "build_number": (int(app_info.build) if str(app_info.build).isdigit() else app_info.build), + "artifact_type": ARTIFACT_TYPE_XCARCHIVE, + "apple_app_info": { + "is_simulator": app_info.is_simulator, + "codesigning_type": app_info.codesigning_type, + "profile_name": app_info.profile_name, + "is_code_signature_valid": app_info.is_code_signature_valid, + "code_signature_errors": app_info.code_signature_errors, + }, + } + elif isinstance(app_info, AndroidAppInfo): + artifact_type = ARTIFACT_TYPE_AAB if isinstance(artifact, (AAB, ZippedAAB)) else ARTIFACT_TYPE_APK + return { + "build_version": app_info.version, + "build_number": (int(app_info.build) if app_info.build.isdigit() else None), + "artifact_type": artifact_type, + } + else: + raise ValueError(f"Unsupported app_info type: {type(app_info)}") + + def _create_analyzer(self, app_info: AppleAppInfo | AndroidAppInfo) -> AndroidAnalyzer | AppleAppAnalyzer: + """Create analyzer with preprocessed app info.""" + if isinstance(app_info, AndroidAppInfo): + analyzer = AndroidAnalyzer() + analyzer.app_info = app_info + return analyzer + else: # AppleAppInfo + analyzer = AppleAppAnalyzer() + analyzer.app_info = app_info + return analyzer + + def _upload_results( + self, + sentry_client: SentryClient, + results: Any, + artifact_id: str, + project_id: str, + organization_id: str, + ) -> None: + """Upload analysis results to Sentry.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as af: + json.dump(results.to_dict(), af, indent=2) + analysis_file = af.name + + logger.info(f"Analysis results written to temporary file: {analysis_file}") try: - # Wait for shutdown signal - await self._shutdown_event.wait() + logger.info(f"Uploading analysis results for artifact {artifact_id}...") + upload_result = sentry_client.upload_size_analysis_file( + org=organization_id, + project=project_id, + artifact_id=artifact_id, + file_path=analysis_file, + ) + + if "error" in upload_result: + logger.error(f"Failed to upload analysis results: {upload_result['error']}") + else: + logger.info(f"Successfully uploaded analysis results for artifact {artifact_id}") + finally: - # Cleanup - await self._cleanup(server_task) + self._safe_cleanup(analysis_file, "analysis file") + + def _safe_cleanup(self, file_path: str, description: str) -> None: + """Safely clean up a file with error handling.""" + if file_path and os.path.exists(file_path): + try: + os.remove(file_path) + logger.debug(f"Cleaned up {description}: {file_path}") + except Exception as e: + logger.warning(f"Failed to clean up {description} {file_path}: {e}") def _setup_signal_handlers(self) -> None: """Set up signal handlers for graceful shutdown.""" def signal_handler(signum: int, frame: Any) -> None: if self._shutdown_event.is_set(): - logger.info(f"Received signal {signum} during shutdown, ignoring...") + logger.info(f"Received signal {signum} during shutdown, forcing exit...") + # Force exit if we get a second signal + os._exit(1) return logger.info(f"Received signal {signum}, initiating shutdown...") @@ -248,6 +421,7 @@ def get_service_config() -> Dict[str, Any]: """Get service configuration from environment.""" statsd_host = os.getenv("STATSD_HOST", "127.0.0.1") statsd_port_str = os.getenv("STATSD_PORT", "8125") + sentry_base_url = os.getenv("SENTRY_BASE_URL") try: statsd_port = int(statsd_port_str) @@ -257,6 +431,7 @@ def get_service_config() -> Dict[str, Any]: return { "statsd_host": statsd_host, "statsd_port": statsd_port, + "sentry_base_url": sentry_base_url, } diff --git a/src/launchpad/size/analyzers/android.py b/src/launchpad/size/analyzers/android.py index 7b5dee15..a575f2e0 100644 --- a/src/launchpad/size/analyzers/android.py +++ b/src/launchpad/size/analyzers/android.py @@ -48,17 +48,35 @@ def __init__( skip_insights: Skip insights generation for faster analysis """ self.skip_insights = skip_insights + self.app_info: AndroidAppInfo | None = None - def analyze(self, artifact: AndroidArtifact) -> AndroidAnalysisResults: + def preprocess(self, artifact: AndroidArtifact) -> AndroidAppInfo: + """Extract basic app information from the manifest. + + Args: + artifact: Android artifact to preprocess + + Returns: + Basic app information extracted from manifest + """ manifest_dict = artifact.get_manifest().model_dump() - app_info = AndroidAppInfo( + self.app_info = AndroidAppInfo( name=manifest_dict["application"]["label"] or "Unknown", version=manifest_dict["version_name"] or "Unknown", build=manifest_dict["version_code"] or "Unknown", package_name=manifest_dict["package_name"], ) + return self.app_info + + def analyze(self, artifact: AndroidArtifact) -> AndroidAnalysisResults: + # Use preprocessed app info if available, otherwise extract it + if not self.app_info: + self.app_info = self.preprocess(artifact) + + app_info = self.app_info + apks: list[APK] = [] # Split AAB into APKs, or use the APK directly if isinstance(artifact, AAB): diff --git a/src/launchpad/size/runner.py b/src/launchpad/size/runner.py index ca361446..11e7975f 100644 --- a/src/launchpad/size/runner.py +++ b/src/launchpad/size/runner.py @@ -8,22 +8,60 @@ from launchpad.artifacts.artifact_factory import ArtifactFactory from launchpad.size.analyzers.android import AndroidAnalyzer from launchpad.size.analyzers.apple import AppleAppAnalyzer +from launchpad.size.models.android import AndroidAppInfo +from launchpad.size.models.apple import AppleAppInfo from launchpad.size.models.common import BaseAnalysisResults -def do_size(path: Path, **flags: Any) -> BaseAnalysisResults: - start_time = time.time() - artifact = ArtifactFactory.from_path(path) +def do_preprocess(path: Path, **flags: Any) -> AndroidAppInfo | AppleAppInfo: + """Perform preprocessing step only to extract basic app info. + + Args: + path: Path to the artifact + **flags: Additional flags passed to analyzer + + Returns: + App info extracted during preprocessing + """ # isinstance switch below is a bit sad. Ryan suggested a # get_analyzer method on artifact which might be nicer. - analyzer: AndroidAnalyzer | AppleAppAnalyzer + artifact = ArtifactFactory.from_path(path) if isinstance(artifact, AndroidArtifact): analyzer = AndroidAnalyzer(**flags) + return analyzer.preprocess(cast(AndroidArtifact, artifact)) elif isinstance(artifact, AppleArtifact): analyzer = AppleAppAnalyzer(**flags) + return analyzer.preprocess(cast(AppleArtifact, artifact)) else: raise ValueError(f"Unknown artifact kind {artifact}") + + +def do_size( + path: Path, analyzer: AndroidAnalyzer | AppleAppAnalyzer | None = None, **flags: Any +) -> BaseAnalysisResults: + """Perform full size analysis. + + Args: + path: Path to the artifact + analyzer: Optional pre-configured analyzer (with preprocessing already done) + **flags: Additional flags passed to analyzer if creating new one + + Returns: + Full analysis results + """ + start_time = time.time() + artifact = ArtifactFactory.from_path(path) + + # If no analyzer provided, create one + if analyzer is None: + if isinstance(artifact, AndroidArtifact): + analyzer = AndroidAnalyzer(**flags) + elif isinstance(artifact, AppleArtifact): + analyzer = AppleAppAnalyzer(**flags) + else: + raise ValueError(f"Unknown artifact kind {artifact}") + results = analyzer.analyze(cast(Any, artifact)) end_time = time.time() diff --git a/tests/test_service.py b/tests/test_service.py index e9e0cd75..d3b0a3d8 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -5,7 +5,7 @@ import os import time -from unittest.mock import Mock +from unittest.mock import Mock, patch import pytest @@ -61,7 +61,8 @@ async def test_ready_check(self): class TestLaunchpadService: """Test cases for LaunchpadService.""" - def test_handle_kafka_message_ios(self): + @patch.object(LaunchpadService, "process_artifact_analysis") + def test_handle_kafka_message_ios(self, mock_process): """Test handling iOS artifact messages.""" service = LaunchpadService() @@ -78,11 +79,15 @@ def test_handle_kafka_message_ios(self): # handle_kafka_message is synchronous service.handle_kafka_message(payload) + # Verify process_artifact_analysis was called with correct args + mock_process.assert_called_once_with("ios-test-123", "test-project-ios", "test-org-123") + # Verify metrics were recorded service._statsd.increment.assert_any_call("launchpad.artifact.processing.started") service._statsd.increment.assert_any_call("launchpad.artifact.processing.completed") - def test_handle_kafka_message_android(self): + @patch.object(LaunchpadService, "process_artifact_analysis") + def test_handle_kafka_message_android(self, mock_process): """Test handling Android artifact messages.""" service = LaunchpadService() @@ -99,21 +104,23 @@ def test_handle_kafka_message_android(self): # handle_kafka_message is synchronous service.handle_kafka_message(payload) + # Verify process_artifact_analysis was called with correct args + mock_process.assert_called_once_with("android-test-456", "test-project-android", "test-org-456") + # Verify metrics were recorded service._statsd.increment.assert_any_call("launchpad.artifact.processing.started") service._statsd.increment.assert_any_call("launchpad.artifact.processing.completed") - def test_handle_kafka_message_error(self): + @patch.object(LaunchpadService, "process_artifact_analysis") + def test_handle_kafka_message_error(self, mock_process): """Test error handling in message processing.""" service = LaunchpadService() - # Mock statsd to raise an exception on the second call + # Mock statsd service._statsd = Mock() - service._statsd.increment.side_effect = [ - None, # First call: processing.started - Exception("Simulated error"), # Second call: processing.completed (raises) - None, # Third call: processing.failed - ] + + # Make process_artifact_analysis raise an exception + mock_process.side_effect = RuntimeError("Download failed: HTTP 404") # Create a valid payload payload: PreprodArtifactEvents = { @@ -122,16 +129,17 @@ def test_handle_kafka_message_error(self): "organization_id": "test-org", } - # This should raise the exception (to be handled by Arroyo) - with pytest.raises(Exception, match="Simulated error"): - service.handle_kafka_message(payload) + # This should not raise (simplified error handling catches all exceptions) + service.handle_kafka_message(payload) + + # Verify process_artifact_analysis was called + mock_process.assert_called_once_with("test-123", "test-project", "test-org") - # Verify the metrics were called in the expected order + # Verify the metrics were called correctly calls = service._statsd.increment.call_args_list - assert len(calls) == 3 + assert len(calls) == 2 assert calls[0][0][0] == "launchpad.artifact.processing.started" - assert calls[1][0][0] == "launchpad.artifact.processing.completed" - assert calls[2][0][0] == "launchpad.artifact.processing.failed" + assert calls[1][0][0] == "launchpad.artifact.processing.failed" @pytest.mark.asyncio async def test_health_check_with_healthcheck_file(self, tmp_path): From 8ccc467e198585f028e43e06ecc3d66ca0d5b09e Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Tue, 8 Jul 2025 16:14:44 -0700 Subject: [PATCH 2/6] update integration tests --- tests/test_integration.py | 55 ++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/tests/test_integration.py b/tests/test_integration.py index 85b7e6bc..56f88a1f 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -47,29 +47,48 @@ async def test_kafka_message_processing(self): # Mock statsd service._statsd = Mock() - # Test artifact analysis message with iOS artifact - ios_payload: PreprodArtifactEvents = { - "artifact_id": "ios-test-123", - "project_id": "test-project-ios", - "organization_id": "test-org-123", + # Mock service config to make the service appear initialized + service._service_config = { + "statsd_host": "127.0.0.1", + "statsd_port": 8125, + "sentry_base_url": "https://sentry.example.com", } - # handle_kafka_message is synchronous - service.handle_kafka_message(ios_payload) + # Mock process_artifact_analysis to avoid actual processing + with patch.object(service, "process_artifact_analysis") as mock_process: + # Test artifact analysis message with iOS artifact + ios_payload: PreprodArtifactEvents = { + "artifact_id": "ios-test-123", + "project_id": "test-project-ios", + "organization_id": "test-org-123", + } - # Verify statsd metrics were sent - service._statsd.increment.assert_any_call("launchpad.artifact.processing.started") - service._statsd.increment.assert_any_call("launchpad.artifact.processing.completed") + # handle_kafka_message is synchronous + service.handle_kafka_message(ios_payload) - # Test artifact analysis message with Android artifact - android_payload: PreprodArtifactEvents = { - "artifact_id": "android-test-456", - "project_id": "test-project-android", - "organization_id": "test-org-456", - } + # Verify the processing method was called + mock_process.assert_called_once_with("ios-test-123", "test-project-ios", "test-org-123") + + # Verify statsd metrics were sent + service._statsd.increment.assert_any_call("launchpad.artifact.processing.started") + service._statsd.increment.assert_any_call("launchpad.artifact.processing.completed") + + # Reset mocks for next test + mock_process.reset_mock() + service._statsd.reset_mock() + + # Test artifact analysis message with Android artifact + android_payload: PreprodArtifactEvents = { + "artifact_id": "android-test-456", + "project_id": "test-project-android", + "organization_id": "test-org-456", + } + + # handle_kafka_message is synchronous + service.handle_kafka_message(android_payload) - # handle_kafka_message is synchronous - service.handle_kafka_message(android_payload) + # Verify the processing method was called + mock_process.assert_called_once_with("android-test-456", "test-project-android", "test-org-456") @pytest.mark.asyncio async def test_error_handling_in_message_processing(self): From fbf1baf84961cc6034f37dc84fee56590c6fc885 Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Tue, 8 Jul 2025 18:47:28 -0700 Subject: [PATCH 3/6] add error handling --- src/launchpad/constants.py | 58 ++++ src/launchpad/sentry_client.py | 74 +++-- src/launchpad/service.py | 314 ++++++++++++++++++---- tests/test_integration.py | 4 +- tests/test_service.py | 14 +- tests/unit/test_sentry_client_retry.py | 188 +++++++++++++ tests/unit/test_service_error_handling.py | 268 ++++++++++++++++++ 7 files changed, 834 insertions(+), 86 deletions(-) create mode 100644 tests/unit/test_sentry_client_retry.py create mode 100644 tests/unit/test_service_error_handling.py diff --git a/src/launchpad/constants.py b/src/launchpad/constants.py index 9bac61a9..ecb5a31a 100644 --- a/src/launchpad/constants.py +++ b/src/launchpad/constants.py @@ -1,4 +1,62 @@ """Constants used throughout the Launchpad application.""" +from enum import Enum + # Kafka topic names PREPROD_ARTIFACT_EVENTS_TOPIC = "preprod-artifact-events" + +# Error code constants (matching the Django model) +ERROR_CODE_UNKNOWN = 0 +ERROR_CODE_UPLOAD_TIMEOUT = 1 +ERROR_CODE_ARTIFACT_PROCESSING_TIMEOUT = 2 +ERROR_CODE_ARTIFACT_PROCESSING_ERROR = 3 + +# Artifact type constants +ARTIFACT_TYPE_XCARCHIVE = 0 +ARTIFACT_TYPE_AAB = 1 +ARTIFACT_TYPE_APK = 2 + +# Retry configuration +MAX_RETRY_ATTEMPTS = 3 + +# Health check threshold - consider unhealthy if file not touched in 60 seconds +HEALTHCHECK_MAX_AGE_SECONDS = 60.0 + + +class OperationName(Enum): + """Enum for operation names used in retry logic.""" + + PREPROCESSING = "preprocessing" + SIZE_ANALYSIS = "size analysis" + + +class ProcessingErrorMessage(Enum): + """Fixed set of error messages for artifact processing.""" + + # Network-related errors + DOWNLOAD_FAILED = "Failed to download artifact from Sentry" + UPLOAD_FAILED = "Failed to upload analysis results to Sentry" + UPDATE_FAILED = "Failed to update artifact info in Sentry" + + # Processing-related errors + PREPROCESSING_FAILED = "Failed to extract basic app information" + SIZE_ANALYSIS_FAILED = "Failed to perform size analysis" + ARTIFACT_PARSING_FAILED = "Failed to parse artifact file" + UNSUPPORTED_ARTIFACT_TYPE = "Unsupported artifact type" + + # System-related errors + TEMP_FILE_CREATION_FAILED = "Failed to create temporary file" + CLEANUP_FAILED = "Failed to clean up temporary files" + + # Timeout errors + PROCESSING_TIMEOUT = "Processing timed out" + + # Unknown errors + UNKNOWN_ERROR = "An unknown error occurred" + + +# Operation to error message mapping +OPERATION_ERRORS = { + OperationName.PREPROCESSING: ProcessingErrorMessage.PREPROCESSING_FAILED, + OperationName.SIZE_ANALYSIS: ProcessingErrorMessage.SIZE_ANALYSIS_FAILED, +} diff --git a/src/launchpad/sentry_client.py b/src/launchpad/sentry_client.py index 9bd17efe..7610de53 100644 --- a/src/launchpad/sentry_client.py +++ b/src/launchpad/sentry_client.py @@ -15,9 +15,31 @@ import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + logger = logging.getLogger(__name__) +def create_retry_session(max_retries: int = 3) -> requests.Session: + """Create a requests session with retry configuration.""" + session = requests.Session() + + retry_strategy = Retry( + total=max_retries, + backoff_factor=0.1, + status_forcelist=[429, 500, 502, 503, 504], # Retry on these HTTP status codes + allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"], + raise_on_status=False, # Don't raise on HTTP errors, let our code handle them + ) + + adapter = HTTPAdapter(max_retries=retry_strategy) + session.mount("http://", adapter) + session.mount("https://", adapter) + + return session + + class SentryClient: """Client for authenticated API calls to the Sentry monolith.""" @@ -27,36 +49,34 @@ def __init__(self, base_url: str) -> None: if not self.shared_secret: raise RuntimeError("LAUNCHPAD_RPC_SHARED_SECRET must be provided or set as environment variable") + self.session = create_retry_session() + def download_artifact(self, org: str, project: str, artifact_id: str) -> Dict[str, Any]: """Download preprod artifact.""" endpoint = f"/api/0/internal/{org}/{project}/files/preprodartifacts/{artifact_id}/" url = self._build_url(endpoint) - try: - logger.debug(f"GET {url}") - response = requests.get(url, headers=self._get_auth_headers(), timeout=120, stream=True) - - if response.status_code != 200: - return self._handle_error_response(response, "Download") - - # Read content with size limit - content = b"" - for chunk in response.iter_content(chunk_size=8192): - if chunk: - content += chunk - if len(content) > 5 * 1024 * 1024 * 1024: # 5GB limit - logger.warning("Download truncated at 5GB") - break - - return { - "success": True, - "file_content": content, - "file_size_bytes": len(content), - "headers": dict(response.headers), - } - except Exception as e: - logger.error(f"Download failed: {e}") - return {"error": str(e)} + logger.debug(f"GET {url}") + response = self.session.get(url, headers=self._get_auth_headers(), timeout=120, stream=True) + + if response.status_code != 200: + return self._handle_error_response(response, "Download") + + # Read content with size limit + content = b"" + for chunk in response.iter_content(chunk_size=8192): + if chunk: + content += chunk + if len(content) > 5 * 1024 * 1024 * 1024: # 5GB limit + logger.warning("Download truncated at 5GB") + break + + return { + "success": True, + "file_content": content, + "file_size_bytes": len(content), + "headers": dict(response.headers), + } def update_artifact(self, org: str, project: str, artifact_id: str, data: Dict[str, Any]) -> Dict[str, Any]: """Update preprod artifact.""" @@ -174,7 +194,7 @@ def _make_json_request( operation = operation or f"{method} {endpoint}" logger.debug(f"{method} {url}") - response = requests.request( + response = self.session.request( method=method, url=url, data=body or None, @@ -249,7 +269,7 @@ def _upload_chunk(self, org: str, chunk: Dict[str, Any]) -> bool: } try: - response = requests.post(url, data=body, headers=headers, timeout=60) + response = self.session.post(url, data=body, headers=headers, timeout=60) success = response.status_code in [200, 201, 409] # 409 = already exists if not success: diff --git a/src/launchpad/service.py b/src/launchpad/service.py index 9641d53a..2bbd4905 100644 --- a/src/launchpad/service.py +++ b/src/launchpad/service.py @@ -21,6 +21,19 @@ from launchpad.artifacts.android.aab import AAB from launchpad.artifacts.android.zipped_aab import ZippedAAB from launchpad.artifacts.artifact_factory import ArtifactFactory +from launchpad.constants import ( + ARTIFACT_TYPE_AAB, + ARTIFACT_TYPE_APK, + ARTIFACT_TYPE_XCARCHIVE, + ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ERROR_CODE_ARTIFACT_PROCESSING_TIMEOUT, + ERROR_CODE_UNKNOWN, + HEALTHCHECK_MAX_AGE_SECONDS, + MAX_RETRY_ATTEMPTS, + OPERATION_ERRORS, + OperationName, + ProcessingErrorMessage, +) from launchpad.sentry_client import SentryClient, categorize_http_error from launchpad.size.analyzers.android import AndroidAnalyzer from launchpad.size.analyzers.apple import AppleAppAnalyzer @@ -35,14 +48,6 @@ logger = get_logger(__name__) -# Health check threshold - consider unhealthy if file not touched in 60 seconds -HEALTHCHECK_MAX_AGE_SECONDS = 60.0 - -# Artifact type constants -ARTIFACT_TYPE_XCARCHIVE = 0 -ARTIFACT_TYPE_AAB = 1 -ARTIFACT_TYPE_APK = 2 - class LaunchpadService: """Main service that orchestrates HTTP server and Kafka consumer.""" @@ -122,8 +127,7 @@ def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None: if self._statsd: self._statsd.increment("launchpad.artifact.processing.started") - # Perform the actual artifact analysis - self.process_artifact_analysis(artifact_id, project_id, organization_id) + self.process_artifact(artifact_id, project_id, organization_id) logger.info(f"Analysis completed for artifact {artifact_id}") @@ -140,7 +144,7 @@ def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None: if self._statsd: self._statsd.increment("launchpad.artifact.processing.failed") - def process_artifact_analysis(self, artifact_id: str, project_id: str, organization_id: str) -> None: + def process_artifact(self, artifact_id: str, project_id: str, organization_id: str) -> None: """ Download artifact and perform size analysis. """ @@ -148,19 +152,22 @@ def process_artifact_analysis(self, artifact_id: str, project_id: str, organizat raise RuntimeError("Service not properly initialized. Call setup() first.") sentry_client = SentryClient(base_url=self._service_config["sentry_base_url"]) - - file_content, _ = self._download_artifact(sentry_client, artifact_id, project_id, organization_id) - temp_file = self._save_to_temp_file(file_content, artifact_id) + temp_file = None try: + file_content, _ = self._download_artifact(sentry_client, artifact_id, project_id, organization_id) + temp_file = self._save_to_temp_file(file_content, artifact_id) + artifact = ArtifactFactory.from_path(Path(temp_file)) logger.info(f"Running preprocessing on {temp_file}...") - app_info = do_preprocess(Path(temp_file)) + app_info = self._retry_operation( + lambda: do_preprocess(Path(temp_file)), + OperationName.PREPROCESSING, + ) logger.info(f"Preprocessing completed for artifact {artifact_id}") - update_data = self._prepare_update_data(app_info, artifact) + update_data = self._prepare_update_data(app_info, artifact) logger.info(f"Sending preprocessed info to Sentry for artifact {artifact_id}...") - update_result = sentry_client.update_artifact( org=organization_id, project=project_id, @@ -169,18 +176,160 @@ def process_artifact_analysis(self, artifact_id: str, project_id: str, organizat ) if "error" in update_result: - logger.error(f"Failed to send preprocessed info: {update_result['error']}") - else: - logger.info(f"Successfully sent preprocessed info for artifact {artifact_id}") + error_msg = f"Failed to send preprocessed info: {update_result['error']}" + logger.error(error_msg) + # Extract just the error details for the database, the enum prefix will be added automatically + detailed_error = update_result["error"] + self._update_artifact_error( + sentry_client, + artifact_id, + project_id, + organization_id, + ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.UPDATE_FAILED, + detailed_error, + ) + return + + logger.info(f"Successfully sent preprocessed info for artifact {artifact_id}") analyzer = self._create_analyzer(app_info) logger.info(f"Running full analysis on {temp_file}...") - results = do_size(Path(temp_file), analyzer=analyzer) + results = self._retry_operation( + lambda: do_size(Path(temp_file), analyzer=analyzer), + OperationName.SIZE_ANALYSIS, + ) logger.info(f"Size analysis completed for artifact {artifact_id}") + self._upload_results(sentry_client, results, artifact_id, project_id, organization_id) + except Exception as e: + logger.error(f"Failed to process artifact {artifact_id}: {e}", exc_info=True) + + error_code, error_message = self._categorize_processing_error(e) + + # Include detailed error information for better debugging + detailed_error = str(e) + + self._update_artifact_error( + sentry_client, artifact_id, project_id, organization_id, error_code, error_message, detailed_error + ) + raise + finally: - self._safe_cleanup(temp_file, "temporary file") + if temp_file: + self._safe_cleanup(temp_file, "temporary file") + + def _retry_operation(self, operation, operation_name: OperationName): + """Retry an operation up to MAX_RETRY_ATTEMPTS times.""" + error_message = OPERATION_ERRORS[operation_name] + last_exception = None + + for attempt in range(1, MAX_RETRY_ATTEMPTS + 1): + try: + logger.debug(f"Attempting {operation_name.value} (attempt {attempt}/{MAX_RETRY_ATTEMPTS})") + return operation() + except Exception as e: + last_exception = e + logger.warning(f"{operation_name.value} failed on attempt {attempt}/{MAX_RETRY_ATTEMPTS}: {e}") + + if self._is_non_retryable_error(e): + logger.info(f"Non-retryable error for {operation_name.value}, not retrying") + break + + if attempt < MAX_RETRY_ATTEMPTS: + logger.info(f"Retrying {operation_name.value} in a moment...") + time.sleep(1) + + logger.error(f"All {MAX_RETRY_ATTEMPTS} attempts failed for {operation_name.value}") + raise RuntimeError(f"{error_message.value}: {str(last_exception)}") from last_exception + + def _is_non_retryable_error(self, exception: Exception) -> bool: + """Determine if an error should not be retried.""" + return isinstance(exception, (ValueError, NotImplementedError, FileNotFoundError)) + + def _categorize_processing_error(self, exception: Exception) -> tuple[int, ProcessingErrorMessage]: + """Categorize an exception into error code and message.""" + if isinstance(exception, ValueError): + return ERROR_CODE_ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.ARTIFACT_PARSING_FAILED + elif isinstance(exception, NotImplementedError): + return ERROR_CODE_ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.UNSUPPORTED_ARTIFACT_TYPE + elif isinstance(exception, FileNotFoundError): + return ERROR_CODE_ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.ARTIFACT_PARSING_FAILED + elif isinstance(exception, RuntimeError): + error_str = str(exception).lower() + if "timeout" in error_str: + return ERROR_CODE_ARTIFACT_PROCESSING_TIMEOUT, ProcessingErrorMessage.PROCESSING_TIMEOUT + elif "preprocess" in error_str: + return ERROR_CODE_ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.PREPROCESSING_FAILED + elif "size" in error_str or "analysis" in error_str: + return ERROR_CODE_ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.SIZE_ANALYSIS_FAILED + else: + return ERROR_CODE_ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.UNKNOWN_ERROR + else: + return ERROR_CODE_UNKNOWN, ProcessingErrorMessage.UNKNOWN_ERROR + + def _update_artifact_error( + self, + sentry_client: SentryClient, + artifact_id: str, + project_id: str, + organization_id: str, + error_code: int, + error_message: ProcessingErrorMessage, + detailed_error: str | None = None, + ) -> None: + """Update artifact with error information.""" + try: + logger.info(f"Updating artifact {artifact_id} with error code {error_code}") + + # Use detailed error message if provided, otherwise use enum value + final_error_message = f"{error_message.value}: {detailed_error}" if detailed_error else error_message.value + + # Log error to datadog with tags for better monitoring + if self._statsd: + self._statsd.increment( + "launchpad.artifact.processing.error", + tags=[ + f"error_code:{error_code}", + f"error_type:{error_message.name}", + f"project_id:{project_id}", + f"organization_id:{organization_id}", + ], + ) + + result = sentry_client.update_artifact( + org=organization_id, + project=project_id, + artifact_id=artifact_id, + data={"error_code": error_code, "error_message": final_error_message}, + ) + + if "error" in result: + logger.error(f"Failed to update artifact with error: {result['error']}") + else: + logger.info(f"Successfully updated artifact {artifact_id} with error information") + + except Exception as e: + logger.error(f"Failed to update artifact {artifact_id} with error information: {e}", exc_info=True) + + def _handle_download_error( + self, sentry_client: SentryClient, artifact_id: str, project_id: str, organization_id: str, error_msg: str + ) -> None: + """Handle download error by logging and updating artifact.""" + logger.error(error_msg) + # Extract just the error details for the database, the enum prefix will be added automatically + detailed_error = error_msg.split(": ", 1)[1] if ": " in error_msg else error_msg + self._update_artifact_error( + sentry_client, + artifact_id, + project_id, + organization_id, + ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.DOWNLOAD_FAILED, + detailed_error, + ) + raise RuntimeError(error_msg) def _download_artifact( self, @@ -191,36 +340,70 @@ def _download_artifact( ) -> tuple[bytes, int]: """Download artifact from Sentry and validate response.""" logger.info(f"Downloading artifact {artifact_id}...") - download_result = sentry_client.download_artifact( - org=organization_id, project=project_id, artifact_id=artifact_id - ) - if "error" in download_result: - error_category, error_description = categorize_http_error(download_result) - raise RuntimeError(f"Failed to download artifact ({error_category}): {error_description}") - - if not download_result.get("success"): - raise RuntimeError(f"Download was not successful: {download_result}") + try: + download_result = sentry_client.download_artifact( + org=organization_id, project=project_id, artifact_id=artifact_id + ) - file_content = download_result["file_content"] - file_size = download_result["file_size_bytes"] + if "error" in download_result: + error_category, error_description = categorize_http_error(download_result) + self._handle_download_error( + sentry_client, + artifact_id, + project_id, + organization_id, + f"Failed to download artifact ({error_category}): {error_description}", + ) + + if not download_result.get("success"): + self._handle_download_error( + sentry_client, + artifact_id, + project_id, + organization_id, + f"Download was not successful: {download_result}", + ) + + file_content = download_result["file_content"] + file_size = download_result["file_size_bytes"] + + logger.info(f"Downloaded artifact {artifact_id}: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)") + return file_content, file_size - logger.info(f"Downloaded artifact {artifact_id}: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)") - return file_content, file_size + except Exception as e: + if not isinstance(e, RuntimeError): + detailed_error = str(e) + self._update_artifact_error( + sentry_client, + artifact_id, + project_id, + organization_id, + ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.DOWNLOAD_FAILED, + detailed_error, + ) + raise def _save_to_temp_file(self, file_content: bytes, artifact_id: str) -> str: """Save file content to temporary file and return path.""" - with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tf: - tf.write(file_content) - tf.flush() - temp_file = tf.name + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tf: + tf.write(file_content) + tf.flush() + temp_file = tf.name - logger.info(f"Saved artifact to temporary file: {temp_file}") - return temp_file + logger.info(f"Saved artifact to temporary file: {temp_file}") + return temp_file + + except Exception as e: + logger.error(f"Failed to save artifact to temporary file: {e}") + raise RuntimeError(f"{ProcessingErrorMessage.TEMP_FILE_CREATION_FAILED.value}: {str(e)}") from e def _prepare_update_data(self, app_info: AppleAppInfo | AndroidAppInfo, artifact: Any) -> Dict[str, Any]: """Prepare update data based on app platform and artifact type.""" if isinstance(app_info, AppleAppInfo): + # TODO: add "date_built" field once exposed in 'AppleAppInfo' return { "build_version": app_info.version, "build_number": (int(app_info.build) if str(app_info.build).isdigit() else app_info.build), @@ -235,6 +418,7 @@ def _prepare_update_data(self, app_info: AppleAppInfo | AndroidAppInfo, artifact } elif isinstance(app_info, AndroidAppInfo): artifact_type = ARTIFACT_TYPE_AAB if isinstance(artifact, (AAB, ZippedAAB)) else ARTIFACT_TYPE_APK + # TODO: add "date_built" and custom android fields return { "build_version": app_info.version, "build_number": (int(app_info.build) if app_info.build.isdigit() else None), @@ -263,14 +447,16 @@ def _upload_results( organization_id: str, ) -> None: """Upload analysis results to Sentry.""" - with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as af: - json.dump(results.to_dict(), af, indent=2) - analysis_file = af.name - - logger.info(f"Analysis results written to temporary file: {analysis_file}") + analysis_file = None try: + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as af: + json.dump(results.to_dict(), af, indent=2) + analysis_file = af.name + + logger.info(f"Analysis results written to temporary file: {analysis_file}") logger.info(f"Uploading analysis results for artifact {artifact_id}...") + upload_result = sentry_client.upload_size_analysis_file( org=organization_id, project=project_id, @@ -279,12 +465,40 @@ def _upload_results( ) if "error" in upload_result: - logger.error(f"Failed to upload analysis results: {upload_result['error']}") - else: - logger.info(f"Successfully uploaded analysis results for artifact {artifact_id}") + error_msg = f"Failed to upload analysis results: {upload_result['error']}" + logger.error(error_msg) + # Extract just the error details for the database, the enum prefix will be added automatically + detailed_error = upload_result["error"] + self._update_artifact_error( + sentry_client, + artifact_id, + project_id, + organization_id, + ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.UPLOAD_FAILED, + detailed_error, + ) + raise RuntimeError(error_msg) + + logger.info(f"Successfully uploaded analysis results for artifact {artifact_id}") + + except Exception as e: + if not isinstance(e, RuntimeError): + detailed_error = str(e) + self._update_artifact_error( + sentry_client, + artifact_id, + project_id, + organization_id, + ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.UPLOAD_FAILED, + detailed_error, + ) + raise finally: - self._safe_cleanup(analysis_file, "analysis file") + if analysis_file: + self._safe_cleanup(analysis_file, "analysis file") def _safe_cleanup(self, file_path: str, description: str) -> None: """Safely clean up a file with error handling.""" diff --git a/tests/test_integration.py b/tests/test_integration.py index 56f88a1f..a99371b0 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -54,8 +54,8 @@ async def test_kafka_message_processing(self): "sentry_base_url": "https://sentry.example.com", } - # Mock process_artifact_analysis to avoid actual processing - with patch.object(service, "process_artifact_analysis") as mock_process: + # Mock process_artifact to avoid actual processing + with patch.object(service, "process_artifact") as mock_process: # Test artifact analysis message with iOS artifact ios_payload: PreprodArtifactEvents = { "artifact_id": "ios-test-123", diff --git a/tests/test_service.py b/tests/test_service.py index d3b0a3d8..e9ffb2cb 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -61,7 +61,7 @@ async def test_ready_check(self): class TestLaunchpadService: """Test cases for LaunchpadService.""" - @patch.object(LaunchpadService, "process_artifact_analysis") + @patch.object(LaunchpadService, "process_artifact") def test_handle_kafka_message_ios(self, mock_process): """Test handling iOS artifact messages.""" service = LaunchpadService() @@ -79,14 +79,14 @@ def test_handle_kafka_message_ios(self, mock_process): # handle_kafka_message is synchronous service.handle_kafka_message(payload) - # Verify process_artifact_analysis was called with correct args + # Verify process_artifact was called with correct args mock_process.assert_called_once_with("ios-test-123", "test-project-ios", "test-org-123") # Verify metrics were recorded service._statsd.increment.assert_any_call("launchpad.artifact.processing.started") service._statsd.increment.assert_any_call("launchpad.artifact.processing.completed") - @patch.object(LaunchpadService, "process_artifact_analysis") + @patch.object(LaunchpadService, "process_artifact") def test_handle_kafka_message_android(self, mock_process): """Test handling Android artifact messages.""" service = LaunchpadService() @@ -104,14 +104,14 @@ def test_handle_kafka_message_android(self, mock_process): # handle_kafka_message is synchronous service.handle_kafka_message(payload) - # Verify process_artifact_analysis was called with correct args + # Verify process_artifact was called with correct args mock_process.assert_called_once_with("android-test-456", "test-project-android", "test-org-456") # Verify metrics were recorded service._statsd.increment.assert_any_call("launchpad.artifact.processing.started") service._statsd.increment.assert_any_call("launchpad.artifact.processing.completed") - @patch.object(LaunchpadService, "process_artifact_analysis") + @patch.object(LaunchpadService, "process_artifact") def test_handle_kafka_message_error(self, mock_process): """Test error handling in message processing.""" service = LaunchpadService() @@ -119,7 +119,7 @@ def test_handle_kafka_message_error(self, mock_process): # Mock statsd service._statsd = Mock() - # Make process_artifact_analysis raise an exception + # Make process_artifact raise an exception mock_process.side_effect = RuntimeError("Download failed: HTTP 404") # Create a valid payload @@ -132,7 +132,7 @@ def test_handle_kafka_message_error(self, mock_process): # This should not raise (simplified error handling catches all exceptions) service.handle_kafka_message(payload) - # Verify process_artifact_analysis was called + # Verify process_artifact was called mock_process.assert_called_once_with("test-123", "test-project", "test-org") # Verify the metrics were called correctly diff --git a/tests/unit/test_sentry_client_retry.py b/tests/unit/test_sentry_client_retry.py new file mode 100644 index 00000000..d95c7432 --- /dev/null +++ b/tests/unit/test_sentry_client_retry.py @@ -0,0 +1,188 @@ +"""Tests for retry logic in SentryClient.""" + +from unittest.mock import Mock, patch + +import requests + +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +from launchpad.sentry_client import SentryClient, create_retry_session + + +class TestSentryClientRetry: + """Test retry logic in SentryClient.""" + + def setup_method(self): + """Set up test fixtures.""" + with patch.dict("os.environ", {"LAUNCHPAD_RPC_SHARED_SECRET": "test_secret"}): + self.client = SentryClient(base_url="https://test.sentry.io") + + def test_create_retry_session_configuration(self): + """Test that create_retry_session creates a session with correct retry configuration.""" + session = create_retry_session(max_retries=5) + + # Check that session has the right type + assert isinstance(session, requests.Session) + + # Check that the session has adapters mounted + assert "http://" in session.adapters + assert "https://" in session.adapters + + # Check that adapters are HTTPAdapter instances + http_adapter = session.adapters["http://"] + https_adapter = session.adapters["https://"] + assert isinstance(http_adapter, HTTPAdapter) + assert isinstance(https_adapter, HTTPAdapter) + + # Check that the retry strategy is configured + assert http_adapter.max_retries.total == 5 + assert https_adapter.max_retries.total == 5 + + def test_create_retry_session_default_retries(self): + """Test that create_retry_session uses default retry count.""" + session = create_retry_session() + + http_adapter = session.adapters["http://"] + assert http_adapter.max_retries.total == 3 + + def test_sentry_client_uses_retry_session(self): + """Test that SentryClient uses a retry session.""" + with patch.dict("os.environ", {"LAUNCHPAD_RPC_SHARED_SECRET": "test_secret"}): + client = SentryClient(base_url="https://test.sentry.io") + + # Check that the client has a session + assert hasattr(client, "session") + assert isinstance(client.session, requests.Session) + + # Check that the session has retry adapters + assert "http://" in client.session.adapters + assert "https://" in client.session.adapters + assert isinstance(client.session.adapters["http://"], HTTPAdapter) + + @patch("launchpad.sentry_client.requests.Session") + def test_download_artifact_with_retry_success_after_failure(self, mock_session_class): + """Test that download_artifact succeeds after retries via urllib3.""" + # Mock the session and its methods + mock_session = Mock() + mock_session_class.return_value = mock_session + + # First call fails, second succeeds + mock_response_success = Mock() + mock_response_success.status_code = 200 + mock_response_success.iter_content.return_value = [b"test content"] + mock_response_success.headers = {"content-length": "12"} + + # Configure the mock to succeed on the call + mock_session.get.return_value = mock_response_success + + # Create client with mocked session + with patch.dict("os.environ", {"LAUNCHPAD_RPC_SHARED_SECRET": "test_secret"}): + client = SentryClient(base_url="https://test.sentry.io") + client.session = mock_session + + result = client.download_artifact("test-org", "test-project", "test-artifact") + + assert result["success"] is True + assert result["file_content"] == b"test content" + assert mock_session.get.called + + @patch("launchpad.sentry_client.requests.Session") + def test_update_artifact_with_retry_session(self, mock_session_class): + """Test that update_artifact uses the retry session.""" + # Mock the session and its methods + mock_session = Mock() + mock_session_class.return_value = mock_session + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {"success": True} + mock_session.request.return_value = mock_response + + # Create client with mocked session + with patch.dict("os.environ", {"LAUNCHPAD_RPC_SHARED_SECRET": "test_secret"}): + client = SentryClient(base_url="https://test.sentry.io") + client.session = mock_session + + result = client.update_artifact("test-org", "test-project", "test-artifact", {"version": "1.0"}) + + assert result == {"success": True} + assert mock_session.request.called + + @patch("launchpad.sentry_client.requests.Session") + def test_upload_chunk_with_retry_session(self, mock_session_class): + """Test that _upload_chunk uses the retry session.""" + # Mock the session and its methods + mock_session = Mock() + mock_session_class.return_value = mock_session + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_session.post.return_value = mock_response + + # Create client with mocked session + with patch.dict("os.environ", {"LAUNCHPAD_RPC_SHARED_SECRET": "test_secret"}): + client = SentryClient(base_url="https://test.sentry.io") + client.session = mock_session + + chunk = {"checksum": "abcd1234", "data": b"test data", "size": 9} + + result = client._upload_chunk("test-org", chunk) + + assert result is True + assert mock_session.post.called + + def test_retry_strategy_configuration(self): + """Test that the retry strategy is configured correctly.""" + session = create_retry_session() + adapter = session.adapters["https://"] + retry_strategy = adapter.max_retries + + # Check retry configuration + assert isinstance(retry_strategy, Retry) + assert retry_strategy.total == 3 + assert retry_strategy.backoff_factor == 0.1 + assert retry_strategy.status_forcelist == [429, 500, 502, 503, 504] + assert retry_strategy.raise_on_status is False + + # Check allowed methods + expected_methods = ["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"] + assert retry_strategy.allowed_methods == expected_methods + + def test_retry_strategy_custom_max_retries(self): + """Test that custom max retries is applied correctly.""" + session = create_retry_session(max_retries=5) + adapter = session.adapters["https://"] + retry_strategy = adapter.max_retries + + assert retry_strategy.total == 5 + + @patch("launchpad.sentry_client.requests.Session") + def test_json_request_uses_session(self, mock_session_class): + """Test that _make_json_request uses the retry session.""" + # Mock the session and its methods + mock_session = Mock() + mock_session_class.return_value = mock_session + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {"result": "success"} + mock_session.request.return_value = mock_response + + # Create client with mocked session + with patch.dict("os.environ", {"LAUNCHPAD_RPC_SHARED_SECRET": "test_secret"}): + client = SentryClient(base_url="https://test.sentry.io") + client.session = mock_session + + result = client._make_json_request("POST", "/test", {"key": "value"}) + + assert result == {"result": "success"} + assert mock_session.request.called + + # Verify the call was made with correct parameters + call_args = mock_session.request.call_args + assert call_args[1]["method"] == "POST" + assert "test.sentry.io/test" in call_args[1]["url"] diff --git a/tests/unit/test_service_error_handling.py b/tests/unit/test_service_error_handling.py new file mode 100644 index 00000000..5ba41d56 --- /dev/null +++ b/tests/unit/test_service_error_handling.py @@ -0,0 +1,268 @@ +"""Tests for error handling and retry logic in LaunchpadService.""" + +from unittest.mock import Mock, patch + +import pytest + +from launchpad.constants import ( + ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ERROR_CODE_ARTIFACT_PROCESSING_TIMEOUT, + ERROR_CODE_UNKNOWN, + MAX_RETRY_ATTEMPTS, + OperationName, + ProcessingErrorMessage, +) +from launchpad.service import LaunchpadService + + +class TestLaunchpadServiceErrorHandling: + """Test error handling and retry logic in LaunchpadService.""" + + def setup_method(self): + """Set up test fixtures.""" + self.service = LaunchpadService() + self.service._service_config = { + "sentry_base_url": "https://test.sentry.io", + "statsd_host": "localhost", + "statsd_port": 8125, + } + self.service._statsd = Mock() + + def test_retry_operation_success_on_first_attempt(self): + """Test that _retry_operation succeeds on first attempt.""" + operation = Mock(return_value="success") + + result = self.service._retry_operation( + operation, + OperationName.PREPROCESSING, + ) + + assert result == "success" + assert operation.call_count == 1 + + def test_retry_operation_success_on_second_attempt(self): + """Test that _retry_operation succeeds on second attempt after one failure.""" + operation = Mock() + operation.side_effect = [RuntimeError("Temporary failure"), "success"] + + result = self.service._retry_operation( + operation, + OperationName.PREPROCESSING, + ) + + assert result == "success" + assert operation.call_count == 2 + + def test_retry_operation_fails_after_max_attempts(self): + """Test that _retry_operation fails after MAX_RETRY_ATTEMPTS.""" + operation = Mock() + operation.side_effect = RuntimeError("Persistent failure") + + with pytest.raises(RuntimeError, match="Failed to extract basic app information"): + self.service._retry_operation( + operation, + OperationName.PREPROCESSING, + ) + + assert operation.call_count == MAX_RETRY_ATTEMPTS + + def test_retry_operation_non_retryable_error(self): + """Test that _retry_operation doesn't retry non-retryable errors.""" + operation = Mock() + operation.side_effect = ValueError("Invalid input") + + with pytest.raises(RuntimeError, match="Failed to perform size analysis"): + self.service._retry_operation( + operation, + OperationName.SIZE_ANALYSIS, + ) + + assert operation.call_count == 1 # Should not retry + + def test_retry_operation_maps_operation_to_correct_error_message(self): + """Test that _retry_operation correctly maps operation names to error messages.""" + operation = Mock() + operation.side_effect = RuntimeError("Persistent failure") + + # Test PREPROCESSING maps to PREPROCESSING_FAILED + with pytest.raises(RuntimeError, match="Failed to extract basic app information"): + self.service._retry_operation(operation, OperationName.PREPROCESSING) + + # Test SIZE_ANALYSIS maps to SIZE_ANALYSIS_FAILED + with pytest.raises(RuntimeError, match="Failed to perform size analysis"): + self.service._retry_operation(operation, OperationName.SIZE_ANALYSIS) + + def test_is_non_retryable_error(self): + """Test that _is_non_retryable_error correctly identifies non-retryable errors.""" + # Non-retryable errors + assert self.service._is_non_retryable_error(ValueError("test")) + assert self.service._is_non_retryable_error(NotImplementedError("test")) + assert self.service._is_non_retryable_error(FileNotFoundError("test")) + + # Retryable errors + assert not self.service._is_non_retryable_error(RuntimeError("test")) + assert not self.service._is_non_retryable_error(ConnectionError("test")) + assert not self.service._is_non_retryable_error(Exception("test")) + + def test_categorize_processing_error(self): + """Test that processing errors are categorized correctly.""" + service = LaunchpadService() + + # Test ValueError -> ARTIFACT_PARSING_FAILED + error_code, error_message = service._categorize_processing_error(ValueError("Invalid format")) + assert error_code == ERROR_CODE_ARTIFACT_PROCESSING_ERROR + assert error_message == ProcessingErrorMessage.ARTIFACT_PARSING_FAILED + + # Test NotImplementedError -> UNSUPPORTED_ARTIFACT_TYPE + error_code, error_message = service._categorize_processing_error(NotImplementedError("Not supported")) + assert error_code == ERROR_CODE_ARTIFACT_PROCESSING_ERROR + assert error_message == ProcessingErrorMessage.UNSUPPORTED_ARTIFACT_TYPE + + # Test FileNotFoundError -> ARTIFACT_PARSING_FAILED + error_code, error_message = service._categorize_processing_error(FileNotFoundError("File not found")) + assert error_code == ERROR_CODE_ARTIFACT_PROCESSING_ERROR + assert error_message == ProcessingErrorMessage.ARTIFACT_PARSING_FAILED + + # Test RuntimeError with timeout -> PROCESSING_TIMEOUT + error_code, error_message = service._categorize_processing_error(RuntimeError("Processing timeout occurred")) + assert error_code == ERROR_CODE_ARTIFACT_PROCESSING_TIMEOUT + assert error_message == ProcessingErrorMessage.PROCESSING_TIMEOUT + + # Test RuntimeError with preprocessing keywords -> PREPROCESSING_FAILED + error_code, error_message = service._categorize_processing_error(RuntimeError("Preprocessing failed")) + assert error_code == ERROR_CODE_ARTIFACT_PROCESSING_ERROR + assert error_message == ProcessingErrorMessage.PREPROCESSING_FAILED + + # Test RuntimeError with size keywords -> SIZE_ANALYSIS_FAILED + error_code, error_message = service._categorize_processing_error(RuntimeError("Size analysis failed")) + assert error_code == ERROR_CODE_ARTIFACT_PROCESSING_ERROR + assert error_message == ProcessingErrorMessage.SIZE_ANALYSIS_FAILED + + # Test RuntimeError with unknown content -> UNKNOWN_ERROR + error_code, error_message = service._categorize_processing_error(RuntimeError("Something unknown happened")) + assert error_code == ERROR_CODE_ARTIFACT_PROCESSING_ERROR + assert error_message == ProcessingErrorMessage.UNKNOWN_ERROR + + # Test generic exception -> UNKNOWN_ERROR + error_code, error_message = service._categorize_processing_error(Exception("Generic error")) + assert error_code == ERROR_CODE_UNKNOWN + assert error_message == ProcessingErrorMessage.UNKNOWN_ERROR + + @patch("launchpad.service.SentryClient") + def test_update_artifact_error_success(self, mock_sentry_client_class): + """Test that _update_artifact_error successfully updates artifact with error.""" + mock_sentry_client = Mock() + mock_sentry_client_class.return_value = mock_sentry_client + mock_sentry_client.update_artifact.return_value = {"success": True} + + self.service._update_artifact_error( + mock_sentry_client, + "test-artifact-id", + "test-project-id", + "test-org-id", + ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.PREPROCESSING_FAILED, + ) + + mock_sentry_client.update_artifact.assert_called_once_with( + org="test-org-id", + project="test-project-id", + artifact_id="test-artifact-id", + data={ + "error_code": ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + "error_message": ProcessingErrorMessage.PREPROCESSING_FAILED.value, + }, + ) + + @patch("launchpad.service.SentryClient") + def test_update_artifact_error_failure(self, mock_sentry_client_class): + """Test that _update_artifact_error handles update failures gracefully.""" + mock_sentry_client = Mock() + mock_sentry_client_class.return_value = mock_sentry_client + mock_sentry_client.update_artifact.return_value = {"error": "Update failed"} + + # Should not raise an exception + self.service._update_artifact_error( + mock_sentry_client, + "test-artifact-id", + "test-project-id", + "test-org-id", + ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.PREPROCESSING_FAILED, + ) + + mock_sentry_client.update_artifact.assert_called_once() + + @patch("launchpad.service.SentryClient") + def test_update_artifact_error_exception(self, mock_sentry_client_class): + """Test that _update_artifact_error handles exceptions gracefully.""" + mock_sentry_client = Mock() + mock_sentry_client_class.return_value = mock_sentry_client + mock_sentry_client.update_artifact.side_effect = Exception("Network error") + + # Should not raise an exception + self.service._update_artifact_error( + mock_sentry_client, + "test-artifact-id", + "test-project-id", + "test-org-id", + ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.PREPROCESSING_FAILED, + ) + + mock_sentry_client.update_artifact.assert_called_once() + + @patch("launchpad.service.SentryClient") + def test_update_artifact_error_with_detailed_message(self, mock_sentry_client): + """Test that _update_artifact_error uses detailed error message when provided.""" + service = LaunchpadService() + service._statsd = Mock() + + mock_client = Mock() + mock_client.update_artifact.return_value = {"success": True} + mock_sentry_client.return_value = mock_client + + detailed_error = "Failed to parse Info.plist: [Errno 2] No such file or directory" + + service._update_artifact_error( + mock_client, + "test_artifact_id", + "test_project_id", + "test_org_id", + ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.PREPROCESSING_FAILED, + detailed_error, + ) + + # Verify that the detailed error message is used instead of the enum value + expected_error_message = f"{ProcessingErrorMessage.PREPROCESSING_FAILED.value}: {detailed_error}" + mock_client.update_artifact.assert_called_once_with( + org="test_org_id", + project="test_project_id", + artifact_id="test_artifact_id", + data={"error_code": ERROR_CODE_ARTIFACT_PROCESSING_ERROR, "error_message": expected_error_message}, + ) + + # Verify datadog logging + service._statsd.increment.assert_called_once_with( + "launchpad.artifact.processing.error", + tags=[ + "error_code:3", + "error_type:PREPROCESSING_FAILED", + "project_id:test_project_id", + "organization_id:test_org_id", + ], + ) + + def test_processing_error_message_enum_values(self): + """Test that ProcessingErrorMessage enum has expected values.""" + # Test that all enum values are strings + for error_message in ProcessingErrorMessage: + assert isinstance(error_message.value, str) + assert len(error_message.value) > 0 + + # Test some specific values + assert ProcessingErrorMessage.DOWNLOAD_FAILED.value == "Failed to download artifact from Sentry" + assert ProcessingErrorMessage.PREPROCESSING_FAILED.value == "Failed to extract basic app information" + assert ProcessingErrorMessage.SIZE_ANALYSIS_FAILED.value == "Failed to perform size analysis" + assert ProcessingErrorMessage.UNKNOWN_ERROR.value == "An unknown error occurred" From b05d19422b3fdaf7d79de7172f957eae323be572 Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Wed, 9 Jul 2025 08:15:16 -0700 Subject: [PATCH 4/6] some minor changes, more to come --- src/launchpad/constants.py | 23 +++++++---- src/launchpad/sentry_client.py | 7 +++- src/launchpad/service.py | 50 +++++++++++------------ tests/unit/test_service_error_handling.py | 35 ++++++++-------- 4 files changed, 62 insertions(+), 53 deletions(-) diff --git a/src/launchpad/constants.py b/src/launchpad/constants.py index ecb5a31a..144cf2f7 100644 --- a/src/launchpad/constants.py +++ b/src/launchpad/constants.py @@ -5,16 +5,25 @@ # Kafka topic names PREPROD_ARTIFACT_EVENTS_TOPIC = "preprod-artifact-events" + # Error code constants (matching the Django model) -ERROR_CODE_UNKNOWN = 0 -ERROR_CODE_UPLOAD_TIMEOUT = 1 -ERROR_CODE_ARTIFACT_PROCESSING_TIMEOUT = 2 -ERROR_CODE_ARTIFACT_PROCESSING_ERROR = 3 +class ProcessingErrorCode(Enum): + """Error codes for artifact processing (matching the Django model).""" + + UNKNOWN = 0 + UPLOAD_TIMEOUT = 1 + ARTIFACT_PROCESSING_TIMEOUT = 2 + ARTIFACT_PROCESSING_ERROR = 3 + # Artifact type constants -ARTIFACT_TYPE_XCARCHIVE = 0 -ARTIFACT_TYPE_AAB = 1 -ARTIFACT_TYPE_APK = 2 +class ArtifactType(Enum): + """Artifact types for different platforms and formats.""" + + XCARCHIVE = 0 + AAB = 1 + APK = 2 + # Retry configuration MAX_RETRY_ATTEMPTS = 3 diff --git a/src/launchpad/sentry_client.py b/src/launchpad/sentry_client.py index 7610de53..ed358163 100644 --- a/src/launchpad/sentry_client.py +++ b/src/launchpad/sentry_client.py @@ -68,8 +68,11 @@ def download_artifact(self, org: str, project: str, artifact_id: str) -> Dict[st if chunk: content += chunk if len(content) > 5 * 1024 * 1024 * 1024: # 5GB limit - logger.warning("Download truncated at 5GB") - break + logger.error("Download exceeds 5GB limit") + return { + "error": "File size exceeds 5GB limit", + "status_code": 413, # Payload Too Large + } return { "success": True, diff --git a/src/launchpad/service.py b/src/launchpad/service.py index 2bbd4905..f59255ce 100644 --- a/src/launchpad/service.py +++ b/src/launchpad/service.py @@ -22,16 +22,12 @@ from launchpad.artifacts.android.zipped_aab import ZippedAAB from launchpad.artifacts.artifact_factory import ArtifactFactory from launchpad.constants import ( - ARTIFACT_TYPE_AAB, - ARTIFACT_TYPE_APK, - ARTIFACT_TYPE_XCARCHIVE, - ERROR_CODE_ARTIFACT_PROCESSING_ERROR, - ERROR_CODE_ARTIFACT_PROCESSING_TIMEOUT, - ERROR_CODE_UNKNOWN, HEALTHCHECK_MAX_AGE_SECONDS, MAX_RETRY_ATTEMPTS, OPERATION_ERRORS, + ArtifactType, OperationName, + ProcessingErrorCode, ProcessingErrorMessage, ) from launchpad.sentry_client import SentryClient, categorize_http_error @@ -185,7 +181,7 @@ def process_artifact(self, artifact_id: str, project_id: str, organization_id: s artifact_id, project_id, organization_id, - ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.UPDATE_FAILED, detailed_error, ) @@ -248,26 +244,26 @@ def _is_non_retryable_error(self, exception: Exception) -> bool: """Determine if an error should not be retried.""" return isinstance(exception, (ValueError, NotImplementedError, FileNotFoundError)) - def _categorize_processing_error(self, exception: Exception) -> tuple[int, ProcessingErrorMessage]: + def _categorize_processing_error(self, exception: Exception) -> tuple[ProcessingErrorCode, ProcessingErrorMessage]: """Categorize an exception into error code and message.""" if isinstance(exception, ValueError): - return ERROR_CODE_ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.ARTIFACT_PARSING_FAILED + return ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.ARTIFACT_PARSING_FAILED elif isinstance(exception, NotImplementedError): - return ERROR_CODE_ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.UNSUPPORTED_ARTIFACT_TYPE + return ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.UNSUPPORTED_ARTIFACT_TYPE elif isinstance(exception, FileNotFoundError): - return ERROR_CODE_ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.ARTIFACT_PARSING_FAILED + return ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.ARTIFACT_PARSING_FAILED elif isinstance(exception, RuntimeError): error_str = str(exception).lower() if "timeout" in error_str: - return ERROR_CODE_ARTIFACT_PROCESSING_TIMEOUT, ProcessingErrorMessage.PROCESSING_TIMEOUT + return ProcessingErrorCode.ARTIFACT_PROCESSING_TIMEOUT, ProcessingErrorMessage.PROCESSING_TIMEOUT elif "preprocess" in error_str: - return ERROR_CODE_ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.PREPROCESSING_FAILED + return ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.PREPROCESSING_FAILED elif "size" in error_str or "analysis" in error_str: - return ERROR_CODE_ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.SIZE_ANALYSIS_FAILED + return ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.SIZE_ANALYSIS_FAILED else: - return ERROR_CODE_ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.UNKNOWN_ERROR + return ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.UNKNOWN_ERROR else: - return ERROR_CODE_UNKNOWN, ProcessingErrorMessage.UNKNOWN_ERROR + return ProcessingErrorCode.UNKNOWN, ProcessingErrorMessage.UNKNOWN_ERROR def _update_artifact_error( self, @@ -275,13 +271,13 @@ def _update_artifact_error( artifact_id: str, project_id: str, organization_id: str, - error_code: int, + error_code: ProcessingErrorCode, error_message: ProcessingErrorMessage, detailed_error: str | None = None, ) -> None: """Update artifact with error information.""" try: - logger.info(f"Updating artifact {artifact_id} with error code {error_code}") + logger.info(f"Updating artifact {artifact_id} with error code {error_code.value}") # Use detailed error message if provided, otherwise use enum value final_error_message = f"{error_message.value}: {detailed_error}" if detailed_error else error_message.value @@ -291,7 +287,7 @@ def _update_artifact_error( self._statsd.increment( "launchpad.artifact.processing.error", tags=[ - f"error_code:{error_code}", + f"error_code:{error_code.value}", f"error_type:{error_message.name}", f"project_id:{project_id}", f"organization_id:{organization_id}", @@ -302,7 +298,7 @@ def _update_artifact_error( org=organization_id, project=project_id, artifact_id=artifact_id, - data={"error_code": error_code, "error_message": final_error_message}, + data={"error_code": error_code.value, "error_message": final_error_message}, ) if "error" in result: @@ -325,7 +321,7 @@ def _handle_download_error( artifact_id, project_id, organization_id, - ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.DOWNLOAD_FAILED, detailed_error, ) @@ -379,7 +375,7 @@ def _download_artifact( artifact_id, project_id, organization_id, - ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.DOWNLOAD_FAILED, detailed_error, ) @@ -407,7 +403,7 @@ def _prepare_update_data(self, app_info: AppleAppInfo | AndroidAppInfo, artifact return { "build_version": app_info.version, "build_number": (int(app_info.build) if str(app_info.build).isdigit() else app_info.build), - "artifact_type": ARTIFACT_TYPE_XCARCHIVE, + "artifact_type": ArtifactType.XCARCHIVE.value, "apple_app_info": { "is_simulator": app_info.is_simulator, "codesigning_type": app_info.codesigning_type, @@ -417,12 +413,12 @@ def _prepare_update_data(self, app_info: AppleAppInfo | AndroidAppInfo, artifact }, } elif isinstance(app_info, AndroidAppInfo): - artifact_type = ARTIFACT_TYPE_AAB if isinstance(artifact, (AAB, ZippedAAB)) else ARTIFACT_TYPE_APK + artifact_type = ArtifactType.AAB if isinstance(artifact, (AAB, ZippedAAB)) else ArtifactType.APK # TODO: add "date_built" and custom android fields return { "build_version": app_info.version, "build_number": (int(app_info.build) if app_info.build.isdigit() else None), - "artifact_type": artifact_type, + "artifact_type": artifact_type.value, } else: raise ValueError(f"Unsupported app_info type: {type(app_info)}") @@ -474,7 +470,7 @@ def _upload_results( artifact_id, project_id, organization_id, - ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.UPLOAD_FAILED, detailed_error, ) @@ -490,7 +486,7 @@ def _upload_results( artifact_id, project_id, organization_id, - ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.UPLOAD_FAILED, detailed_error, ) diff --git a/tests/unit/test_service_error_handling.py b/tests/unit/test_service_error_handling.py index 5ba41d56..53865692 100644 --- a/tests/unit/test_service_error_handling.py +++ b/tests/unit/test_service_error_handling.py @@ -5,11 +5,9 @@ import pytest from launchpad.constants import ( - ERROR_CODE_ARTIFACT_PROCESSING_ERROR, - ERROR_CODE_ARTIFACT_PROCESSING_TIMEOUT, - ERROR_CODE_UNKNOWN, MAX_RETRY_ATTEMPTS, OperationName, + ProcessingErrorCode, ProcessingErrorMessage, ) from launchpad.service import LaunchpadService @@ -110,42 +108,42 @@ def test_categorize_processing_error(self): # Test ValueError -> ARTIFACT_PARSING_FAILED error_code, error_message = service._categorize_processing_error(ValueError("Invalid format")) - assert error_code == ERROR_CODE_ARTIFACT_PROCESSING_ERROR + assert error_code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR assert error_message == ProcessingErrorMessage.ARTIFACT_PARSING_FAILED # Test NotImplementedError -> UNSUPPORTED_ARTIFACT_TYPE error_code, error_message = service._categorize_processing_error(NotImplementedError("Not supported")) - assert error_code == ERROR_CODE_ARTIFACT_PROCESSING_ERROR + assert error_code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR assert error_message == ProcessingErrorMessage.UNSUPPORTED_ARTIFACT_TYPE # Test FileNotFoundError -> ARTIFACT_PARSING_FAILED error_code, error_message = service._categorize_processing_error(FileNotFoundError("File not found")) - assert error_code == ERROR_CODE_ARTIFACT_PROCESSING_ERROR + assert error_code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR assert error_message == ProcessingErrorMessage.ARTIFACT_PARSING_FAILED # Test RuntimeError with timeout -> PROCESSING_TIMEOUT error_code, error_message = service._categorize_processing_error(RuntimeError("Processing timeout occurred")) - assert error_code == ERROR_CODE_ARTIFACT_PROCESSING_TIMEOUT + assert error_code == ProcessingErrorCode.ARTIFACT_PROCESSING_TIMEOUT assert error_message == ProcessingErrorMessage.PROCESSING_TIMEOUT # Test RuntimeError with preprocessing keywords -> PREPROCESSING_FAILED error_code, error_message = service._categorize_processing_error(RuntimeError("Preprocessing failed")) - assert error_code == ERROR_CODE_ARTIFACT_PROCESSING_ERROR + assert error_code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR assert error_message == ProcessingErrorMessage.PREPROCESSING_FAILED # Test RuntimeError with size keywords -> SIZE_ANALYSIS_FAILED error_code, error_message = service._categorize_processing_error(RuntimeError("Size analysis failed")) - assert error_code == ERROR_CODE_ARTIFACT_PROCESSING_ERROR + assert error_code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR assert error_message == ProcessingErrorMessage.SIZE_ANALYSIS_FAILED # Test RuntimeError with unknown content -> UNKNOWN_ERROR error_code, error_message = service._categorize_processing_error(RuntimeError("Something unknown happened")) - assert error_code == ERROR_CODE_ARTIFACT_PROCESSING_ERROR + assert error_code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR assert error_message == ProcessingErrorMessage.UNKNOWN_ERROR # Test generic exception -> UNKNOWN_ERROR error_code, error_message = service._categorize_processing_error(Exception("Generic error")) - assert error_code == ERROR_CODE_UNKNOWN + assert error_code == ProcessingErrorCode.UNKNOWN assert error_message == ProcessingErrorMessage.UNKNOWN_ERROR @patch("launchpad.service.SentryClient") @@ -160,7 +158,7 @@ def test_update_artifact_error_success(self, mock_sentry_client_class): "test-artifact-id", "test-project-id", "test-org-id", - ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.PREPROCESSING_FAILED, ) @@ -169,7 +167,7 @@ def test_update_artifact_error_success(self, mock_sentry_client_class): project="test-project-id", artifact_id="test-artifact-id", data={ - "error_code": ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + "error_code": ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR.value, "error_message": ProcessingErrorMessage.PREPROCESSING_FAILED.value, }, ) @@ -187,7 +185,7 @@ def test_update_artifact_error_failure(self, mock_sentry_client_class): "test-artifact-id", "test-project-id", "test-org-id", - ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.PREPROCESSING_FAILED, ) @@ -206,7 +204,7 @@ def test_update_artifact_error_exception(self, mock_sentry_client_class): "test-artifact-id", "test-project-id", "test-org-id", - ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.PREPROCESSING_FAILED, ) @@ -229,7 +227,7 @@ def test_update_artifact_error_with_detailed_message(self, mock_sentry_client): "test_artifact_id", "test_project_id", "test_org_id", - ERROR_CODE_ARTIFACT_PROCESSING_ERROR, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.PREPROCESSING_FAILED, detailed_error, ) @@ -240,7 +238,10 @@ def test_update_artifact_error_with_detailed_message(self, mock_sentry_client): org="test_org_id", project="test_project_id", artifact_id="test_artifact_id", - data={"error_code": ERROR_CODE_ARTIFACT_PROCESSING_ERROR, "error_message": expected_error_message}, + data={ + "error_code": ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR.value, + "error_message": expected_error_message, + }, ) # Verify datadog logging From 7246703e6f8f02bd5358f2ee1ce2b23c7fba6277 Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Fri, 11 Jul 2025 12:54:07 -0700 Subject: [PATCH 5/6] feedback --- scripts/test_download_artifact.py | 92 ------------- src/launchpad/sentry_client.py | 174 +++++++++++++++++-------- src/launchpad/service.py | 123 ++++++----------- tests/unit/test_sentry_client_retry.py | 25 ++-- 4 files changed, 168 insertions(+), 246 deletions(-) delete mode 100755 scripts/test_download_artifact.py diff --git a/scripts/test_download_artifact.py b/scripts/test_download_artifact.py deleted file mode 100755 index 04ee51b5..00000000 --- a/scripts/test_download_artifact.py +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/env python3 -"""Script to test downloading artifacts from Sentry using the internal endpoint.""" - -import json -import logging -import os -import sys -import time - -import click - -sys.path.insert(0, "src") -from launchpad.sentry_client import SentryClient - - -@click.command() -@click.option( - "--base-url", default="http://localhost:8000", help="Base URL for Sentry API" -) -@click.option("--org", default="sentry", help="Organization slug") -@click.option("--project", default="internal", help="Project slug") -@click.option("--artifact-id", default="1", help="Artifact ID to download") -@click.option("--verbose", is_flag=True, help="Enable verbose logging") -def main( - base_url: str, org: str, project: str, artifact_id: str, verbose: bool -) -> None: - """Test downloading artifacts from Sentry using the internal endpoint.""" - - logging.basicConfig( - level=logging.DEBUG if verbose else logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - ) - - try: - click.echo(f"Testing download: {org}/{project}/artifacts/{artifact_id}") - - client = SentryClient(base_url=base_url) - start_time = time.time() - - response = client.download_artifact(org, project, artifact_id) - duration = time.time() - start_time - - if "error" in response: - click.echo( - f"❌ Failed: {response['error']} (Status: {response.get('status_code', 'Unknown')})" - ) - sys.exit(1) - - file_content = response.get("file_content", b"") - file_size = len(file_content) - - if not file_content: - click.echo("❌ No file content received") - sys.exit(1) - - # Save file to disk - timestamp = int(time.time()) - file_ext = ".zip" if file_content.startswith(b"PK") else ".bin" - filename = ( - f"preprod_artifact_{org}_{project}_{artifact_id}_{timestamp}{file_ext}" - ) - file_path = os.path.join(os.getcwd(), filename) - - with open(file_path, "wb") as f: - f.write(file_content) - - # Verify and report - disk_size = os.path.getsize(file_path) - integrity_ok = file_size == disk_size - - click.echo(f"✅ Downloaded {file_size:,} bytes in {duration:.2f}s") - click.echo(f"💾 Saved to: {file_path}") - click.echo( - f"{'✅' if integrity_ok else '⚠️ '} File integrity: {'OK' if integrity_ok else 'MISMATCH'}" - ) - - if verbose: - click.echo( - f"📄 Headers: {json.dumps(response.get('headers', {}), indent=2)}" - ) - - except Exception as e: - click.echo(f"❌ Error: {e}") - if verbose: - import traceback - - click.echo(traceback.format_exc()) - sys.exit(1) - - -if __name__ == "__main__": - main() diff --git a/src/launchpad/sentry_client.py b/src/launchpad/sentry_client.py index ed358163..693f1986 100644 --- a/src/launchpad/sentry_client.py +++ b/src/launchpad/sentry_client.py @@ -11,7 +11,7 @@ import secrets from pathlib import Path -from typing import Any, Dict +from typing import Any, Dict, NamedTuple import requests @@ -21,6 +21,30 @@ logger = logging.getLogger(__name__) +class DownloadResult(NamedTuple): + """Result of artifact download operation.""" + + success: bool + file_content: bytes + file_size_bytes: int + headers: dict[str, str] + + +class ErrorResult(NamedTuple): + """Result when an operation fails.""" + + error: str + status_code: int + + +class UploadResult(NamedTuple): + """Result of upload operation.""" + + success: bool + state: str | None = None + message: str | None = None + + def create_retry_session(max_retries: int = 3) -> requests.Session: """Create a requests session with retry configuration.""" session = requests.Session() @@ -51,8 +75,18 @@ def __init__(self, base_url: str) -> None: self.session = create_retry_session() - def download_artifact(self, org: str, project: str, artifact_id: str) -> Dict[str, Any]: - """Download preprod artifact.""" + def download_artifact_to_file(self, org: str, project: str, artifact_id: str, out) -> int | ErrorResult: + """Download preprod artifact directly to a file-like object. + + Args: + org: Organization slug + project: Project slug + artifact_id: Artifact ID + out: File-like object to write to (must support write() method) + + Returns: + Number of bytes written on success, or ErrorResult on failure + """ endpoint = f"/api/0/internal/{org}/{project}/files/preprodartifacts/{artifact_id}/" url = self._build_url(endpoint) @@ -62,26 +96,32 @@ def download_artifact(self, org: str, project: str, artifact_id: str) -> Dict[st if response.status_code != 200: return self._handle_error_response(response, "Download") - # Read content with size limit - content = b"" - for chunk in response.iter_content(chunk_size=8192): - if chunk: - content += chunk - if len(content) > 5 * 1024 * 1024 * 1024: # 5GB limit - logger.error("Download exceeds 5GB limit") - return { - "error": "File size exceeds 5GB limit", - "status_code": 413, # Payload Too Large - } + # Stream directly to the file-like object + file_size = 0 + try: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + out.write(chunk) + file_size += len(chunk) + if file_size > 5 * 1024 * 1024 * 1024: # 5GB limit + logger.error("Download exceeds 5GB limit") + return ErrorResult( + error="File size exceeds 5GB limit", + status_code=413, # Payload Too Large + ) + + return file_size - return { - "success": True, - "file_content": content, - "file_size_bytes": len(content), - "headers": dict(response.headers), - } + except Exception as e: + logger.error(f"Failed to write to file-like object: {e}") + return ErrorResult( + error=f"Failed to write to file-like object: {e}", + status_code=500, + ) - def update_artifact(self, org: str, project: str, artifact_id: str, data: Dict[str, Any]) -> Dict[str, Any]: + def update_artifact( + self, org: str, project: str, artifact_id: str, data: Dict[str, Any] + ) -> Dict[str, Any] | ErrorResult: """Update preprod artifact.""" endpoint = f"/api/0/internal/{org}/{project}/files/preprodartifacts/{artifact_id}/update/" return self._make_json_request("PUT", endpoint, data, operation="Update") @@ -93,7 +133,7 @@ def upload_size_analysis_file( artifact_id: str, file_path: str, max_retries: int = 3, - ) -> Dict[str, Any]: + ) -> Dict[str, Any] | ErrorResult: """Upload size analysis file with chunking following Rust sentry-cli pattern.""" # Basic path validation path = Path(file_path).resolve() @@ -110,8 +150,11 @@ def upload_size_analysis_file( # Step 1: Get chunk upload options from server logger.debug("Getting chunk upload options...") options_result = self._get_chunk_upload_options(org) - if "error" in options_result: - return {"error": f"Failed to get chunk upload options: {options_result['error']}"} + if isinstance(options_result, ErrorResult): + return ErrorResult( + error=f"Failed to get chunk upload options: {options_result.error}", + status_code=options_result.status_code, + ) chunk_options = options_result.get("chunking", {}) chunk_size = chunk_options.get("chunk_size", 8 * 1024 * 1024) # fallback to 8MB @@ -142,6 +185,13 @@ def upload_size_analysis_file( chunks=chunk_checksums, ) + # Handle ErrorResult from _assemble_size_analysis + if isinstance(result, ErrorResult): + logger.warning(f"Assembly attempt {attempt + 1} failed: {result}") + if attempt == max_retries - 1: # Last attempt + return result + continue + state = result.get("state") if state in ["ok", "created"]: logger.info("Upload and assembly successful") @@ -160,7 +210,7 @@ def upload_size_analysis_file( if attempt == max_retries - 1: # Last attempt return result - return {"error": f"Failed after {max_retries} attempts"} + return ErrorResult(error=f"Failed after {max_retries} attempts", status_code=500) def _get_auth_headers(self, body: bytes | None = None) -> Dict[str, str]: """Get authentication headers for a request.""" @@ -175,13 +225,13 @@ def _build_url(self, endpoint: str) -> str: """Build full URL from endpoint.""" return f"{self.base_url}{endpoint}" - def _handle_error_response(self, response: requests.Response, operation: str) -> Dict[str, Any]: + def _handle_error_response(self, response: requests.Response, operation: str) -> ErrorResult: """Handle non-200 response with consistent error format.""" logger.warning(f"{operation} failed: {response.status_code}") - return { - "error": f"HTTP {response.status_code}", - "status_code": response.status_code, - } + return ErrorResult( + error=f"HTTP {response.status_code}", + status_code=response.status_code, + ) def _make_json_request( self, @@ -190,7 +240,7 @@ def _make_json_request( data: Dict[str, Any] | None = None, timeout: int = 30, operation: str | None = None, - ) -> Dict[str, Any]: + ) -> Dict[str, Any] | ErrorResult: """Make a JSON request with standard error handling.""" url = self._build_url(endpoint) body = json.dumps(data).encode("utf-8") if data else b"" @@ -210,7 +260,7 @@ def _make_json_request( return response.json() - def _get_chunk_upload_options(self, org: str) -> Dict[str, Any]: + def _get_chunk_upload_options(self, org: str) -> Dict[str, Any] | ErrorResult: """Get chunk upload configuration from server.""" endpoint = f"/api/0/organizations/{org}/chunk-upload/" return self._make_json_request("GET", endpoint, operation="Get chunk options") @@ -290,7 +340,7 @@ def _assemble_size_analysis( artifact_id: str | int, checksum: str, chunks: list[str], - ) -> Dict[str, Any]: + ) -> Dict[str, Any] | ErrorResult: """Call the assemble size analysis endpoint.""" # Validate hex strings if not re.match(r"^[a-fA-F0-9]+$", checksum): @@ -327,7 +377,7 @@ def _create_multipart_body(self, boundary: str, filename: str, data: bytes) -> b return b"".join(parts) -def categorize_http_error(error_result: Dict[str, Any]) -> tuple[str, str]: +def categorize_http_error(error_result: ErrorResult | Dict[str, Any]) -> tuple[str, str]: """ Categorize HTTP error results from SentryClient. @@ -335,9 +385,9 @@ def categorize_http_error(error_result: Dict[str, Any]) -> tuple[str, str]: Tuple of (error_category, error_description) Categories: "not_found", "server_error", "client_error", "unknown" """ - # First try to get the structured status code - status_code = error_result.get("status_code") - if isinstance(status_code, int): + # Handle ErrorResult NamedTuple + if isinstance(error_result, ErrorResult): + status_code = error_result.status_code if status_code == 404: return "not_found", f"Resource not found (HTTP {status_code})" elif 500 <= status_code < 600: @@ -347,23 +397,37 @@ def categorize_http_error(error_result: Dict[str, Any]) -> tuple[str, str]: else: return "unknown", f"Unexpected HTTP status {status_code}" - # Fallback to parsing the error message string - error_msg = error_result.get("error", "") - if isinstance(error_msg, str): - # Extract HTTP status code from error message like "HTTP 404" - match = re.search(r"HTTP (\d+)", error_msg) - if match: - try: - status_code = int(match.group(1)) - if status_code == 404: - return "not_found", f"Resource not found (HTTP {status_code})" - elif 500 <= status_code < 600: - return "server_error", f"Server error (HTTP {status_code})" - elif 400 <= status_code < 500: - return "client_error", f"Client error (HTTP {status_code})" - else: - return "unknown", f"Unexpected HTTP status {status_code}" - except ValueError: - pass + # Handle legacy dict format (for backward compatibility) + if isinstance(error_result, dict): + # First try to get the structured status code + status_code = error_result.get("status_code") + if isinstance(status_code, int): + if status_code == 404: + return "not_found", f"Resource not found (HTTP {status_code})" + elif 500 <= status_code < 600: + return "server_error", f"Server error (HTTP {status_code})" + elif 400 <= status_code < 500: + return "client_error", f"Client error (HTTP {status_code})" + else: + return "unknown", f"Unexpected HTTP status {status_code}" + + # Fallback to parsing the error message string + error_msg = error_result.get("error", "") + if isinstance(error_msg, str): + # Extract HTTP status code from error message like "HTTP 404" + match = re.search(r"HTTP (\d+)", error_msg) + if match: + try: + status_code = int(match.group(1)) + if status_code == 404: + return "not_found", f"Resource not found (HTTP {status_code})" + elif 500 <= status_code < 600: + return "server_error", f"Server error (HTTP {status_code})" + elif 400 <= status_code < 500: + return "client_error", f"Client error (HTTP {status_code})" + else: + return "unknown", f"Unexpected HTTP status {status_code}" + except ValueError: + pass return "unknown", f"Unknown error: {error_result}" diff --git a/src/launchpad/service.py b/src/launchpad/service.py index f59255ce..8b5dd8bf 100644 --- a/src/launchpad/service.py +++ b/src/launchpad/service.py @@ -30,7 +30,7 @@ ProcessingErrorCode, ProcessingErrorMessage, ) -from launchpad.sentry_client import SentryClient, categorize_http_error +from launchpad.sentry_client import ErrorResult, SentryClient, categorize_http_error from launchpad.size.analyzers.android import AndroidAnalyzer from launchpad.size.analyzers.apple import AppleAppAnalyzer from launchpad.size.models.android import AndroidAppInfo @@ -151,8 +151,7 @@ def process_artifact(self, artifact_id: str, project_id: str, organization_id: s temp_file = None try: - file_content, _ = self._download_artifact(sentry_client, artifact_id, project_id, organization_id) - temp_file = self._save_to_temp_file(file_content, artifact_id) + temp_file = self._download_artifact_to_temp_file(sentry_client, artifact_id, project_id, organization_id) artifact = ArtifactFactory.from_path(Path(temp_file)) logger.info(f"Running preprocessing on {temp_file}...") @@ -301,75 +300,40 @@ def _update_artifact_error( data={"error_code": error_code.value, "error_message": final_error_message}, ) - if "error" in result: - logger.error(f"Failed to update artifact with error: {result['error']}") + if isinstance(result, ErrorResult): + logger.error(f"Failed to update artifact with error: {result.error}") else: logger.info(f"Successfully updated artifact {artifact_id} with error information") except Exception as e: logger.error(f"Failed to update artifact {artifact_id} with error information: {e}", exc_info=True) - def _handle_download_error( - self, sentry_client: SentryClient, artifact_id: str, project_id: str, organization_id: str, error_msg: str - ) -> None: - """Handle download error by logging and updating artifact.""" - logger.error(error_msg) - # Extract just the error details for the database, the enum prefix will be added automatically - detailed_error = error_msg.split(": ", 1)[1] if ": " in error_msg else error_msg - self._update_artifact_error( - sentry_client, - artifact_id, - project_id, - organization_id, - ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, - ProcessingErrorMessage.DOWNLOAD_FAILED, - detailed_error, - ) - raise RuntimeError(error_msg) - - def _download_artifact( + def _download_artifact_to_temp_file( self, sentry_client: SentryClient, artifact_id: str, project_id: str, organization_id: str, - ) -> tuple[bytes, int]: - """Download artifact from Sentry and validate response.""" + ) -> str: + """Download artifact from Sentry directly to a temporary file.""" logger.info(f"Downloading artifact {artifact_id}...") - try: - download_result = sentry_client.download_artifact( - org=organization_id, project=project_id, artifact_id=artifact_id - ) - - if "error" in download_result: - error_category, error_description = categorize_http_error(download_result) - self._handle_download_error( - sentry_client, - artifact_id, - project_id, - organization_id, - f"Failed to download artifact ({error_category}): {error_description}", - ) - - if not download_result.get("success"): - self._handle_download_error( - sentry_client, - artifact_id, - project_id, - organization_id, - f"Download was not successful: {download_result}", - ) + # Create temporary file first + with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tf: + temp_file = tf.name - file_content = download_result["file_content"] - file_size = download_result["file_size_bytes"] + # Download directly to the temporary file + file_size = sentry_client.download_artifact_to_file( + org=organization_id, project=project_id, artifact_id=artifact_id, out=tf + ) - logger.info(f"Downloaded artifact {artifact_id}: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)") - return file_content, file_size + # Handle ErrorResult + if isinstance(file_size, ErrorResult): + error_category, error_description = categorize_http_error(file_size) + error_msg = f"Failed to download artifact ({error_category}): {error_description}" + logger.error(error_msg) - except Exception as e: - if not isinstance(e, RuntimeError): - detailed_error = str(e) + # Update artifact with error info self._update_artifact_error( sentry_client, artifact_id, @@ -377,25 +341,14 @@ def _download_artifact( organization_id, ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.DOWNLOAD_FAILED, - detailed_error, + error_description, ) - raise - - def _save_to_temp_file(self, file_content: bytes, artifact_id: str) -> str: - """Save file content to temporary file and return path.""" - try: - with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tf: - tf.write(file_content) - tf.flush() - temp_file = tf.name + raise RuntimeError(error_msg) + logger.info(f"Downloaded artifact {artifact_id}: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)") logger.info(f"Saved artifact to temporary file: {temp_file}") return temp_file - except Exception as e: - logger.error(f"Failed to save artifact to temporary file: {e}") - raise RuntimeError(f"{ProcessingErrorMessage.TEMP_FILE_CREATION_FAILED.value}: {str(e)}") from e - def _prepare_update_data(self, app_info: AppleAppInfo | AndroidAppInfo, artifact: Any) -> Dict[str, Any]: """Prepare update data based on app platform and artifact type.""" if isinstance(app_info, AppleAppInfo): @@ -460,11 +413,11 @@ def _upload_results( file_path=analysis_file, ) - if "error" in upload_result: - error_msg = f"Failed to upload analysis results: {upload_result['error']}" + if isinstance(upload_result, ErrorResult): + error_msg = f"Failed to upload analysis results: {upload_result.error}" logger.error(error_msg) # Extract just the error details for the database, the enum prefix will be added automatically - detailed_error = upload_result["error"] + detailed_error = upload_result.error self._update_artifact_error( sentry_client, artifact_id, @@ -478,19 +431,17 @@ def _upload_results( logger.info(f"Successfully uploaded analysis results for artifact {artifact_id}") - except Exception as e: - if not isinstance(e, RuntimeError): - detailed_error = str(e) - self._update_artifact_error( - sentry_client, - artifact_id, - project_id, - organization_id, - ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, - ProcessingErrorMessage.UPLOAD_FAILED, - detailed_error, - ) - raise + except RuntimeError as e: + detailed_error = str(e) + self._update_artifact_error( + sentry_client, + artifact_id, + project_id, + organization_id, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.UPLOAD_FAILED, + detailed_error, + ) finally: if analysis_file: diff --git a/tests/unit/test_sentry_client_retry.py b/tests/unit/test_sentry_client_retry.py index d95c7432..55c11486 100644 --- a/tests/unit/test_sentry_client_retry.py +++ b/tests/unit/test_sentry_client_retry.py @@ -61,31 +61,30 @@ def test_sentry_client_uses_retry_session(self): assert isinstance(client.session.adapters["http://"], HTTPAdapter) @patch("launchpad.sentry_client.requests.Session") - def test_download_artifact_with_retry_success_after_failure(self, mock_session_class): - """Test that download_artifact succeeds after retries via urllib3.""" + def test_download_artifact_to_file_with_retry_session(self, mock_session_class): + """Test that download_artifact_to_file uses the retry session.""" # Mock the session and its methods mock_session = Mock() mock_session_class.return_value = mock_session - # First call fails, second succeeds - mock_response_success = Mock() - mock_response_success.status_code = 200 - mock_response_success.iter_content.return_value = [b"test content"] - mock_response_success.headers = {"content-length": "12"} - - # Configure the mock to succeed on the call - mock_session.get.return_value = mock_response_success + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.iter_content.return_value = [b"test content"] + mock_session.get.return_value = mock_response # Create client with mocked session with patch.dict("os.environ", {"LAUNCHPAD_RPC_SHARED_SECRET": "test_secret"}): client = SentryClient(base_url="https://test.sentry.io") client.session = mock_session - result = client.download_artifact("test-org", "test-project", "test-artifact") + # Mock file object + mock_file = Mock() + result = client.download_artifact_to_file("test-org", "test-project", "test-artifact", mock_file) - assert result["success"] is True - assert result["file_content"] == b"test content" + assert result == 12 # Length of "test content" assert mock_session.get.called + mock_file.write.assert_called_with(b"test content") @patch("launchpad.sentry_client.requests.Session") def test_update_artifact_with_retry_session(self, mock_session_class): From f06ca0ce6455d49594e63219de0d2681464b3d96 Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Fri, 11 Jul 2025 14:46:00 -0700 Subject: [PATCH 6/6] cleanup --- src/launchpad/sentry_client.py | 46 ++++++++-------- src/launchpad/service.py | 96 ++++++++++++++++++---------------- 2 files changed, 71 insertions(+), 71 deletions(-) diff --git a/src/launchpad/sentry_client.py b/src/launchpad/sentry_client.py index 693f1986..92608c75 100644 --- a/src/launchpad/sentry_client.py +++ b/src/launchpad/sentry_client.py @@ -11,7 +11,7 @@ import secrets from pathlib import Path -from typing import Any, Dict, NamedTuple +from typing import Any, Dict, NamedTuple, cast import requests @@ -75,7 +75,7 @@ def __init__(self, base_url: str) -> None: self.session = create_retry_session() - def download_artifact_to_file(self, org: str, project: str, artifact_id: str, out) -> int | ErrorResult: + def download_artifact_to_file(self, org: str, project: str, artifact_id: str, out) -> int: """Download preprod artifact directly to a file-like object. Args: @@ -85,39 +85,33 @@ def download_artifact_to_file(self, org: str, project: str, artifact_id: str, ou out: File-like object to write to (must support write() method) Returns: - Number of bytes written on success, or ErrorResult on failure + Number of bytes written on success + + Raises: + RuntimeError: With categorized error message on failure """ endpoint = f"/api/0/internal/{org}/{project}/files/preprodartifacts/{artifact_id}/" url = self._build_url(endpoint) logger.debug(f"GET {url}") + response = self.session.get(url, headers=self._get_auth_headers(), timeout=120, stream=True) if response.status_code != 200: - return self._handle_error_response(response, "Download") + error_result = self._handle_error_response(response, "Download artifact") + error_category, error_description = categorize_http_error(error_result) + raise RuntimeError(f"Failed to download artifact ({error_category}): {error_description}") # Stream directly to the file-like object file_size = 0 - try: - for chunk in response.iter_content(chunk_size=8192): - if chunk: - out.write(chunk) - file_size += len(chunk) - if file_size > 5 * 1024 * 1024 * 1024: # 5GB limit - logger.error("Download exceeds 5GB limit") - return ErrorResult( - error="File size exceeds 5GB limit", - status_code=413, # Payload Too Large - ) - - return file_size + for chunk in response.iter_content(chunk_size=8192): + if chunk: + out.write(chunk) + file_size += len(chunk) + if file_size > 5 * 1024 * 1024 * 1024: # 5GB limit + raise RuntimeError("Failed to download artifact (client_error): File size exceeds 5GB limit") - except Exception as e: - logger.error(f"Failed to write to file-like object: {e}") - return ErrorResult( - error=f"Failed to write to file-like object: {e}", - status_code=500, - ) + return file_size def update_artifact( self, org: str, project: str, artifact_id: str, data: Dict[str, Any] @@ -228,9 +222,11 @@ def _build_url(self, endpoint: str) -> str: def _handle_error_response(self, response: requests.Response, operation: str) -> ErrorResult: """Handle non-200 response with consistent error format.""" logger.warning(f"{operation} failed: {response.status_code}") + # Cast to int to help type checker understand status_code is int + status_code = cast(int, response.status_code) return ErrorResult( - error=f"HTTP {response.status_code}", - status_code=response.status_code, + error=f"HTTP {status_code}", + status_code=status_code, ) def _make_json_request( diff --git a/src/launchpad/service.py b/src/launchpad/service.py index 8b5dd8bf..18173ef1 100644 --- a/src/launchpad/service.py +++ b/src/launchpad/service.py @@ -170,11 +170,12 @@ def process_artifact(self, artifact_id: str, project_id: str, organization_id: s data=update_data, ) - if "error" in update_result: - error_msg = f"Failed to send preprocessed info: {update_result['error']}" + # Check if update_result is an ErrorResult + if isinstance(update_result, ErrorResult): + error_category, error_description = categorize_http_error(update_result) + error_msg = f"Failed to send preprocessed info: {error_description}" logger.error(error_msg) - # Extract just the error details for the database, the enum prefix will be added automatically - detailed_error = update_result["error"] + # Use the categorized error description for the database self._update_artifact_error( sentry_client, artifact_id, @@ -182,7 +183,7 @@ def process_artifact(self, artifact_id: str, project_id: str, organization_id: s organization_id, ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.UPDATE_FAILED, - detailed_error, + error_description, ) return @@ -318,36 +319,37 @@ def _download_artifact_to_temp_file( """Download artifact from Sentry directly to a temporary file.""" logger.info(f"Downloading artifact {artifact_id}...") - # Create temporary file first - with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tf: - temp_file = tf.name + temp_file = None + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tf: + temp_file = tf.name + file_size = sentry_client.download_artifact_to_file( + org=organization_id, project=project_id, artifact_id=artifact_id, out=tf + ) - # Download directly to the temporary file - file_size = sentry_client.download_artifact_to_file( - org=organization_id, project=project_id, artifact_id=artifact_id, out=tf - ) + # Success case + logger.info(f"Downloaded artifact {artifact_id}: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)") + logger.info(f"Saved artifact to temporary file: {temp_file}") + return temp_file - # Handle ErrorResult - if isinstance(file_size, ErrorResult): - error_category, error_description = categorize_http_error(file_size) - error_msg = f"Failed to download artifact ({error_category}): {error_description}" - logger.error(error_msg) + except Exception as e: + # Handle all errors (download errors, temp file creation errors, I/O errors) + error_msg = str(e) + logger.error(error_msg) - # Update artifact with error info - self._update_artifact_error( - sentry_client, - artifact_id, - project_id, - organization_id, - ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, - ProcessingErrorMessage.DOWNLOAD_FAILED, - error_description, - ) - raise RuntimeError(error_msg) + self._update_artifact_error( + sentry_client, + artifact_id, + project_id, + organization_id, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.DOWNLOAD_FAILED, + error_msg, + ) - logger.info(f"Downloaded artifact {artifact_id}: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)") - logger.info(f"Saved artifact to temporary file: {temp_file}") - return temp_file + if temp_file: + self._safe_cleanup(temp_file, "temporary file") + raise def _prepare_update_data(self, app_info: AppleAppInfo | AndroidAppInfo, artifact: Any) -> Dict[str, Any]: """Prepare update data based on app platform and artifact type.""" @@ -414,10 +416,10 @@ def _upload_results( ) if isinstance(upload_result, ErrorResult): - error_msg = f"Failed to upload analysis results: {upload_result.error}" + error_category, error_description = categorize_http_error(upload_result) + error_msg = f"Failed to upload analysis results: {error_description}" logger.error(error_msg) - # Extract just the error details for the database, the enum prefix will be added automatically - detailed_error = upload_result.error + # Use the categorized error description for the database self._update_artifact_error( sentry_client, artifact_id, @@ -425,23 +427,25 @@ def _upload_results( organization_id, ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.UPLOAD_FAILED, - detailed_error, + error_description, ) raise RuntimeError(error_msg) logger.info(f"Successfully uploaded analysis results for artifact {artifact_id}") - except RuntimeError as e: - detailed_error = str(e) - self._update_artifact_error( - sentry_client, - artifact_id, - project_id, - organization_id, - ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, - ProcessingErrorMessage.UPLOAD_FAILED, - detailed_error, - ) + except Exception as e: + if not isinstance(e, RuntimeError): + detailed_error = str(e) + self._update_artifact_error( + sentry_client, + artifact_id, + project_id, + organization_id, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.UPLOAD_FAILED, + detailed_error, + ) + raise finally: if analysis_file: