Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/launchpad/sentry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,45 @@ def _create_multipart_body(self, boundary: str, filename: str, data: bytes) -> b
]

return b"".join(parts)


def categorize_http_error(error_result: Dict[str, Any]) -> tuple[str, str]:
    """
    Categorize HTTP error results from SentryClient.

    Args:
        error_result: Error dict returned by a SentryClient call. May carry a
            structured ``status_code`` int and/or an ``error`` message string
            containing a token like "HTTP 404".

    Returns:
        Tuple of (error_category, error_description)
        Categories: "not_found", "server_error", "client_error", "unknown"
    """

    def _categorize(status_code: int) -> tuple[str, str]:
        # Single source of truth for the status-code -> category mapping
        # (previously duplicated for the structured and parsed paths).
        if status_code == 404:
            return "not_found", f"Resource not found (HTTP {status_code})"
        if 500 <= status_code < 600:
            return "server_error", f"Server error (HTTP {status_code})"
        if 400 <= status_code < 500:
            return "client_error", f"Client error (HTTP {status_code})"
        return "unknown", f"Unexpected HTTP status {status_code}"

    # First try to get the structured status code
    status_code = error_result.get("status_code")
    if isinstance(status_code, int):
        return _categorize(status_code)

    # Fallback to parsing the error message string, e.g. "HTTP 404"
    error_msg = error_result.get("error", "")
    if isinstance(error_msg, str):
        match = re.search(r"HTTP (\d+)", error_msg)
        if match:
            # A \d+ match always converts cleanly to int, so no ValueError
            # handling is needed here.
            return _categorize(int(match.group(1)))

    return "unknown", f"Unknown error: {error_result}"
235 changes: 205 additions & 30 deletions src/launchpad/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
from __future__ import annotations

import asyncio
import json
import os
import signal
import tempfile
import time

from pathlib import Path
from typing import Any, Dict, cast

from arroyo.backends.kafka import KafkaPayload
Expand All @@ -15,6 +18,15 @@
PreprodArtifactEvents,
)

from launchpad.artifacts.android.aab import AAB
from launchpad.artifacts.android.zipped_aab import ZippedAAB
from launchpad.artifacts.artifact_factory import ArtifactFactory
from launchpad.sentry_client import SentryClient, categorize_http_error
from launchpad.size.analyzers.android import AndroidAnalyzer
from launchpad.size.analyzers.apple import AppleAppAnalyzer
from launchpad.size.models.android import AndroidAppInfo
from launchpad.size.models.apple import AppleAppInfo
from launchpad.size.runner import do_preprocess, do_size
from launchpad.utils.logging import get_logger
from launchpad.utils.statsd import DogStatsd, get_statsd

Expand All @@ -26,6 +38,11 @@
# Health check threshold - consider unhealthy if file not touched in 60 seconds
HEALTHCHECK_MAX_AGE_SECONDS = 60.0

# Artifact type constants
ARTIFACT_TYPE_XCARCHIVE = 0
ARTIFACT_TYPE_AAB = 1
ARTIFACT_TYPE_APK = 2


class LaunchpadService:
"""Main service that orchestrates HTTP server and Kafka consumer."""
Expand All @@ -37,11 +54,15 @@ def __init__(self) -> None:
self._kafka_task: asyncio.Future[Any] | None = None
self._statsd: DogStatsd | None = None
self._healthcheck_file: str | None = None
self._service_config: Dict[str, Any] | None = None

async def setup(self) -> None:
"""Set up the service components."""
service_config = get_service_config()
self._statsd = get_statsd(host=service_config["statsd_host"], port=service_config["statsd_port"])
self._service_config = get_service_config()
self._statsd = get_statsd(
host=self._service_config["statsd_host"],
port=self._service_config["statsd_port"],
)

# Setup HTTP server with health check callback
server_config = get_server_config()
Expand All @@ -64,6 +85,29 @@ async def setup(self) -> None:

logger.info("Service components initialized")

async def start(self) -> None:
    """Start all service components and block until shutdown is requested.

    Raises:
        RuntimeError: If setup() was not called before start().
    """
    if not self.server or not self.kafka_processor:
        raise RuntimeError("Service not properly initialized. Call setup() first.")

    logger.info("Starting Launchpad service...")

    # Set up signal handlers for graceful shutdown
    self._setup_signal_handlers()

    # Start Kafka processor in a background thread. Inside a coroutine,
    # get_running_loop() is the recommended API; get_event_loop() is
    # deprecated for this use since Python 3.10.
    loop = asyncio.get_running_loop()
    self._kafka_task = loop.run_in_executor(None, self.kafka_processor.run)

    # Start HTTP server as a background task
    server_task = asyncio.create_task(self.server.start())
    logger.info("Launchpad service started successfully")

    try:
        # Block here until a shutdown signal sets the event.
        await self._shutdown_event.wait()
    finally:
        # Always tear down the server/consumer, even on unexpected errors.
        await self._cleanup(server_task)

def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None:
"""
Handle incoming Kafka messages.
Expand All @@ -78,58 +122,187 @@ def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None:
if self._statsd:
self._statsd.increment("launchpad.artifact.processing.started")

# TODO: Implement actual analysis logic
# This will need to:
# 1. Fetch the artifact using artifact_id from storage/API
# 2. Determine platform by examining the artifact
# 3. Run appropriate analyzer (iOS/Android)
# 4. Store results
# Perform the actual artifact analysis
self.process_artifact_analysis(artifact_id, project_id, organization_id)

# For now, just log
logger.info(f"Analysis completed for artifact {artifact_id} (stub)")
logger.info(f"Analysis completed for artifact {artifact_id}")

if self._statsd:
self._statsd.increment("launchpad.artifact.processing.completed")

except Exception as e:
logger.error(f"Analysis failed for artifact {artifact_id}: {e}", exc_info=True)
# Log the full error for debugging
logger.error(
f"Failed to process artifact {artifact_id} (project: {project_id}, org: {organization_id}): {e}",
exc_info=True,
)

if self._statsd:
self._statsd.increment("launchpad.artifact.processing.failed")
# Re-raise to let Arroyo handle the error (can be configured for DLQ)
raise
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't re-raise - let the consumer continue processing other messages


async def start(self) -> None:
"""Start all service components."""
if not self.server or not self.kafka_processor:
def process_artifact_analysis(self, artifact_id: str, project_id: str, organization_id: str) -> None:
    """Download an artifact and run the full size-analysis pipeline.

    Pipeline: download -> temp file -> preprocess -> send preprocessed info
    -> full size analysis -> upload results. The temporary artifact file is
    always removed, even if a step fails.

    Raises:
        RuntimeError: If setup() has not been called, or the download fails.
    """
    if not self._service_config:
        raise RuntimeError("Service not properly initialized. Call setup() first.")

    client = SentryClient(base_url=self._service_config["sentry_base_url"])

    content, _ = self._download_artifact(client, artifact_id, project_id, organization_id)
    temp_path = self._save_to_temp_file(content, artifact_id)

    try:
        artifact_path = Path(temp_path)
        artifact = ArtifactFactory.from_path(artifact_path)

        logger.info(f"Running preprocessing on {temp_path}...")
        app_info = do_preprocess(artifact_path)
        logger.info(f"Preprocessing completed for artifact {artifact_id}")

        update_payload = self._prepare_update_data(app_info, artifact)
        logger.info(f"Sending preprocessed info to Sentry for artifact {artifact_id}...")
        update_result = client.update_artifact(
            org=organization_id,
            project=project_id,
            artifact_id=artifact_id,
            data=update_payload,
        )

        # A failed metadata update is logged but does not abort the analysis.
        if "error" in update_result:
            logger.error(f"Failed to send preprocessed info: {update_result['error']}")
        else:
            logger.info(f"Successfully sent preprocessed info for artifact {artifact_id}")

        analyzer = self._create_analyzer(app_info)
        logger.info(f"Running full analysis on {temp_path}...")
        results = do_size(artifact_path, analyzer=analyzer)
        logger.info(f"Size analysis completed for artifact {artifact_id}")
        self._upload_results(client, results, artifact_id, project_id, organization_id)
    finally:
        self._safe_cleanup(temp_path, "temporary file")

def _download_artifact(
    self,
    sentry_client: SentryClient,
    artifact_id: str,
    project_id: str,
    organization_id: str,
) -> tuple[bytes, int]:
    """Download the artifact bytes from Sentry and validate the response.

    Returns:
        Tuple of (file_content, file_size_bytes).

    Raises:
        RuntimeError: If the response carries an error or is not successful.
    """
    logger.info(f"Downloading artifact {artifact_id}...")
    result = sentry_client.download_artifact(org=organization_id, project=project_id, artifact_id=artifact_id)

    if "error" in result:
        category, description = categorize_http_error(result)
        raise RuntimeError(f"Failed to download artifact ({category}): {description}")
    if not result.get("success"):
        raise RuntimeError(f"Download was not successful: {result}")

    content = result["file_content"]
    size = result["file_size_bytes"]
    logger.info(f"Downloaded artifact {artifact_id}: {size} bytes ({size / 1024 / 1024:.2f} MB)")
    return content, size

def _save_to_temp_file(self, file_content: bytes, artifact_id: str) -> str:
    """Write the artifact bytes to a temporary .zip file and return its path.

    The file is created with delete=False; the caller is responsible for
    removing it (see _safe_cleanup).
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as handle:
        handle.write(file_content)
        # Closing the context manager flushes the buffer to disk.
        path = handle.name

    logger.info(f"Saved artifact to temporary file: {path}")
    return path

def _prepare_update_data(self, app_info: AppleAppInfo | AndroidAppInfo, artifact: Any) -> Dict[str, Any]:
    """Build the artifact-update payload for Sentry from preprocessed app info.

    Raises:
        ValueError: If app_info is neither AppleAppInfo nor AndroidAppInfo.
    """
    if isinstance(app_info, AppleAppInfo):
        raw_build = app_info.build
        # NOTE(review): on non-numeric builds the Apple path falls back to the
        # raw build value while the Android path below uses None — confirm the
        # backend accepts both shapes for build_number.
        return {
            "build_version": app_info.version,
            "build_number": (int(raw_build) if str(raw_build).isdigit() else raw_build),
            "artifact_type": ARTIFACT_TYPE_XCARCHIVE,
            "apple_app_info": {
                "is_simulator": app_info.is_simulator,
                "codesigning_type": app_info.codesigning_type,
                "profile_name": app_info.profile_name,
                "is_code_signature_valid": app_info.is_code_signature_valid,
                "code_signature_errors": app_info.code_signature_errors,
            },
        }

    if isinstance(app_info, AndroidAppInfo):
        is_aab = isinstance(artifact, (AAB, ZippedAAB))
        return {
            "build_version": app_info.version,
            "build_number": (int(app_info.build) if app_info.build.isdigit() else None),
            "artifact_type": ARTIFACT_TYPE_AAB if is_aab else ARTIFACT_TYPE_APK,
        }

    raise ValueError(f"Unsupported app_info type: {type(app_info)}")

def _create_analyzer(self, app_info: AppleAppInfo | AndroidAppInfo) -> AndroidAnalyzer | AppleAppAnalyzer:
    """Build the platform analyzer, seeded with the preprocessed app info."""
    # Any non-Android app_info is treated as Apple, matching the else branch
    # of the original dispatch.
    analyzer = AndroidAnalyzer() if isinstance(app_info, AndroidAppInfo) else AppleAppAnalyzer()
    analyzer.app_info = app_info
    return analyzer

def _upload_results(
    self,
    sentry_client: SentryClient,
    results: Any,
    artifact_id: str,
    project_id: str,
    organization_id: str,
) -> None:
    """Serialize analysis results to a temp JSON file and upload to Sentry.

    Upload failures are logged, not raised; the temp file is always removed.
    """
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        json.dump(results.to_dict(), handle, indent=2)
        analysis_path = handle.name

    logger.info(f"Analysis results written to temporary file: {analysis_path}")

    try:
        logger.info(f"Uploading analysis results for artifact {artifact_id}...")
        outcome = sentry_client.upload_size_analysis_file(
            org=organization_id,
            project=project_id,
            artifact_id=artifact_id,
            file_path=analysis_path,
        )

        if "error" in outcome:
            logger.error(f"Failed to upload analysis results: {outcome['error']}")
        else:
            logger.info(f"Successfully uploaded analysis results for artifact {artifact_id}")
    finally:
        self._safe_cleanup(analysis_path, "analysis file")

def _safe_cleanup(self, file_path: str, description: str) -> None:
    """Best-effort file removal: failures are logged as warnings, never raised."""
    if not file_path or not os.path.exists(file_path):
        return
    try:
        os.remove(file_path)
    except Exception as e:
        logger.warning(f"Failed to clean up {description} {file_path}: {e}")
    else:
        logger.debug(f"Cleaned up {description}: {file_path}")

def _setup_signal_handlers(self) -> None:
"""Set up signal handlers for graceful shutdown."""

def signal_handler(signum: int, frame: Any) -> None:
if self._shutdown_event.is_set():
logger.info(f"Received signal {signum} during shutdown, ignoring...")
logger.info(f"Received signal {signum} during shutdown, forcing exit...")
# Force exit if we get a second signal
os._exit(1)
return

logger.info(f"Received signal {signum}, initiating shutdown...")
Expand Down Expand Up @@ -248,6 +421,7 @@ def get_service_config() -> Dict[str, Any]:
"""Get service configuration from environment."""
statsd_host = os.getenv("STATSD_HOST", "127.0.0.1")
statsd_port_str = os.getenv("STATSD_PORT", "8125")
sentry_base_url = os.getenv("SENTRY_BASE_URL")

try:
statsd_port = int(statsd_port_str)
Expand All @@ -257,6 +431,7 @@ def get_service_config() -> Dict[str, Any]:
return {
"statsd_host": statsd_host,
"statsd_port": statsd_port,
"sentry_base_url": sentry_base_url,
}


Expand Down
Loading
Loading