-
-
Notifications
You must be signed in to change notification settings - Fork 3
Wire up end to end call path (kafka -> monolith endpoints) #142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
dd3bea7
8ccc467
fbf1baf
b05d194
7246703
f06ca0c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,10 +3,13 @@ | |
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| import json | ||
| import os | ||
| import signal | ||
| import tempfile | ||
| import time | ||
|
|
||
| from pathlib import Path | ||
| from typing import Any, Dict, cast | ||
|
|
||
| from arroyo.backends.kafka import KafkaPayload | ||
|
|
@@ -15,6 +18,15 @@ | |
| PreprodArtifactEvents, | ||
| ) | ||
|
|
||
| from launchpad.artifacts.android.aab import AAB | ||
| from launchpad.artifacts.android.zipped_aab import ZippedAAB | ||
| from launchpad.artifacts.artifact_factory import ArtifactFactory | ||
| from launchpad.sentry_client import SentryClient, categorize_http_error | ||
| from launchpad.size.analyzers.android import AndroidAnalyzer | ||
| from launchpad.size.analyzers.apple import AppleAppAnalyzer | ||
| from launchpad.size.models.android import AndroidAppInfo | ||
| from launchpad.size.models.apple import AppleAppInfo | ||
| from launchpad.size.runner import do_preprocess, do_size | ||
| from launchpad.utils.logging import get_logger | ||
| from launchpad.utils.statsd import DogStatsd, get_statsd | ||
|
|
||
|
|
@@ -26,6 +38,11 @@ | |
| # Health check threshold - consider unhealthy if file not touched in 60 seconds | ||
| HEALTHCHECK_MAX_AGE_SECONDS = 60.0 | ||
|
|
||
| # Artifact type constants | ||
| ARTIFACT_TYPE_XCARCHIVE = 0 | ||
| ARTIFACT_TYPE_AAB = 1 | ||
| ARTIFACT_TYPE_APK = 2 | ||
|
|
||
|
|
||
| class LaunchpadService: | ||
| """Main service that orchestrates HTTP server and Kafka consumer.""" | ||
|
|
@@ -37,11 +54,15 @@ def __init__(self) -> None: | |
| self._kafka_task: asyncio.Future[Any] | None = None | ||
| self._statsd: DogStatsd | None = None | ||
| self._healthcheck_file: str | None = None | ||
| self._service_config: Dict[str, Any] | None = None | ||
|
|
||
| async def setup(self) -> None: | ||
| """Set up the service components.""" | ||
| service_config = get_service_config() | ||
| self._statsd = get_statsd(host=service_config["statsd_host"], port=service_config["statsd_port"]) | ||
| self._service_config = get_service_config() | ||
| self._statsd = get_statsd( | ||
| host=self._service_config["statsd_host"], | ||
| port=self._service_config["statsd_port"], | ||
| ) | ||
|
|
||
| # Setup HTTP server with health check callback | ||
| server_config = get_server_config() | ||
|
|
@@ -64,6 +85,29 @@ async def setup(self) -> None: | |
|
|
||
| logger.info("Service components initialized") | ||
|
|
||
| async def start(self) -> None: | ||
| """Start all service components.""" | ||
| if not self.server or not self.kafka_processor: | ||
| raise RuntimeError("Service not properly initialized. Call setup() first.") | ||
|
|
||
| logger.info("Starting Launchpad service...") | ||
|
|
||
| # Set up signal handlers for graceful shutdown | ||
| self._setup_signal_handlers() | ||
|
|
||
| # Start Kafka processor in a background thread | ||
| loop = asyncio.get_event_loop() | ||
| self._kafka_task = loop.run_in_executor(None, self.kafka_processor.run) | ||
|
|
||
| # Start HTTP server as a background task | ||
| server_task = asyncio.create_task(self.server.start()) | ||
| logger.info("Launchpad service started successfully") | ||
|
|
||
| try: | ||
| await self._shutdown_event.wait() | ||
| finally: | ||
| await self._cleanup(server_task) | ||
|
|
||
| def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None: | ||
| """ | ||
| Handle incoming Kafka messages. | ||
|
|
@@ -78,58 +122,187 @@ def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None: | |
| if self._statsd: | ||
| self._statsd.increment("launchpad.artifact.processing.started") | ||
|
|
||
| # TODO: Implement actual analysis logic | ||
| # This will need to: | ||
| # 1. Fetch the artifact using artifact_id from storage/API | ||
| # 2. Determine platform by examining the artifact | ||
| # 3. Run appropriate analyzer (iOS/Android) | ||
| # 4. Store results | ||
| # Perform the actual artifact analysis | ||
| self.process_artifact_analysis(artifact_id, project_id, organization_id) | ||
|
|
||
| # For now, just log | ||
| logger.info(f"Analysis completed for artifact {artifact_id} (stub)") | ||
| logger.info(f"Analysis completed for artifact {artifact_id}") | ||
|
|
||
| if self._statsd: | ||
| self._statsd.increment("launchpad.artifact.processing.completed") | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Analysis failed for artifact {artifact_id}: {e}", exc_info=True) | ||
| # Log the full error for debugging | ||
| logger.error( | ||
| f"Failed to process artifact {artifact_id} (project: {project_id}, org: {organization_id}): {e}", | ||
| exc_info=True, | ||
| ) | ||
|
|
||
| if self._statsd: | ||
| self._statsd.increment("launchpad.artifact.processing.failed") | ||
| # Re-raise to let Arroyo handle the error (can be configured for DLQ) | ||
| raise | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't re-raise - let the consumer continue processing other messages |
||
|
|
||
| async def start(self) -> None: | ||
| """Start all service components.""" | ||
| if not self.server or not self.kafka_processor: | ||
| def process_artifact_analysis(self, artifact_id: str, project_id: str, organization_id: str) -> None: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Once we have more than size analysis, we can move a lot of this code into product specific files (ex. size.py . snapshots.py , etc) but for now I figure its ok to just have it all in this file |
||
| """ | ||
| Download artifact and perform size analysis. | ||
| """ | ||
| if not self._service_config: | ||
| raise RuntimeError("Service not properly initialized. Call setup() first.") | ||
|
|
||
| logger.info("Starting Launchpad service...") | ||
| sentry_client = SentryClient(base_url=self._service_config["sentry_base_url"]) | ||
|
|
||
| # Set up signal handlers for graceful shutdown | ||
| self._setup_signal_handlers() | ||
| file_content, _ = self._download_artifact(sentry_client, artifact_id, project_id, organization_id) | ||
| temp_file = self._save_to_temp_file(file_content, artifact_id) | ||
|
|
||
| # Start Kafka processor in a background thread | ||
| loop = asyncio.get_event_loop() | ||
| self._kafka_task = loop.run_in_executor(None, self.kafka_processor.run) | ||
| try: | ||
| artifact = ArtifactFactory.from_path(Path(temp_file)) | ||
| logger.info(f"Running preprocessing on {temp_file}...") | ||
| app_info = do_preprocess(Path(temp_file)) | ||
| logger.info(f"Preprocessing completed for artifact {artifact_id}") | ||
| update_data = self._prepare_update_data(app_info, artifact) | ||
|
|
||
| logger.info(f"Sending preprocessed info to Sentry for artifact {artifact_id}...") | ||
|
|
||
| update_result = sentry_client.update_artifact( | ||
| org=organization_id, | ||
| project=project_id, | ||
| artifact_id=artifact_id, | ||
| data=update_data, | ||
| ) | ||
|
|
||
| if "error" in update_result: | ||
| logger.error(f"Failed to send preprocessed info: {update_result['error']}") | ||
| else: | ||
| logger.info(f"Successfully sent preprocessed info for artifact {artifact_id}") | ||
|
|
||
| analyzer = self._create_analyzer(app_info) | ||
| logger.info(f"Running full analysis on {temp_file}...") | ||
|
NicoHinderling marked this conversation as resolved.
|
||
| results = do_size(Path(temp_file), analyzer=analyzer) | ||
| logger.info(f"Size analysis completed for artifact {artifact_id}") | ||
| self._upload_results(sentry_client, results, artifact_id, project_id, organization_id) | ||
|
|
||
| # Start HTTP server as a background task | ||
| server_task = asyncio.create_task(self.server.start()) | ||
| finally: | ||
| self._safe_cleanup(temp_file, "temporary file") | ||
|
|
||
| def _download_artifact( | ||
| self, | ||
| sentry_client: SentryClient, | ||
| artifact_id: str, | ||
| project_id: str, | ||
| organization_id: str, | ||
| ) -> tuple[bytes, int]: | ||
| """Download artifact from Sentry and validate response.""" | ||
| logger.info(f"Downloading artifact {artifact_id}...") | ||
| download_result = sentry_client.download_artifact( | ||
| org=organization_id, project=project_id, artifact_id=artifact_id | ||
| ) | ||
|
|
||
| logger.info("Launchpad service started successfully") | ||
| if "error" in download_result: | ||
| error_category, error_description = categorize_http_error(download_result) | ||
| raise RuntimeError(f"Failed to download artifact ({error_category}): {error_description}") | ||
|
|
||
| if not download_result.get("success"): | ||
| raise RuntimeError(f"Download was not successful: {download_result}") | ||
|
|
||
| file_content = download_result["file_content"] | ||
| file_size = download_result["file_size_bytes"] | ||
|
|
||
| logger.info(f"Downloaded artifact {artifact_id}: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)") | ||
| return file_content, file_size | ||
|
|
||
| def _save_to_temp_file(self, file_content: bytes, artifact_id: str) -> str: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The handling of tempfile is a little odd. Ideally we would use the built in cleanup behaviour of tempfile rather than manually cleaning up ourselves. See comment at use. |
||
| """Save file content to temporary file and return path.""" | ||
| with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tf: | ||
| tf.write(file_content) | ||
| tf.flush() | ||
| temp_file = tf.name | ||
|
|
||
| logger.info(f"Saved artifact to temporary file: {temp_file}") | ||
| return temp_file | ||
|
|
||
| def _prepare_update_data(self, app_info: AppleAppInfo | AndroidAppInfo, artifact: Any) -> Dict[str, Any]: | ||
| """Prepare update data based on app platform and artifact type.""" | ||
| if isinstance(app_info, AppleAppInfo): | ||
| return { | ||
| "build_version": app_info.version, | ||
| "build_number": (int(app_info.build) if str(app_info.build).isdigit() else app_info.build), | ||
| "artifact_type": ARTIFACT_TYPE_XCARCHIVE, | ||
| "apple_app_info": { | ||
| "is_simulator": app_info.is_simulator, | ||
| "codesigning_type": app_info.codesigning_type, | ||
| "profile_name": app_info.profile_name, | ||
| "is_code_signature_valid": app_info.is_code_signature_valid, | ||
| "code_signature_errors": app_info.code_signature_errors, | ||
| }, | ||
| } | ||
| elif isinstance(app_info, AndroidAppInfo): | ||
| artifact_type = ARTIFACT_TYPE_AAB if isinstance(artifact, (AAB, ZippedAAB)) else ARTIFACT_TYPE_APK | ||
| return { | ||
| "build_version": app_info.version, | ||
| "build_number": (int(app_info.build) if app_info.build.isdigit() else None), | ||
| "artifact_type": artifact_type, | ||
| } | ||
| else: | ||
| raise ValueError(f"Unsupported app_info type: {type(app_info)}") | ||
|
|
||
| def _create_analyzer(self, app_info: AppleAppInfo | AndroidAppInfo) -> AndroidAnalyzer | AppleAppAnalyzer: | ||
| """Create analyzer with preprocessed app info.""" | ||
| if isinstance(app_info, AndroidAppInfo): | ||
| analyzer = AndroidAnalyzer() | ||
| analyzer.app_info = app_info | ||
| return analyzer | ||
| else: # AppleAppInfo | ||
| analyzer = AppleAppAnalyzer() | ||
| analyzer.app_info = app_info | ||
| return analyzer | ||
|
|
||
| def _upload_results( | ||
| self, | ||
| sentry_client: SentryClient, | ||
| results: Any, | ||
| artifact_id: str, | ||
| project_id: str, | ||
| organization_id: str, | ||
| ) -> None: | ||
| """Upload analysis results to Sentry.""" | ||
| with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as af: | ||
| json.dump(results.to_dict(), af, indent=2) | ||
| analysis_file = af.name | ||
|
|
||
| logger.info(f"Analysis results written to temporary file: {analysis_file}") | ||
|
|
||
| try: | ||
| # Wait for shutdown signal | ||
| await self._shutdown_event.wait() | ||
| logger.info(f"Uploading analysis results for artifact {artifact_id}...") | ||
| upload_result = sentry_client.upload_size_analysis_file( | ||
| org=organization_id, | ||
| project=project_id, | ||
| artifact_id=artifact_id, | ||
| file_path=analysis_file, | ||
| ) | ||
|
|
||
| if "error" in upload_result: | ||
| logger.error(f"Failed to upload analysis results: {upload_result['error']}") | ||
| else: | ||
| logger.info(f"Successfully uploaded analysis results for artifact {artifact_id}") | ||
|
|
||
| finally: | ||
| # Cleanup | ||
| await self._cleanup(server_task) | ||
| self._safe_cleanup(analysis_file, "analysis file") | ||
|
|
||
| def _safe_cleanup(self, file_path: str, description: str) -> None: | ||
| """Safely clean up a file with error handling.""" | ||
| if file_path and os.path.exists(file_path): | ||
| try: | ||
| os.remove(file_path) | ||
| logger.debug(f"Cleaned up {description}: {file_path}") | ||
| except Exception as e: | ||
| logger.warning(f"Failed to clean up {description} {file_path}: {e}") | ||
|
|
||
| def _setup_signal_handlers(self) -> None: | ||
| """Set up signal handlers for graceful shutdown.""" | ||
|
|
||
| def signal_handler(signum: int, frame: Any) -> None: | ||
| if self._shutdown_event.is_set(): | ||
| logger.info(f"Received signal {signum} during shutdown, ignoring...") | ||
| logger.info(f"Received signal {signum} during shutdown, forcing exit...") | ||
| # Force exit if we get a second signal | ||
| os._exit(1) | ||
| return | ||
|
|
||
| logger.info(f"Received signal {signum}, initiating shutdown...") | ||
|
|
@@ -248,6 +421,7 @@ def get_service_config() -> Dict[str, Any]: | |
| """Get service configuration from environment.""" | ||
| statsd_host = os.getenv("STATSD_HOST", "127.0.0.1") | ||
| statsd_port_str = os.getenv("STATSD_PORT", "8125") | ||
| sentry_base_url = os.getenv("SENTRY_BASE_URL") | ||
|
|
||
| try: | ||
| statsd_port = int(statsd_port_str) | ||
|
|
@@ -257,6 +431,7 @@ def get_service_config() -> Dict[str, Any]: | |
| return { | ||
| "statsd_host": statsd_host, | ||
| "statsd_port": statsd_port, | ||
| "sentry_base_url": sentry_base_url, | ||
| } | ||
|
|
||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
debatably helpful, can remove if ppl feel strongly