diff --git a/scripts/test_download_artifact.py b/scripts/test_download_artifact.py deleted file mode 100755 index 04ee51b5..00000000 --- a/scripts/test_download_artifact.py +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/env python3 -"""Script to test downloading artifacts from Sentry using the internal endpoint.""" - -import json -import logging -import os -import sys -import time - -import click - -sys.path.insert(0, "src") -from launchpad.sentry_client import SentryClient - - -@click.command() -@click.option( - "--base-url", default="http://localhost:8000", help="Base URL for Sentry API" -) -@click.option("--org", default="sentry", help="Organization slug") -@click.option("--project", default="internal", help="Project slug") -@click.option("--artifact-id", default="1", help="Artifact ID to download") -@click.option("--verbose", is_flag=True, help="Enable verbose logging") -def main( - base_url: str, org: str, project: str, artifact_id: str, verbose: bool -) -> None: - """Test downloading artifacts from Sentry using the internal endpoint.""" - - logging.basicConfig( - level=logging.DEBUG if verbose else logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - ) - - try: - click.echo(f"Testing download: {org}/{project}/artifacts/{artifact_id}") - - client = SentryClient(base_url=base_url) - start_time = time.time() - - response = client.download_artifact(org, project, artifact_id) - duration = time.time() - start_time - - if "error" in response: - click.echo( - f"❌ Failed: {response['error']} (Status: {response.get('status_code', 'Unknown')})" - ) - sys.exit(1) - - file_content = response.get("file_content", b"") - file_size = len(file_content) - - if not file_content: - click.echo("❌ No file content received") - sys.exit(1) - - # Save file to disk - timestamp = int(time.time()) - file_ext = ".zip" if file_content.startswith(b"PK") else ".bin" - filename = ( - f"preprod_artifact_{org}_{project}_{artifact_id}_{timestamp}{file_ext}" - ) - file_path = os.path.join(os.getcwd(), filename) - - with open(file_path, "wb") as f: - f.write(file_content) - - # Verify and report - disk_size = os.path.getsize(file_path) - integrity_ok = file_size == disk_size - - click.echo(f"✅ Downloaded {file_size:,} bytes in {duration:.2f}s") - click.echo(f"💾 Saved to: {file_path}") - click.echo( - f"{'✅' if integrity_ok else '⚠️ '} File integrity: {'OK' if integrity_ok else 'MISMATCH'}" - ) - - if verbose: - click.echo( - f"📄 Headers: {json.dumps(response.get('headers', {}), indent=2)}" - ) - - except Exception as e: - click.echo(f"❌ Error: {e}") - if verbose: - import traceback - - click.echo(traceback.format_exc()) - sys.exit(1) - - -if __name__ == "__main__": - main() diff --git a/src/launchpad/constants.py b/src/launchpad/constants.py index 9bac61a9..144cf2f7 100644 --- a/src/launchpad/constants.py +++ b/src/launchpad/constants.py @@ -1,4 +1,71 @@ """Constants used throughout the Launchpad application.""" +from enum import Enum + # Kafka topic names PREPROD_ARTIFACT_EVENTS_TOPIC = "preprod-artifact-events" + + +# Error code constants (matching the Django model) +class ProcessingErrorCode(Enum): + """Error codes for artifact processing (matching the Django model).""" + + UNKNOWN = 0 + UPLOAD_TIMEOUT = 1 + ARTIFACT_PROCESSING_TIMEOUT = 2 + ARTIFACT_PROCESSING_ERROR = 3 + + +# Artifact type constants +class ArtifactType(Enum): + """Artifact types for different platforms and formats.""" + + XCARCHIVE = 0 + AAB = 1 + APK = 2 + + +# Retry configuration +MAX_RETRY_ATTEMPTS = 3 + +# Health check threshold - consider unhealthy if file not touched in 60 seconds +HEALTHCHECK_MAX_AGE_SECONDS = 60.0 + + +class OperationName(Enum): + """Enum for operation names used in retry logic.""" + + PREPROCESSING = "preprocessing" + SIZE_ANALYSIS = "size analysis" + + +class ProcessingErrorMessage(Enum): + """Fixed set of error messages for artifact processing.""" + + # Network-related errors + DOWNLOAD_FAILED = "Failed to download artifact from Sentry" + UPLOAD_FAILED = "Failed to upload analysis results to Sentry" + UPDATE_FAILED = "Failed to update artifact info in Sentry" + + # Processing-related errors + PREPROCESSING_FAILED = "Failed to extract basic app information" + SIZE_ANALYSIS_FAILED = "Failed to perform size analysis" + ARTIFACT_PARSING_FAILED = "Failed to parse artifact file" + UNSUPPORTED_ARTIFACT_TYPE = "Unsupported artifact type" + + # System-related errors + TEMP_FILE_CREATION_FAILED = "Failed to create temporary file" + CLEANUP_FAILED = "Failed to clean up temporary files" + + # Timeout errors + PROCESSING_TIMEOUT = "Processing timed out" + + # Unknown errors + UNKNOWN_ERROR = "An unknown error occurred" + + +# Operation to error message mapping +OPERATION_ERRORS = { + OperationName.PREPROCESSING: ProcessingErrorMessage.PREPROCESSING_FAILED, + OperationName.SIZE_ANALYSIS: ProcessingErrorMessage.SIZE_ANALYSIS_FAILED, +} diff --git a/src/launchpad/sentry_client.py b/src/launchpad/sentry_client.py index 44e6a26c..92608c75 100644 --- a/src/launchpad/sentry_client.py +++ b/src/launchpad/sentry_client.py @@ -11,13 +11,59 @@ import secrets from pathlib import Path -from typing import Any, Dict +from typing import Any, Dict, NamedTuple, cast import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + logger = logging.getLogger(__name__) +class DownloadResult(NamedTuple): + """Result of artifact download operation.""" + + success: bool + file_content: bytes + file_size_bytes: int + headers: dict[str, str] + + +class ErrorResult(NamedTuple): + """Result when an operation fails.""" + + error: str + status_code: int + + +class UploadResult(NamedTuple): + """Result of upload operation.""" + + success: bool + state: str | None = None + message: str | None = None + + +def create_retry_session(max_retries: int = 3) -> requests.Session: + """Create a requests session with retry configuration.""" + session = requests.Session() + + retry_strategy = Retry( + total=max_retries, + backoff_factor=0.1, + status_forcelist=[429, 500, 502, 503, 504], # Retry on these HTTP status codes + allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"], + raise_on_status=False, # Don't raise on HTTP errors, let our code handle them + ) + + adapter = HTTPAdapter(max_retries=retry_strategy) + session.mount("http://", adapter) + session.mount("https://", adapter) + + return session + + class SentryClient: """Client for authenticated API calls to the Sentry monolith.""" @@ -27,38 +73,49 @@ def __init__(self, base_url: str) -> None: if not self.shared_secret: raise RuntimeError("LAUNCHPAD_RPC_SHARED_SECRET must be provided or set as environment variable") - def download_artifact(self, org: str, project: str, artifact_id: str) -> Dict[str, Any]: - """Download preprod artifact.""" + self.session = create_retry_session() + + def download_artifact_to_file(self, org: str, project: str, artifact_id: str, out) -> int: + """Download preprod artifact directly to a file-like object. + + Args: + org: Organization slug + project: Project slug + artifact_id: Artifact ID + out: File-like object to write to (must support write() method) + + Returns: + Number of bytes written on success + + Raises: + RuntimeError: With categorized error message on failure + """ endpoint = f"/api/0/internal/{org}/{project}/files/preprodartifacts/{artifact_id}/" url = self._build_url(endpoint) - try: - logger.debug(f"GET {url}") - response = requests.get(url, headers=self._get_auth_headers(), timeout=120, stream=True) - - if response.status_code != 200: - return self._handle_error_response(response, "Download") - - # Read content with size limit - content = b"" - for chunk in response.iter_content(chunk_size=8192): - if chunk: - content += chunk - if len(content) > 5 * 1024 * 1024 * 1024: # 5GB limit - logger.warning("Download truncated at 5GB") - break - - return { - "success": True, - "file_content": content, - "file_size_bytes": len(content), - "headers": dict(response.headers), - } - except Exception as e: - logger.error(f"Download failed: {e}") - return {"error": str(e)} + logger.debug(f"GET {url}") - def update_artifact(self, org: str, project: str, artifact_id: str, data: Dict[str, Any]) -> Dict[str, Any]: + response = self.session.get(url, headers=self._get_auth_headers(), timeout=120, stream=True) + + if response.status_code != 200: + error_result = self._handle_error_response(response, "Download artifact") + error_category, error_description = categorize_http_error(error_result) + raise RuntimeError(f"Failed to download artifact ({error_category}): {error_description}") + + # Stream directly to the file-like object + file_size = 0 + for chunk in response.iter_content(chunk_size=8192): + if chunk: + out.write(chunk) + file_size += len(chunk) + if file_size > 5 * 1024 * 1024 * 1024: # 5GB limit + raise RuntimeError("Failed to download artifact (client_error): File size exceeds 5GB limit") + + return file_size + + def update_artifact( + self, org: str, project: str, artifact_id: str, data: Dict[str, Any] + ) -> Dict[str, Any] | ErrorResult: """Update preprod artifact.""" endpoint = f"/api/0/internal/{org}/{project}/files/preprodartifacts/{artifact_id}/update/" return self._make_json_request("PUT", endpoint, data, operation="Update") @@ -70,7 +127,7 @@ def upload_size_analysis_file( artifact_id: str, file_path: str, max_retries: int = 3, - ) -> Dict[str, Any]: + ) -> Dict[str, Any] | ErrorResult: """Upload size analysis file with chunking following Rust sentry-cli pattern.""" # Basic path validation path = Path(file_path).resolve() @@ -87,8 +144,11 @@ def upload_size_analysis_file( # Step 1: Get chunk upload options from server logger.debug("Getting chunk upload options...") options_result = self._get_chunk_upload_options(org) - if "error" in options_result: - return {"error": f"Failed to get chunk upload options: {options_result['error']}"} + if isinstance(options_result, ErrorResult): + return ErrorResult( + error=f"Failed to get chunk upload options: {options_result.error}", + status_code=options_result.status_code, + ) chunk_options = options_result.get("chunking", {}) chunk_size = chunk_options.get("chunk_size", 8 * 1024 * 1024) # fallback to 8MB @@ -119,6 +179,13 @@ def upload_size_analysis_file( chunks=chunk_checksums, ) + # Handle ErrorResult from _assemble_size_analysis + if isinstance(result, ErrorResult): + logger.warning(f"Assembly attempt {attempt + 1} failed: {result}") + if attempt == max_retries - 1: # Last attempt + return result + continue + state = result.get("state") if state in ["ok", "created"]: logger.info("Upload and assembly successful") @@ -137,7 +204,7 @@ def upload_size_analysis_file( if attempt == max_retries - 1: # Last attempt return result - return {"error": f"Failed after {max_retries} attempts"} + return ErrorResult(error=f"Failed after {max_retries} attempts", status_code=500) def _get_auth_headers(self, body: bytes | None = None) -> Dict[str, str]: """Get authentication headers for a request.""" @@ -152,13 +219,15 @@ def _build_url(self, endpoint: str) -> str: """Build full URL from endpoint.""" return f"{self.base_url}{endpoint}" - def _handle_error_response(self, response: requests.Response, operation: str) -> Dict[str, Any]: + def _handle_error_response(self, response: requests.Response, operation: str) -> ErrorResult: """Handle non-200 response with consistent error format.""" logger.warning(f"{operation} failed: {response.status_code}") - return { - "error": f"HTTP {response.status_code}", - "status_code": response.status_code, - } + # Cast to int to help type checker understand status_code is int + status_code = cast(int, response.status_code) + return ErrorResult( + error=f"HTTP {status_code}", + status_code=status_code, + ) def _make_json_request( self, @@ -167,14 +236,14 @@ def _make_json_request( data: Dict[str, Any] | None = None, timeout: int = 30, operation: str | None = None, - ) -> Dict[str, Any]: + ) -> Dict[str, Any] | ErrorResult: """Make a JSON request with standard error handling.""" url = self._build_url(endpoint) body = json.dumps(data).encode("utf-8") if data else b"" operation = operation or f"{method} {endpoint}" logger.debug(f"{method} {url}") - response = requests.request( + response = self.session.request( method=method, url=url, data=body or None, @@ -187,7 +256,7 @@ def _make_json_request( return response.json() - def _get_chunk_upload_options(self, org: str) -> Dict[str, Any]: + def _get_chunk_upload_options(self, org: str) -> Dict[str, Any] | ErrorResult: """Get chunk upload configuration from server.""" endpoint = f"/api/0/organizations/{org}/chunk-upload/" return self._make_json_request("GET", endpoint, operation="Get chunk options") @@ -249,7 +318,7 @@ def _upload_chunk(self, org: str, chunk: Dict[str, Any]) -> bool: } try: - response = requests.post(url, data=body, headers=headers, timeout=60) + response = self.session.post(url, data=body, headers=headers, timeout=60) success = response.status_code in [200, 201, 409] # 409 = already exists if not success: @@ -267,7 +336,7 @@ def _assemble_size_analysis( artifact_id: str | int, checksum: str, chunks: list[str], - ) -> Dict[str, Any]: + ) -> Dict[str, Any] | ErrorResult: """Call the assemble size analysis endpoint.""" # Validate hex strings if not re.match(r"^[a-fA-F0-9]+$", checksum): @@ -302,3 +371,59 @@ def _create_multipart_body(self, boundary: str, filename: str, data: bytes) -> b ] return b"".join(parts) + + +def categorize_http_error(error_result: ErrorResult | Dict[str, Any]) -> tuple[str, str]: + """ + Categorize HTTP error results from SentryClient. + + Returns: + Tuple of (error_category, error_description) + Categories: "not_found", "server_error", "client_error", "unknown" + """ + # Handle ErrorResult NamedTuple + if isinstance(error_result, ErrorResult): + status_code = error_result.status_code + if status_code == 404: + return "not_found", f"Resource not found (HTTP {status_code})" + elif 500 <= status_code < 600: + return "server_error", f"Server error (HTTP {status_code})" + elif 400 <= status_code < 500: + return "client_error", f"Client error (HTTP {status_code})" + else: + return "unknown", f"Unexpected HTTP status {status_code}" + + # Handle legacy dict format (for backward compatibility) + if isinstance(error_result, dict): + # First try to get the structured status code + status_code = error_result.get("status_code") + if isinstance(status_code, int): + if status_code == 404: + return "not_found", f"Resource not found (HTTP {status_code})" + elif 500 <= status_code < 600: + return "server_error", f"Server error (HTTP {status_code})" + elif 400 <= status_code < 500: + return "client_error", f"Client error (HTTP {status_code})" + else: + return "unknown", f"Unexpected HTTP status {status_code}" + + # Fallback to parsing the error message string + error_msg = error_result.get("error", "") + if isinstance(error_msg, str): + # Extract HTTP status code from error message like "HTTP 404" + match = re.search(r"HTTP (\d+)", error_msg) + if match: + try: + status_code = int(match.group(1)) + if status_code == 404: + return "not_found", f"Resource not found (HTTP {status_code})" + elif 500 <= status_code < 600: + return "server_error", f"Server error (HTTP {status_code})" + elif 400 <= status_code < 500: + return "client_error", f"Client error (HTTP {status_code})" + else: + return "unknown", f"Unexpected HTTP status {status_code}" + except ValueError: + pass + + return "unknown", f"Unknown error: {error_result}" diff --git a/src/launchpad/service.py b/src/launchpad/service.py index d3cb2a7b..18173ef1 100644 --- a/src/launchpad/service.py +++ b/src/launchpad/service.py @@ -3,10 +3,13 @@ from __future__ import annotations import asyncio +import json import os import signal +import tempfile import time +from pathlib import Path from typing import Any, Dict, cast from arroyo.backends.kafka import KafkaPayload @@ -15,6 +18,24 @@ PreprodArtifactEvents, ) +from launchpad.artifacts.android.aab import AAB +from launchpad.artifacts.android.zipped_aab import ZippedAAB +from launchpad.artifacts.artifact_factory import ArtifactFactory +from launchpad.constants import ( + HEALTHCHECK_MAX_AGE_SECONDS, + MAX_RETRY_ATTEMPTS, + OPERATION_ERRORS, + ArtifactType, + OperationName, + ProcessingErrorCode, + ProcessingErrorMessage, +) +from launchpad.sentry_client import ErrorResult, SentryClient, categorize_http_error +from launchpad.size.analyzers.android import AndroidAnalyzer +from launchpad.size.analyzers.apple import AppleAppAnalyzer +from launchpad.size.models.android import AndroidAppInfo +from launchpad.size.models.apple import AppleAppInfo +from launchpad.size.runner import do_preprocess, do_size from launchpad.utils.logging import get_logger from launchpad.utils.statsd import DogStatsd, get_statsd @@ -23,9 +44,6 @@ logger = get_logger(__name__) -# Health check threshold - consider unhealthy if file not touched in 60 seconds -HEALTHCHECK_MAX_AGE_SECONDS = 60.0 - class LaunchpadService: """Main service that orchestrates HTTP server and Kafka consumer.""" @@ -37,11 +55,15 @@ def __init__(self) -> None: self._kafka_task: asyncio.Future[Any] | None = None self._statsd: DogStatsd | None = None self._healthcheck_file: str | None = None + self._service_config: Dict[str, Any] | None = None async def setup(self) -> None: """Set up the service components.""" - service_config = get_service_config() - self._statsd = get_statsd(host=service_config["statsd_host"], port=service_config["statsd_port"]) + self._service_config = get_service_config() + self._statsd = get_statsd( + host=self._service_config["statsd_host"], + port=self._service_config["statsd_port"], + ) # Setup HTTP server with health check callback server_config = get_server_config() @@ -64,6 +86,29 @@ async def setup(self) -> None: logger.info("Service components initialized") + async def start(self) -> None: + """Start all service components.""" + if not self.server or not self.kafka_processor: + raise RuntimeError("Service not properly initialized. Call setup() first.") + + logger.info("Starting Launchpad service...") + + # Set up signal handlers for graceful shutdown + self._setup_signal_handlers() + + # Start Kafka processor in a background thread + loop = asyncio.get_event_loop() + self._kafka_task = loop.run_in_executor(None, self.kafka_processor.run) + + # Start HTTP server as a background task + server_task = asyncio.create_task(self.server.start()) + logger.info("Launchpad service started successfully") + + try: + await self._shutdown_event.wait() + finally: + await self._cleanup(server_task) + def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None: """ Handle incoming Kafka messages. @@ -78,58 +123,351 @@ def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None: if self._statsd: self._statsd.increment("launchpad.artifact.processing.started") - # TODO: Implement actual analysis logic - # This will need to: - # 1. Fetch the artifact using artifact_id from storage/API - # 2. Determine platform by examining the artifact - # 3. Run appropriate analyzer (iOS/Android) - # 4. Store results + self.process_artifact(artifact_id, project_id, organization_id) - # For now, just log - logger.info(f"Analysis completed for artifact {artifact_id} (stub)") + logger.info(f"Analysis completed for artifact {artifact_id}") if self._statsd: self._statsd.increment("launchpad.artifact.processing.completed") except Exception as e: - logger.error(f"Analysis failed for artifact {artifact_id}: {e}", exc_info=True) + # Log the full error for debugging + logger.error( + f"Failed to process artifact {artifact_id} (project: {project_id}, org: {organization_id}): {e}", + exc_info=True, + ) + if self._statsd: self._statsd.increment("launchpad.artifact.processing.failed") - # Re-raise to let Arroyo handle the error (can be configured for DLQ) - raise - async def start(self) -> None: - """Start all service components.""" - if not self.server or not self.kafka_processor: + def process_artifact(self, artifact_id: str, project_id: str, organization_id: str) -> None: + """ + Download artifact and perform size analysis. + """ + if not self._service_config: raise RuntimeError("Service not properly initialized. Call setup() first.") - logger.info("Starting Launchpad service...") + sentry_client = SentryClient(base_url=self._service_config["sentry_base_url"]) + temp_file = None - # Set up signal handlers for graceful shutdown - self._setup_signal_handlers() + try: + temp_file = self._download_artifact_to_temp_file(sentry_client, artifact_id, project_id, organization_id) + + artifact = ArtifactFactory.from_path(Path(temp_file)) + logger.info(f"Running preprocessing on {temp_file}...") + app_info = self._retry_operation( + lambda: do_preprocess(Path(temp_file)), + OperationName.PREPROCESSING, + ) + logger.info(f"Preprocessing completed for artifact {artifact_id}") + + update_data = self._prepare_update_data(app_info, artifact) + logger.info(f"Sending preprocessed info to Sentry for artifact {artifact_id}...") + update_result = sentry_client.update_artifact( + org=organization_id, + project=project_id, + artifact_id=artifact_id, + data=update_data, + ) + + # Check if update_result is an ErrorResult + if isinstance(update_result, ErrorResult): + error_category, error_description = categorize_http_error(update_result) + error_msg = f"Failed to send preprocessed info: {error_description}" + logger.error(error_msg) + # Use the categorized error description for the database + self._update_artifact_error( + sentry_client, + artifact_id, + project_id, + organization_id, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.UPDATE_FAILED, + error_description, + ) + return - # Start Kafka processor in a background thread - loop = asyncio.get_event_loop() - self._kafka_task = loop.run_in_executor(None, self.kafka_processor.run) + logger.info(f"Successfully sent preprocessed info for artifact {artifact_id}") - # Start HTTP server as a background task - server_task = asyncio.create_task(self.server.start()) + analyzer = self._create_analyzer(app_info) + logger.info(f"Running full analysis on {temp_file}...") + results = self._retry_operation( + lambda: do_size(Path(temp_file), analyzer=analyzer), + OperationName.SIZE_ANALYSIS, + ) + logger.info(f"Size analysis completed for artifact {artifact_id}") - logger.info("Launchpad service started successfully") + self._upload_results(sentry_client, results, artifact_id, project_id, organization_id) + + except Exception as e: + logger.error(f"Failed to process artifact {artifact_id}: {e}", exc_info=True) + + error_code, error_message = self._categorize_processing_error(e) + + # Include detailed error information for better debugging + detailed_error = str(e) + + self._update_artifact_error( + sentry_client, artifact_id, project_id, organization_id, error_code, error_message, detailed_error + ) + raise + + finally: + if temp_file: + self._safe_cleanup(temp_file, "temporary file") + + def _retry_operation(self, operation, operation_name: OperationName): + """Retry an operation up to MAX_RETRY_ATTEMPTS times.""" + error_message = OPERATION_ERRORS[operation_name] + last_exception = None + for attempt in range(1, MAX_RETRY_ATTEMPTS + 1): + try: + logger.debug(f"Attempting {operation_name.value} (attempt {attempt}/{MAX_RETRY_ATTEMPTS})") + return operation() + except Exception as e: + last_exception = e + logger.warning(f"{operation_name.value} failed on attempt {attempt}/{MAX_RETRY_ATTEMPTS}: {e}") + + if self._is_non_retryable_error(e): + logger.info(f"Non-retryable error for {operation_name.value}, not retrying") + break + + if attempt < MAX_RETRY_ATTEMPTS: + logger.info(f"Retrying {operation_name.value} in a moment...") + time.sleep(1) + + logger.error(f"All {MAX_RETRY_ATTEMPTS} attempts failed for {operation_name.value}") + raise RuntimeError(f"{error_message.value}: {str(last_exception)}") from last_exception + + def _is_non_retryable_error(self, exception: Exception) -> bool: + """Determine if an error should not be retried.""" + return isinstance(exception, (ValueError, NotImplementedError, FileNotFoundError)) + + def _categorize_processing_error(self, exception: Exception) -> tuple[ProcessingErrorCode, ProcessingErrorMessage]: + """Categorize an exception into error code and message.""" + if isinstance(exception, ValueError): + return ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.ARTIFACT_PARSING_FAILED + elif isinstance(exception, NotImplementedError): + return ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.UNSUPPORTED_ARTIFACT_TYPE + elif isinstance(exception, FileNotFoundError): + return ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.ARTIFACT_PARSING_FAILED + elif isinstance(exception, RuntimeError): + error_str = str(exception).lower() + if "timeout" in error_str: + return ProcessingErrorCode.ARTIFACT_PROCESSING_TIMEOUT, ProcessingErrorMessage.PROCESSING_TIMEOUT + elif "preprocess" in error_str: + return ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.PREPROCESSING_FAILED + elif "size" in error_str or "analysis" in error_str: + return ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.SIZE_ANALYSIS_FAILED + else: + return ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, ProcessingErrorMessage.UNKNOWN_ERROR + else: + return ProcessingErrorCode.UNKNOWN, ProcessingErrorMessage.UNKNOWN_ERROR + + def _update_artifact_error( + self, + sentry_client: SentryClient, + artifact_id: str, + project_id: str, + organization_id: str, + error_code: ProcessingErrorCode, + error_message: ProcessingErrorMessage, + detailed_error: str | None = None, + ) -> None: + """Update artifact with error information.""" try: - # Wait for shutdown signal - await self._shutdown_event.wait() + logger.info(f"Updating artifact {artifact_id} with error code {error_code.value}") + + # Use detailed error message if provided, otherwise use enum value + final_error_message = f"{error_message.value}: {detailed_error}" if detailed_error else error_message.value + + # Log error to datadog with tags for better monitoring + if self._statsd: + self._statsd.increment( + "launchpad.artifact.processing.error", + tags=[ + f"error_code:{error_code.value}", + f"error_type:{error_message.name}", + f"project_id:{project_id}", + f"organization_id:{organization_id}", + ], + ) + + result = sentry_client.update_artifact( + org=organization_id, + project=project_id, + artifact_id=artifact_id, + data={"error_code": error_code.value, "error_message": final_error_message}, + ) + + if isinstance(result, ErrorResult): + logger.error(f"Failed to update artifact with error: {result.error}") + else: + logger.info(f"Successfully updated artifact {artifact_id} with error information") + + except Exception as e: + logger.error(f"Failed to update artifact {artifact_id} with error information: {e}", exc_info=True) + + def _download_artifact_to_temp_file( + self, + sentry_client: SentryClient, + artifact_id: str, + project_id: str, + organization_id: str, + ) -> str: + """Download artifact from Sentry directly to a temporary file.""" + logger.info(f"Downloading artifact {artifact_id}...") + + temp_file = None + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tf: + temp_file = tf.name + file_size = sentry_client.download_artifact_to_file( + org=organization_id, project=project_id, artifact_id=artifact_id, out=tf + ) + + # Success case + logger.info(f"Downloaded artifact {artifact_id}: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)") + logger.info(f"Saved artifact to temporary file: {temp_file}") + return temp_file + + except Exception as e: + # Handle all errors (download errors, temp file creation errors, I/O errors) + error_msg = str(e) + logger.error(error_msg) + + self._update_artifact_error( + sentry_client, + artifact_id, + project_id, + organization_id, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.DOWNLOAD_FAILED, + error_msg, + ) + + if temp_file: + self._safe_cleanup(temp_file, "temporary file") + raise + + def _prepare_update_data(self, app_info: AppleAppInfo | AndroidAppInfo, artifact: Any) -> Dict[str, Any]: + """Prepare update data based on app platform and artifact type.""" + if isinstance(app_info, AppleAppInfo): + # TODO: add "date_built" field once exposed in 'AppleAppInfo' + return { + "build_version": app_info.version, + "build_number": (int(app_info.build) if str(app_info.build).isdigit() else app_info.build), + "artifact_type": ArtifactType.XCARCHIVE.value, + "apple_app_info": { + "is_simulator": app_info.is_simulator, + "codesigning_type": app_info.codesigning_type, + "profile_name": app_info.profile_name, + "is_code_signature_valid": app_info.is_code_signature_valid, + "code_signature_errors": app_info.code_signature_errors, + }, + } + elif isinstance(app_info, AndroidAppInfo): + artifact_type = ArtifactType.AAB if isinstance(artifact, (AAB, ZippedAAB)) else ArtifactType.APK + # TODO: add "date_built" and custom android fields + return { + "build_version": app_info.version, + "build_number": (int(app_info.build) if app_info.build.isdigit() else None), + "artifact_type": artifact_type.value, + } + else: + raise ValueError(f"Unsupported app_info type: {type(app_info)}") + + def _create_analyzer(self, app_info: AppleAppInfo | AndroidAppInfo) -> AndroidAnalyzer | AppleAppAnalyzer: + """Create analyzer with preprocessed app info.""" + if isinstance(app_info, AndroidAppInfo): + analyzer = AndroidAnalyzer() + analyzer.app_info = app_info + return analyzer + else: # AppleAppInfo + analyzer = AppleAppAnalyzer() + analyzer.app_info = app_info + return analyzer + + def _upload_results( + self, + sentry_client: SentryClient, + results: Any, + artifact_id: str, + project_id: str, + organization_id: str, + ) -> None: + """Upload analysis results to Sentry.""" + analysis_file = None + + try: + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as af: + json.dump(results.to_dict(), af, indent=2) + analysis_file = af.name + + logger.info(f"Analysis results written to temporary file: {analysis_file}") + logger.info(f"Uploading analysis results for artifact {artifact_id}...") + + upload_result = sentry_client.upload_size_analysis_file( + org=organization_id, + project=project_id, + artifact_id=artifact_id, + file_path=analysis_file, + ) + + if isinstance(upload_result, ErrorResult): + error_category, error_description = categorize_http_error(upload_result) + error_msg = f"Failed to upload analysis results: {error_description}" + logger.error(error_msg) + # Use the categorized error description for the database + self._update_artifact_error( + sentry_client, + artifact_id, + project_id, + organization_id, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.UPLOAD_FAILED, + error_description, + ) + raise RuntimeError(error_msg) + + logger.info(f"Successfully uploaded analysis results for artifact {artifact_id}") + + except Exception as e: + if not isinstance(e, RuntimeError): + detailed_error = str(e) + self._update_artifact_error( + sentry_client, + artifact_id, + project_id, + organization_id, + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.UPLOAD_FAILED, + detailed_error, + ) + raise + finally: - # Cleanup - await self._cleanup(server_task) + if analysis_file: + self._safe_cleanup(analysis_file, "analysis file") + + def _safe_cleanup(self, file_path: str, description: str) -> None: + """Safely clean up a file with error handling.""" + if file_path and os.path.exists(file_path): + try: + os.remove(file_path) + logger.debug(f"Cleaned up {description}: {file_path}") + except Exception as e: + logger.warning(f"Failed to clean up {description} {file_path}: {e}") def _setup_signal_handlers(self) -> None: """Set up signal handlers for graceful shutdown.""" def signal_handler(signum: int, frame: Any) -> None: if self._shutdown_event.is_set(): - logger.info(f"Received signal {signum} during shutdown, ignoring...") + logger.info(f"Received signal {signum} during shutdown, forcing exit...") + # Force exit if we get a second signal + os._exit(1) return logger.info(f"Received signal {signum}, initiating shutdown...") @@ -248,6 +586,7 @@ def get_service_config() -> Dict[str, Any]: """Get service configuration from environment.""" statsd_host = os.getenv("STATSD_HOST", "127.0.0.1") statsd_port_str = os.getenv("STATSD_PORT", "8125") + sentry_base_url = os.getenv("SENTRY_BASE_URL") try: statsd_port = int(statsd_port_str) @@ -257,6 +596,7 @@ def get_service_config() -> Dict[str, Any]: return { "statsd_host": statsd_host, "statsd_port": statsd_port, + "sentry_base_url": sentry_base_url, } diff --git a/src/launchpad/size/analyzers/android.py b/src/launchpad/size/analyzers/android.py index 7b5dee15..a575f2e0 100644 --- a/src/launchpad/size/analyzers/android.py +++ b/src/launchpad/size/analyzers/android.py @@ -48,17 +48,35 @@ def __init__( skip_insights: Skip insights generation for faster analysis """ self.skip_insights = skip_insights + self.app_info: AndroidAppInfo | None = None - def analyze(self, artifact: AndroidArtifact) -> AndroidAnalysisResults: + def preprocess(self, artifact: AndroidArtifact) -> AndroidAppInfo: + """Extract basic app information from the manifest. + + Args: + artifact: Android artifact to preprocess + + Returns: + Basic app information extracted from manifest + """ manifest_dict = artifact.get_manifest().model_dump() - app_info = AndroidAppInfo( + self.app_info = AndroidAppInfo( name=manifest_dict["application"]["label"] or "Unknown", version=manifest_dict["version_name"] or "Unknown", build=manifest_dict["version_code"] or "Unknown", package_name=manifest_dict["package_name"], ) + return self.app_info + + def analyze(self, artifact: AndroidArtifact) -> AndroidAnalysisResults: + # Use preprocessed app info if available, otherwise extract it + if not self.app_info: + self.app_info = self.preprocess(artifact) + + app_info = self.app_info + apks: list[APK] = [] # Split AAB into APKs, or use the APK directly if isinstance(artifact, AAB): diff --git a/src/launchpad/size/runner.py b/src/launchpad/size/runner.py index ca361446..11e7975f 100644 --- a/src/launchpad/size/runner.py +++ b/src/launchpad/size/runner.py @@ -8,22 +8,60 @@ from launchpad.artifacts.artifact_factory import ArtifactFactory from launchpad.size.analyzers.android import AndroidAnalyzer from launchpad.size.analyzers.apple import AppleAppAnalyzer +from launchpad.size.models.android import AndroidAppInfo +from launchpad.size.models.apple import AppleAppInfo from launchpad.size.models.common import BaseAnalysisResults -def do_size(path: Path, **flags: Any) -> BaseAnalysisResults: - start_time = time.time() - artifact = ArtifactFactory.from_path(path) +def do_preprocess(path: Path, **flags: Any) -> AndroidAppInfo | AppleAppInfo: + """Perform preprocessing step only to extract basic app info. + + Args: + path: Path to the artifact + **flags: Additional flags passed to analyzer + + Returns: + App info extracted during preprocessing + """ # isinstance switch below is a bit sad. Ryan suggested a # get_analyzer method on artifact which might be nicer. - analyzer: AndroidAnalyzer | AppleAppAnalyzer + artifact = ArtifactFactory.from_path(path) if isinstance(artifact, AndroidArtifact): analyzer = AndroidAnalyzer(**flags) + return analyzer.preprocess(cast(AndroidArtifact, artifact)) elif isinstance(artifact, AppleArtifact): analyzer = AppleAppAnalyzer(**flags) + return analyzer.preprocess(cast(AppleArtifact, artifact)) else: raise ValueError(f"Unknown artifact kind {artifact}") + + +def do_size( + path: Path, analyzer: AndroidAnalyzer | AppleAppAnalyzer | None = None, **flags: Any +) -> BaseAnalysisResults: + """Perform full size analysis. + + Args: + path: Path to the artifact + analyzer: Optional pre-configured analyzer (with preprocessing already done) + **flags: Additional flags passed to analyzer if creating new one + + Returns: + Full analysis results + """ + start_time = time.time() + artifact = ArtifactFactory.from_path(path) + + # If no analyzer provided, create one + if analyzer is None: + if isinstance(artifact, AndroidArtifact): + analyzer = AndroidAnalyzer(**flags) + elif isinstance(artifact, AppleArtifact): + analyzer = AppleAppAnalyzer(**flags) + else: + raise ValueError(f"Unknown artifact kind {artifact}") + results = analyzer.analyze(cast(Any, artifact)) end_time = time.time() diff --git a/tests/test_integration.py b/tests/test_integration.py index 85b7e6bc..a99371b0 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -47,29 +47,48 @@ async def test_kafka_message_processing(self): # Mock statsd service._statsd = Mock() - # Test artifact analysis message with iOS artifact - ios_payload: PreprodArtifactEvents = { - "artifact_id": "ios-test-123", - "project_id": "test-project-ios", - "organization_id": "test-org-123", + # Mock service config to make the service appear initialized + service._service_config = { + "statsd_host": "127.0.0.1", + "statsd_port": 8125, + "sentry_base_url": "https://sentry.example.com", } - # handle_kafka_message is synchronous - service.handle_kafka_message(ios_payload) + # Mock process_artifact to avoid actual processing + with patch.object(service, "process_artifact") as mock_process: + # Test artifact analysis message with iOS artifact + ios_payload: PreprodArtifactEvents = { + "artifact_id": "ios-test-123", + "project_id": "test-project-ios", + "organization_id": "test-org-123", + } - # Verify statsd metrics were sent - service._statsd.increment.assert_any_call("launchpad.artifact.processing.started") - service._statsd.increment.assert_any_call("launchpad.artifact.processing.completed") + # handle_kafka_message is synchronous + service.handle_kafka_message(ios_payload) - # Test artifact analysis message with Android artifact - android_payload: PreprodArtifactEvents = { - "artifact_id": "android-test-456", - "project_id": "test-project-android", - "organization_id": "test-org-456", - } + # Verify the processing method was called + mock_process.assert_called_once_with("ios-test-123", "test-project-ios", "test-org-123") + + # Verify statsd metrics were sent + service._statsd.increment.assert_any_call("launchpad.artifact.processing.started") + service._statsd.increment.assert_any_call("launchpad.artifact.processing.completed") + + # Reset mocks for next test + mock_process.reset_mock() + service._statsd.reset_mock() + + # Test artifact analysis message with Android artifact + android_payload: PreprodArtifactEvents = { + "artifact_id": "android-test-456", + "project_id": "test-project-android", + "organization_id": "test-org-456", + } + + # handle_kafka_message is synchronous + service.handle_kafka_message(android_payload) - # handle_kafka_message is synchronous - service.handle_kafka_message(android_payload) + # Verify the processing method was called + mock_process.assert_called_once_with("android-test-456", "test-project-android", "test-org-456") @pytest.mark.asyncio async def test_error_handling_in_message_processing(self): diff --git a/tests/test_service.py b/tests/test_service.py index e9e0cd75..e9ffb2cb 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -5,7 +5,7 @@ import os import time -from unittest.mock import Mock +from unittest.mock import Mock, patch import pytest @@ -61,7 +61,8 @@ async def test_ready_check(self): class TestLaunchpadService: """Test cases for LaunchpadService.""" - def test_handle_kafka_message_ios(self): + @patch.object(LaunchpadService, "process_artifact") + def test_handle_kafka_message_ios(self, mock_process): """Test handling iOS artifact messages.""" service = LaunchpadService() @@ -78,11 +79,15 @@ def test_handle_kafka_message_ios(self): # handle_kafka_message is synchronous service.handle_kafka_message(payload) + # Verify process_artifact was called with correct args + mock_process.assert_called_once_with("ios-test-123", "test-project-ios", "test-org-123") + # Verify metrics were recorded service._statsd.increment.assert_any_call("launchpad.artifact.processing.started") service._statsd.increment.assert_any_call("launchpad.artifact.processing.completed") - def test_handle_kafka_message_android(self): + @patch.object(LaunchpadService, "process_artifact") + def test_handle_kafka_message_android(self, mock_process): """Test handling Android artifact messages.""" service = LaunchpadService() @@ -99,21 +104,23 @@ def test_handle_kafka_message_android(self): # handle_kafka_message is synchronous service.handle_kafka_message(payload) + # Verify process_artifact was called with correct args + mock_process.assert_called_once_with("android-test-456", "test-project-android", "test-org-456") + # Verify metrics were recorded service._statsd.increment.assert_any_call("launchpad.artifact.processing.started") service._statsd.increment.assert_any_call("launchpad.artifact.processing.completed") - def test_handle_kafka_message_error(self): + @patch.object(LaunchpadService, "process_artifact") + def test_handle_kafka_message_error(self, mock_process): """Test error handling in message processing.""" service = LaunchpadService() - # Mock statsd to raise an exception on the second call + # Mock statsd service._statsd = Mock() - service._statsd.increment.side_effect = [ - None, # First call: processing.started - Exception("Simulated error"), # Second call: processing.completed (raises) - None, # Third call: processing.failed - ] + + # Make process_artifact raise an exception + mock_process.side_effect = RuntimeError("Download failed: HTTP 404") # Create a valid payload payload: PreprodArtifactEvents = { @@ -122,16 +129,17 @@ def test_handle_kafka_message_error(self): "organization_id": "test-org", } - # This should raise the exception (to be handled by Arroyo) - with pytest.raises(Exception, match="Simulated error"): - service.handle_kafka_message(payload) + # This should not raise (simplified error handling catches all exceptions) + service.handle_kafka_message(payload) + + # Verify process_artifact was called + mock_process.assert_called_once_with("test-123", "test-project", "test-org") - # Verify the metrics were called in the expected order + # Verify the metrics were called correctly calls = service._statsd.increment.call_args_list - assert len(calls) == 3 + assert len(calls) == 2 assert calls[0][0][0] == "launchpad.artifact.processing.started" - assert calls[1][0][0] == "launchpad.artifact.processing.completed" - assert calls[2][0][0] == "launchpad.artifact.processing.failed" + assert calls[1][0][0] == "launchpad.artifact.processing.failed" @pytest.mark.asyncio async def test_health_check_with_healthcheck_file(self, tmp_path): diff --git a/tests/unit/test_sentry_client_retry.py b/tests/unit/test_sentry_client_retry.py new file mode 100644 index 00000000..55c11486 --- /dev/null +++ b/tests/unit/test_sentry_client_retry.py @@ -0,0 +1,187 @@ +"""Tests for retry logic in SentryClient.""" + +from unittest.mock import Mock, patch + +import requests + +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +from launchpad.sentry_client import SentryClient, create_retry_session + + +class TestSentryClientRetry: + """Test retry logic in SentryClient.""" + + def setup_method(self): + """Set up test fixtures.""" + with patch.dict("os.environ", {"LAUNCHPAD_RPC_SHARED_SECRET": "test_secret"}): + self.client = SentryClient(base_url="https://test.sentry.io") + + def test_create_retry_session_configuration(self): + """Test that create_retry_session creates a session with correct retry configuration.""" + session = create_retry_session(max_retries=5) + + # Check that session has the right type + assert isinstance(session, requests.Session) + + # Check that the session has adapters mounted + assert "http://" in session.adapters + assert "https://" in session.adapters + + # Check that adapters are HTTPAdapter instances + http_adapter = session.adapters["http://"] + https_adapter = session.adapters["https://"] + assert isinstance(http_adapter, HTTPAdapter) + assert isinstance(https_adapter, HTTPAdapter) + + # Check that the retry strategy is configured + assert http_adapter.max_retries.total == 5 + assert https_adapter.max_retries.total == 5 + + def test_create_retry_session_default_retries(self): + """Test that create_retry_session uses default retry count.""" + session = create_retry_session() + + http_adapter = session.adapters["http://"] + assert http_adapter.max_retries.total == 3 + + def test_sentry_client_uses_retry_session(self): + """Test that SentryClient uses a retry session.""" + with patch.dict("os.environ", {"LAUNCHPAD_RPC_SHARED_SECRET": "test_secret"}): + client = SentryClient(base_url="https://test.sentry.io") + + # Check that the client has a session + assert hasattr(client, "session") + assert isinstance(client.session, requests.Session) + + # Check that the session has retry adapters + assert "http://" in client.session.adapters + assert "https://" in client.session.adapters + assert isinstance(client.session.adapters["http://"], HTTPAdapter) + + @patch("launchpad.sentry_client.requests.Session") + def test_download_artifact_to_file_with_retry_session(self, mock_session_class): + """Test that download_artifact_to_file uses the retry session.""" + # Mock the session and its methods + mock_session = Mock() + mock_session_class.return_value = mock_session + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.iter_content.return_value = [b"test content"] + mock_session.get.return_value = mock_response + + # Create client with mocked session + with patch.dict("os.environ", {"LAUNCHPAD_RPC_SHARED_SECRET": "test_secret"}): + client = SentryClient(base_url="https://test.sentry.io") + client.session = mock_session + + # Mock file object + mock_file = Mock() + result = client.download_artifact_to_file("test-org", "test-project", "test-artifact", mock_file) + + assert result == 12 # Length of "test content" + assert mock_session.get.called + mock_file.write.assert_called_with(b"test content") + + @patch("launchpad.sentry_client.requests.Session") + def test_update_artifact_with_retry_session(self, mock_session_class): + """Test that update_artifact uses the retry session.""" + # Mock the session and its methods + mock_session = Mock() + mock_session_class.return_value = mock_session + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {"success": True} + mock_session.request.return_value = mock_response + + # Create client with mocked session + with patch.dict("os.environ", {"LAUNCHPAD_RPC_SHARED_SECRET": "test_secret"}): + client = SentryClient(base_url="https://test.sentry.io") + client.session = mock_session + + result = client.update_artifact("test-org", "test-project", "test-artifact", {"version": "1.0"}) + + assert result == {"success": True} + assert mock_session.request.called + + @patch("launchpad.sentry_client.requests.Session") + def test_upload_chunk_with_retry_session(self, mock_session_class): + """Test that _upload_chunk uses the retry session.""" + # Mock the session and its methods + mock_session = Mock() + mock_session_class.return_value = mock_session + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_session.post.return_value = mock_response + + # Create client with mocked session + with patch.dict("os.environ", {"LAUNCHPAD_RPC_SHARED_SECRET": "test_secret"}): + client = SentryClient(base_url="https://test.sentry.io") + client.session = mock_session + + chunk = {"checksum": "abcd1234", "data": b"test data", "size": 9} + + result = client._upload_chunk("test-org", chunk) + + assert result is True + assert mock_session.post.called + + def test_retry_strategy_configuration(self): + """Test that the retry strategy is configured correctly.""" + session = create_retry_session() + adapter = session.adapters["https://"] + retry_strategy = adapter.max_retries + + # Check retry configuration + assert isinstance(retry_strategy, Retry) + assert retry_strategy.total == 3 + assert retry_strategy.backoff_factor == 0.1 + assert retry_strategy.status_forcelist == [429, 500, 502, 503, 504] + assert retry_strategy.raise_on_status is False + + # Check allowed methods + expected_methods = ["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"] + assert retry_strategy.allowed_methods == expected_methods + + def test_retry_strategy_custom_max_retries(self): + """Test that custom max retries is applied correctly.""" + session = create_retry_session(max_retries=5) + adapter = session.adapters["https://"] + retry_strategy = adapter.max_retries + + assert retry_strategy.total == 5 + + @patch("launchpad.sentry_client.requests.Session") + def test_json_request_uses_session(self, mock_session_class): + """Test that _make_json_request uses the retry session.""" + # Mock the session and its methods + mock_session = Mock() + mock_session_class.return_value = mock_session + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {"result": "success"} + mock_session.request.return_value = mock_response + + # Create client with mocked session + with patch.dict("os.environ", {"LAUNCHPAD_RPC_SHARED_SECRET": "test_secret"}): + client = SentryClient(base_url="https://test.sentry.io") + client.session = mock_session + + result = client._make_json_request("POST", "/test", {"key": "value"}) + + assert result == {"result": "success"} + assert mock_session.request.called + + # Verify the call was made with correct parameters + call_args = mock_session.request.call_args + assert call_args[1]["method"] == "POST" + assert "test.sentry.io/test" in call_args[1]["url"] diff --git a/tests/unit/test_service_error_handling.py b/tests/unit/test_service_error_handling.py new file mode 100644 index 00000000..53865692 --- /dev/null +++ b/tests/unit/test_service_error_handling.py @@ -0,0 +1,269 @@ +"""Tests for error handling and retry logic in LaunchpadService.""" + +from unittest.mock import Mock, patch + +import pytest + +from launchpad.constants import ( + MAX_RETRY_ATTEMPTS, + OperationName, + ProcessingErrorCode, + ProcessingErrorMessage, +) +from launchpad.service import LaunchpadService + + +class TestLaunchpadServiceErrorHandling: + """Test error handling and retry logic in LaunchpadService.""" + + def setup_method(self): + """Set up test fixtures.""" + self.service = LaunchpadService() + self.service._service_config = { + "sentry_base_url": "https://test.sentry.io", + "statsd_host": "localhost", + "statsd_port": 8125, + } + self.service._statsd = Mock() + + def test_retry_operation_success_on_first_attempt(self): + """Test that _retry_operation succeeds on first attempt.""" + operation = Mock(return_value="success") + + result = self.service._retry_operation( + operation, + OperationName.PREPROCESSING, + ) + + assert result == "success" + assert operation.call_count == 1 + + def test_retry_operation_success_on_second_attempt(self): + """Test that _retry_operation succeeds on second attempt after one failure.""" + operation = Mock() + operation.side_effect = [RuntimeError("Temporary failure"), "success"] + + result = self.service._retry_operation( + operation, + OperationName.PREPROCESSING, + ) + + assert result == "success" + assert operation.call_count == 2 + + def test_retry_operation_fails_after_max_attempts(self): + """Test that _retry_operation fails after MAX_RETRY_ATTEMPTS.""" + operation = Mock() + operation.side_effect = RuntimeError("Persistent failure") + + with pytest.raises(RuntimeError, match="Failed to extract basic app information"): + self.service._retry_operation( + operation, + OperationName.PREPROCESSING, + ) + + assert operation.call_count == MAX_RETRY_ATTEMPTS + + def test_retry_operation_non_retryable_error(self): + """Test that _retry_operation doesn't retry non-retryable errors.""" + operation = Mock() + operation.side_effect = ValueError("Invalid input") + + with pytest.raises(RuntimeError, match="Failed to perform size analysis"): + self.service._retry_operation( + operation, + OperationName.SIZE_ANALYSIS, + ) + + assert operation.call_count == 1 # Should not retry + + def test_retry_operation_maps_operation_to_correct_error_message(self): + """Test that _retry_operation correctly maps operation names to error messages.""" + operation = Mock() + operation.side_effect = RuntimeError("Persistent failure") + + # Test PREPROCESSING maps to PREPROCESSING_FAILED + with pytest.raises(RuntimeError, match="Failed to extract basic app information"): + self.service._retry_operation(operation, OperationName.PREPROCESSING) + + # Test SIZE_ANALYSIS maps to SIZE_ANALYSIS_FAILED + with pytest.raises(RuntimeError, match="Failed to perform size analysis"): + self.service._retry_operation(operation, OperationName.SIZE_ANALYSIS) + + def test_is_non_retryable_error(self): + """Test that _is_non_retryable_error correctly identifies non-retryable errors.""" + # Non-retryable errors + assert self.service._is_non_retryable_error(ValueError("test")) + assert self.service._is_non_retryable_error(NotImplementedError("test")) + assert self.service._is_non_retryable_error(FileNotFoundError("test")) + + # Retryable errors + assert not self.service._is_non_retryable_error(RuntimeError("test")) + assert not self.service._is_non_retryable_error(ConnectionError("test")) + assert not self.service._is_non_retryable_error(Exception("test")) + + def test_categorize_processing_error(self): + """Test that processing errors are categorized correctly.""" + service = LaunchpadService() + + # Test ValueError -> ARTIFACT_PARSING_FAILED + error_code, error_message = service._categorize_processing_error(ValueError("Invalid format")) + assert error_code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR + assert error_message == ProcessingErrorMessage.ARTIFACT_PARSING_FAILED + + # Test NotImplementedError -> UNSUPPORTED_ARTIFACT_TYPE + error_code, error_message = service._categorize_processing_error(NotImplementedError("Not supported")) + assert error_code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR + assert error_message == ProcessingErrorMessage.UNSUPPORTED_ARTIFACT_TYPE + + # Test FileNotFoundError -> ARTIFACT_PARSING_FAILED + error_code, error_message = service._categorize_processing_error(FileNotFoundError("File not found")) + assert error_code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR + assert error_message == ProcessingErrorMessage.ARTIFACT_PARSING_FAILED + + # Test RuntimeError with timeout -> PROCESSING_TIMEOUT + error_code, error_message = service._categorize_processing_error(RuntimeError("Processing timeout occurred")) + assert error_code == ProcessingErrorCode.ARTIFACT_PROCESSING_TIMEOUT + assert error_message == ProcessingErrorMessage.PROCESSING_TIMEOUT + + # Test RuntimeError with preprocessing keywords -> PREPROCESSING_FAILED + error_code, error_message = service._categorize_processing_error(RuntimeError("Preprocessing failed")) + assert error_code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR + assert error_message == ProcessingErrorMessage.PREPROCESSING_FAILED + + # Test RuntimeError with size keywords -> SIZE_ANALYSIS_FAILED + error_code, error_message = service._categorize_processing_error(RuntimeError("Size analysis failed")) + assert error_code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR + assert error_message == ProcessingErrorMessage.SIZE_ANALYSIS_FAILED + + # Test RuntimeError with unknown content -> UNKNOWN_ERROR + error_code, error_message = service._categorize_processing_error(RuntimeError("Something unknown happened")) + assert error_code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR + assert error_message == ProcessingErrorMessage.UNKNOWN_ERROR + + # Test generic exception -> UNKNOWN_ERROR + error_code, error_message = service._categorize_processing_error(Exception("Generic error")) + assert error_code == ProcessingErrorCode.UNKNOWN + assert error_message == ProcessingErrorMessage.UNKNOWN_ERROR + + @patch("launchpad.service.SentryClient") + def test_update_artifact_error_success(self, mock_sentry_client_class): + """Test that _update_artifact_error successfully updates artifact with error.""" + mock_sentry_client = Mock() + mock_sentry_client_class.return_value = mock_sentry_client + mock_sentry_client.update_artifact.return_value = {"success": True} + + self.service._update_artifact_error( + mock_sentry_client, + "test-artifact-id", + "test-project-id", + "test-org-id", + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.PREPROCESSING_FAILED, + ) + + mock_sentry_client.update_artifact.assert_called_once_with( + org="test-org-id", + project="test-project-id", + artifact_id="test-artifact-id", + data={ + "error_code": ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR.value, + "error_message": ProcessingErrorMessage.PREPROCESSING_FAILED.value, + }, + ) + + @patch("launchpad.service.SentryClient") + def test_update_artifact_error_failure(self, mock_sentry_client_class): + """Test that _update_artifact_error handles update failures gracefully.""" + mock_sentry_client = Mock() + mock_sentry_client_class.return_value = mock_sentry_client + mock_sentry_client.update_artifact.return_value = {"error": "Update failed"} + + # Should not raise an exception + self.service._update_artifact_error( + mock_sentry_client, + "test-artifact-id", + "test-project-id", + "test-org-id", + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.PREPROCESSING_FAILED, + ) + + mock_sentry_client.update_artifact.assert_called_once() + + @patch("launchpad.service.SentryClient") + def test_update_artifact_error_exception(self, mock_sentry_client_class): + """Test that _update_artifact_error handles exceptions gracefully.""" + mock_sentry_client = Mock() + mock_sentry_client_class.return_value = mock_sentry_client + mock_sentry_client.update_artifact.side_effect = Exception("Network error") + + # Should not raise an exception + self.service._update_artifact_error( + mock_sentry_client, + "test-artifact-id", + "test-project-id", + "test-org-id", + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.PREPROCESSING_FAILED, + ) + + mock_sentry_client.update_artifact.assert_called_once() + + @patch("launchpad.service.SentryClient") + def test_update_artifact_error_with_detailed_message(self, mock_sentry_client): + """Test that _update_artifact_error uses detailed error message when provided.""" + service = LaunchpadService() + service._statsd = Mock() + + mock_client = Mock() + mock_client.update_artifact.return_value = {"success": True} + mock_sentry_client.return_value = mock_client + + detailed_error = "Failed to parse Info.plist: [Errno 2] No such file or directory" + + service._update_artifact_error( + mock_client, + "test_artifact_id", + "test_project_id", + "test_org_id", + ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR, + ProcessingErrorMessage.PREPROCESSING_FAILED, + detailed_error, + ) + + # Verify that the detailed error message is used instead of the enum value + expected_error_message = f"{ProcessingErrorMessage.PREPROCESSING_FAILED.value}: {detailed_error}" + mock_client.update_artifact.assert_called_once_with( + org="test_org_id", + project="test_project_id", + artifact_id="test_artifact_id", + data={ + "error_code": ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR.value, + "error_message": expected_error_message, + }, + ) + + # Verify datadog logging + service._statsd.increment.assert_called_once_with( + "launchpad.artifact.processing.error", + tags=[ + "error_code:3", + "error_type:PREPROCESSING_FAILED", + "project_id:test_project_id", + "organization_id:test_org_id", + ], + ) + + def test_processing_error_message_enum_values(self): + """Test that ProcessingErrorMessage enum has expected values.""" + # Test that all enum values are strings + for error_message in ProcessingErrorMessage: + assert isinstance(error_message.value, str) + assert len(error_message.value) > 0 + + # Test some specific values + assert ProcessingErrorMessage.DOWNLOAD_FAILED.value == "Failed to download artifact from Sentry" + assert ProcessingErrorMessage.PREPROCESSING_FAILED.value == "Failed to extract basic app information" + assert ProcessingErrorMessage.SIZE_ANALYSIS_FAILED.value == "Failed to perform size analysis" + assert ProcessingErrorMessage.UNKNOWN_ERROR.value == "An unknown error occurred"