Skip to content

Commit dd3bea7

Browse files
Wire up end to end call path (kafka -> monolith endpoints)
1 parent 40b3763 commit dd3bea7

File tree

5 files changed

+334
-53
lines changed

5 files changed

+334
-53
lines changed

src/launchpad/sentry_client.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,3 +302,45 @@ def _create_multipart_body(self, boundary: str, filename: str, data: bytes) -> b
302302
]
303303

304304
return b"".join(parts)
305+
306+
307+
def _status_to_category(status_code: int) -> tuple[str, str]:
    """Map one HTTP status code to a (category, description) pair."""
    if status_code == 404:
        return "not_found", f"Resource not found (HTTP {status_code})"
    if 500 <= status_code < 600:
        return "server_error", f"Server error (HTTP {status_code})"
    if 400 <= status_code < 500:
        return "client_error", f"Client error (HTTP {status_code})"
    return "unknown", f"Unexpected HTTP status {status_code}"


def categorize_http_error(error_result: Dict[str, Any]) -> tuple[str, str]:
    """
    Categorize HTTP error results from SentryClient.

    Args:
        error_result: Error payload from a SentryClient call. May carry a
            structured ``status_code`` int and/or an ``error`` message string
            containing a token like ``"HTTP 404"``.

    Returns:
        Tuple of (error_category, error_description)
        Categories: "not_found", "server_error", "client_error", "unknown"
    """
    # First try to get the structured status code; it takes priority over
    # anything parsed out of the free-form message.
    status_code = error_result.get("status_code")
    if isinstance(status_code, int):
        return _status_to_category(status_code)

    # Fallback to parsing the error message string, e.g. "HTTP 404".
    error_msg = error_result.get("error", "")
    if isinstance(error_msg, str):
        match = re.search(r"HTTP (\d+)", error_msg)
        if match:
            # \d+ is guaranteed to parse as int, so no ValueError guard needed.
            return _status_to_category(int(match.group(1)))

    return "unknown", f"Unknown error: {error_result}"

src/launchpad/service.py

Lines changed: 205 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
from __future__ import annotations
44

55
import asyncio
6+
import json
67
import os
78
import signal
9+
import tempfile
810
import time
911

12+
from pathlib import Path
1013
from typing import Any, Dict, cast
1114

1215
from arroyo.backends.kafka import KafkaPayload
@@ -15,6 +18,15 @@
1518
PreprodArtifactEvents,
1619
)
1720

21+
from launchpad.artifacts.android.aab import AAB
22+
from launchpad.artifacts.android.zipped_aab import ZippedAAB
23+
from launchpad.artifacts.artifact_factory import ArtifactFactory
24+
from launchpad.sentry_client import SentryClient, categorize_http_error
25+
from launchpad.size.analyzers.android import AndroidAnalyzer
26+
from launchpad.size.analyzers.apple import AppleAppAnalyzer
27+
from launchpad.size.models.android import AndroidAppInfo
28+
from launchpad.size.models.apple import AppleAppInfo
29+
from launchpad.size.runner import do_preprocess, do_size
1830
from launchpad.utils.logging import get_logger
1931
from launchpad.utils.statsd import DogStatsd, get_statsd
2032

@@ -26,6 +38,11 @@
2638
# Health check threshold - consider unhealthy if file not touched in 60 seconds
2739
HEALTHCHECK_MAX_AGE_SECONDS = 60.0
2840

41+
# Artifact type constants
42+
ARTIFACT_TYPE_XCARCHIVE = 0
43+
ARTIFACT_TYPE_AAB = 1
44+
ARTIFACT_TYPE_APK = 2
45+
2946

3047
class LaunchpadService:
3148
"""Main service that orchestrates HTTP server and Kafka consumer."""
@@ -37,11 +54,15 @@ def __init__(self) -> None:
3754
self._kafka_task: asyncio.Future[Any] | None = None
3855
self._statsd: DogStatsd | None = None
3956
self._healthcheck_file: str | None = None
57+
self._service_config: Dict[str, Any] | None = None
4058

4159
async def setup(self) -> None:
4260
"""Set up the service components."""
43-
service_config = get_service_config()
44-
self._statsd = get_statsd(host=service_config["statsd_host"], port=service_config["statsd_port"])
61+
self._service_config = get_service_config()
62+
self._statsd = get_statsd(
63+
host=self._service_config["statsd_host"],
64+
port=self._service_config["statsd_port"],
65+
)
4566

4667
# Setup HTTP server with health check callback
4768
server_config = get_server_config()
@@ -64,6 +85,29 @@ async def setup(self) -> None:
6485

6586
logger.info("Service components initialized")
6687

88+
async def start(self) -> None:
    """Start all service components and block until shutdown is signalled.

    Raises:
        RuntimeError: if setup() has not been called first.
    """
    if not self.server or not self.kafka_processor:
        raise RuntimeError("Service not properly initialized. Call setup() first.")

    logger.info("Starting Launchpad service...")

    # Set up signal handlers for graceful shutdown
    self._setup_signal_handlers()

    # Start Kafka processor in a background thread.
    # get_running_loop() is the supported call from inside a coroutine;
    # get_event_loop() is deprecated in this context since Python 3.10.
    loop = asyncio.get_running_loop()
    self._kafka_task = loop.run_in_executor(None, self.kafka_processor.run)

    # Start HTTP server as a background task
    server_task = asyncio.create_task(self.server.start())
    logger.info("Launchpad service started successfully")

    try:
        # Park here until a signal handler sets the shutdown event.
        await self._shutdown_event.wait()
    finally:
        # Always tear down the server task and Kafka consumer, even on error.
        await self._cleanup(server_task)
67111
def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None:
68112
"""
69113
Handle incoming Kafka messages.
@@ -78,58 +122,187 @@ def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None:
78122
if self._statsd:
79123
self._statsd.increment("launchpad.artifact.processing.started")
80124

81-
# TODO: Implement actual analysis logic
82-
# This will need to:
83-
# 1. Fetch the artifact using artifact_id from storage/API
84-
# 2. Determine platform by examining the artifact
85-
# 3. Run appropriate analyzer (iOS/Android)
86-
# 4. Store results
125+
# Perform the actual artifact analysis
126+
self.process_artifact_analysis(artifact_id, project_id, organization_id)
87127

88-
# For now, just log
89-
logger.info(f"Analysis completed for artifact {artifact_id} (stub)")
128+
logger.info(f"Analysis completed for artifact {artifact_id}")
90129

91130
if self._statsd:
92131
self._statsd.increment("launchpad.artifact.processing.completed")
93132

94133
except Exception as e:
95-
logger.error(f"Analysis failed for artifact {artifact_id}: {e}", exc_info=True)
134+
# Log the full error for debugging
135+
logger.error(
136+
f"Failed to process artifact {artifact_id} (project: {project_id}, org: {organization_id}): {e}",
137+
exc_info=True,
138+
)
139+
96140
if self._statsd:
97141
self._statsd.increment("launchpad.artifact.processing.failed")
98-
# Re-raise to let Arroyo handle the error (can be configured for DLQ)
99-
raise
100142

101-
async def start(self) -> None:
102-
"""Start all service components."""
103-
if not self.server or not self.kafka_processor:
143+
def process_artifact_analysis(self, artifact_id: str, project_id: str, organization_id: str) -> None:
    """Download one artifact, preprocess it, send preprocessed info to Sentry,
    run the full size analysis, and upload the results."""
    if not self._service_config:
        raise RuntimeError("Service not properly initialized. Call setup() first.")

    sentry_client = SentryClient(base_url=self._service_config["sentry_base_url"])

    file_content, _ = self._download_artifact(sentry_client, artifact_id, project_id, organization_id)
    temp_file = self._save_to_temp_file(file_content, artifact_id)

    try:
        # Hoist the repeated Path conversion; both runner calls use it.
        temp_path = Path(temp_file)
        artifact = ArtifactFactory.from_path(temp_path)

        logger.info(f"Running preprocessing on {temp_file}...")
        app_info = do_preprocess(temp_path)
        logger.info(f"Preprocessing completed for artifact {artifact_id}")

        update_data = self._prepare_update_data(app_info, artifact)
        logger.info(f"Sending preprocessed info to Sentry for artifact {artifact_id}...")
        update_result = sentry_client.update_artifact(
            org=organization_id,
            project=project_id,
            artifact_id=artifact_id,
            data=update_data,
        )
        # A failed metadata update is logged but does not abort the size run.
        if "error" in update_result:
            logger.error(f"Failed to send preprocessed info: {update_result['error']}")
        else:
            logger.info(f"Successfully sent preprocessed info for artifact {artifact_id}")

        analyzer = self._create_analyzer(app_info)
        logger.info(f"Running full analysis on {temp_file}...")
        results = do_size(temp_path, analyzer=analyzer)
        logger.info(f"Size analysis completed for artifact {artifact_id}")
        self._upload_results(sentry_client, results, artifact_id, project_id, organization_id)
    finally:
        # The downloaded archive is always removed, even when analysis fails.
        self._safe_cleanup(temp_file, "temporary file")
184+
185+
def _download_artifact(
    self,
    sentry_client: SentryClient,
    artifact_id: str,
    project_id: str,
    organization_id: str,
) -> tuple[bytes, int]:
    """Fetch the artifact bytes from Sentry, raising on any failed response.

    Returns:
        (file_content, file_size_bytes) on success.

    Raises:
        RuntimeError: when the client reports an error or a non-success result.
    """
    logger.info(f"Downloading artifact {artifact_id}...")
    download_result = sentry_client.download_artifact(
        org=organization_id,
        project=project_id,
        artifact_id=artifact_id,
    )

    # A structured error payload means the HTTP request itself failed.
    if "error" in download_result:
        error_category, error_description = categorize_http_error(download_result)
        raise RuntimeError(f"Failed to download artifact ({error_category}): {error_description}")

    # The client can also report an unsuccessful download without an "error" key.
    if not download_result.get("success"):
        raise RuntimeError(f"Download was not successful: {download_result}")

    file_content = download_result["file_content"]
    file_size = download_result["file_size_bytes"]
    logger.info(f"Downloaded artifact {artifact_id}: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)")
    return file_content, file_size
210+
211+
def _save_to_temp_file(self, file_content: bytes, artifact_id: str) -> str:
    """Persist downloaded bytes to a .zip temp file and return its path."""
    # delete=False: the caller owns the file and removes it via _safe_cleanup.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as handle:
        handle.write(file_content)
        handle.flush()
        temp_file = handle.name
    logger.info(f"Saved artifact to temporary file: {temp_file}")
    return temp_file
220+
221+
def _prepare_update_data(self, app_info: AppleAppInfo | AndroidAppInfo, artifact: Any) -> Dict[str, Any]:
    """Build the artifact-update payload for Sentry from preprocessed app info."""
    if isinstance(app_info, AppleAppInfo):
        raw_build = app_info.build
        # NOTE(review): a non-numeric Apple build falls back to the raw value,
        # while the Android branch below falls back to None — confirm the
        # asymmetry is intentional on the monolith side.
        build_number = int(raw_build) if str(raw_build).isdigit() else raw_build
        return {
            "build_version": app_info.version,
            "build_number": build_number,
            "artifact_type": ARTIFACT_TYPE_XCARCHIVE,
            "apple_app_info": {
                "is_simulator": app_info.is_simulator,
                "codesigning_type": app_info.codesigning_type,
                "profile_name": app_info.profile_name,
                "is_code_signature_valid": app_info.is_code_signature_valid,
                "code_signature_errors": app_info.code_signature_errors,
            },
        }

    if isinstance(app_info, AndroidAppInfo):
        # Both plain and zipped AABs report the AAB artifact type.
        is_aab = isinstance(artifact, (AAB, ZippedAAB))
        return {
            "build_version": app_info.version,
            "build_number": int(app_info.build) if app_info.build.isdigit() else None,
            "artifact_type": ARTIFACT_TYPE_AAB if is_aab else ARTIFACT_TYPE_APK,
        }

    raise ValueError(f"Unsupported app_info type: {type(app_info)}")
245+
246+
def _create_analyzer(self, app_info: AppleAppInfo | AndroidAppInfo) -> AndroidAnalyzer | AppleAppAnalyzer:
    """Instantiate the platform-appropriate analyzer seeded with *app_info*."""
    # Two-way dispatch: anything that is not Android is treated as Apple,
    # matching the declared union type.
    analyzer_cls = AndroidAnalyzer if isinstance(app_info, AndroidAppInfo) else AppleAppAnalyzer
    analyzer = analyzer_cls()
    # Hand over the already-preprocessed info to the analyzer instance.
    analyzer.app_info = app_info
    return analyzer
256+
257+
def _upload_results(
    self,
    sentry_client: SentryClient,
    results: Any,
    artifact_id: str,
    project_id: str,
    organization_id: str,
) -> None:
    """Serialize analysis results to a temp JSON file and upload it to Sentry."""
    # delete=False so the path survives the with-block; removed in finally.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as af:
        json.dump(results.to_dict(), af, indent=2)
        analysis_file = af.name

    logger.info(f"Analysis results written to temporary file: {analysis_file}")

    try:
        logger.info(f"Uploading analysis results for artifact {artifact_id}...")
        upload_result = sentry_client.upload_size_analysis_file(
            org=organization_id,
            project=project_id,
            artifact_id=artifact_id,
            file_path=analysis_file,
        )
        # Upload failures are logged, not raised — best-effort delivery.
        if "error" in upload_result:
            logger.error(f"Failed to upload analysis results: {upload_result['error']}")
        else:
            logger.info(f"Successfully uploaded analysis results for artifact {artifact_id}")
    finally:
        self._safe_cleanup(analysis_file, "analysis file")
288+
289+
def _safe_cleanup(self, file_path: str, description: str) -> None:
290+
"""Safely clean up a file with error handling."""
291+
if file_path and os.path.exists(file_path):
292+
try:
293+
os.remove(file_path)
294+
logger.debug(f"Cleaned up {description}: {file_path}")
295+
except Exception as e:
296+
logger.warning(f"Failed to clean up {description} {file_path}: {e}")
126297

127298
def _setup_signal_handlers(self) -> None:
128299
"""Set up signal handlers for graceful shutdown."""
129300

130301
def signal_handler(signum: int, frame: Any) -> None:
131302
if self._shutdown_event.is_set():
132-
logger.info(f"Received signal {signum} during shutdown, ignoring...")
303+
logger.info(f"Received signal {signum} during shutdown, forcing exit...")
304+
# Force exit if we get a second signal
305+
os._exit(1)
133306
return
134307

135308
logger.info(f"Received signal {signum}, initiating shutdown...")
@@ -248,6 +421,7 @@ def get_service_config() -> Dict[str, Any]:
248421
"""Get service configuration from environment."""
249422
statsd_host = os.getenv("STATSD_HOST", "127.0.0.1")
250423
statsd_port_str = os.getenv("STATSD_PORT", "8125")
424+
sentry_base_url = os.getenv("SENTRY_BASE_URL")
251425

252426
try:
253427
statsd_port = int(statsd_port_str)
@@ -257,6 +431,7 @@ def get_service_config() -> Dict[str, Any]:
257431
return {
258432
"statsd_host": statsd_host,
259433
"statsd_port": statsd_port,
434+
"sentry_base_url": sentry_base_url,
260435
}
261436

262437

0 commit comments

Comments
 (0)