Skip to content

Commit 3827165

Browse files
Wire up end to end call path (kafka -> monolith endpoints)
1 parent 40b3763 commit 3827165

File tree

5 files changed

+344
-58
lines changed

5 files changed

+344
-58
lines changed

src/launchpad/sentry_client.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,3 +302,45 @@ def _create_multipart_body(self, boundary: str, filename: str, data: bytes) -> b
302302
]
303303

304304
return b"".join(parts)
305+
306+
307+
def categorize_http_error(error_result: Dict[str, Any]) -> tuple[str, str]:
308+
"""
309+
Categorize HTTP error results from SentryClient.
310+
311+
Returns:
312+
Tuple of (error_category, error_description)
313+
Categories: "not_found", "server_error", "client_error", "unknown"
314+
"""
315+
# First try to get the structured status code
316+
status_code = error_result.get("status_code")
317+
if isinstance(status_code, int):
318+
if status_code == 404:
319+
return "not_found", f"Resource not found (HTTP {status_code})"
320+
elif 500 <= status_code < 600:
321+
return "server_error", f"Server error (HTTP {status_code})"
322+
elif 400 <= status_code < 500:
323+
return "client_error", f"Client error (HTTP {status_code})"
324+
else:
325+
return "unknown", f"Unexpected HTTP status {status_code}"
326+
327+
# Fallback to parsing the error message string
328+
error_msg = error_result.get("error", "")
329+
if isinstance(error_msg, str):
330+
# Extract HTTP status code from error message like "HTTP 404"
331+
match = re.search(r"HTTP (\d+)", error_msg)
332+
if match:
333+
try:
334+
status_code = int(match.group(1))
335+
if status_code == 404:
336+
return "not_found", f"Resource not found (HTTP {status_code})"
337+
elif 500 <= status_code < 600:
338+
return "server_error", f"Server error (HTTP {status_code})"
339+
elif 400 <= status_code < 500:
340+
return "client_error", f"Client error (HTTP {status_code})"
341+
else:
342+
return "unknown", f"Unexpected HTTP status {status_code}"
343+
except ValueError:
344+
pass
345+
346+
return "unknown", f"Unknown error: {error_result}"

src/launchpad/service.py

Lines changed: 212 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
from __future__ import annotations
44

55
import asyncio
6+
import json
67
import os
78
import signal
9+
import tempfile
810
import time
911

12+
from pathlib import Path
1013
from typing import Any, Dict, cast
1114

1215
from arroyo.backends.kafka import KafkaPayload
@@ -15,6 +18,15 @@
1518
PreprodArtifactEvents,
1619
)
1720

21+
from launchpad.artifacts.android.aab import AAB
22+
from launchpad.artifacts.android.zipped_aab import ZippedAAB
23+
from launchpad.artifacts.artifact_factory import ArtifactFactory
24+
from launchpad.sentry_client import SentryClient, categorize_http_error
25+
from launchpad.size.analyzers.android import AndroidAnalyzer
26+
from launchpad.size.analyzers.apple import AppleAppAnalyzer
27+
from launchpad.size.models.android import AndroidAppInfo
28+
from launchpad.size.models.apple import AppleAppInfo
29+
from launchpad.size.runner import do_preprocess, do_size
1830
from launchpad.utils.logging import get_logger
1931
from launchpad.utils.statsd import DogStatsd, get_statsd
2032

@@ -26,6 +38,11 @@
2638
# Health check threshold - consider unhealthy if file not touched in 60 seconds
2739
HEALTHCHECK_MAX_AGE_SECONDS = 60.0
2840

41+
# Artifact type constants
42+
ARTIFACT_TYPE_XCARCHIVE = 0
43+
ARTIFACT_TYPE_AAB = 1
44+
ARTIFACT_TYPE_APK = 2
45+
2946

3047
class LaunchpadService:
3148
"""Main service that orchestrates HTTP server and Kafka consumer."""
@@ -37,11 +54,15 @@ def __init__(self) -> None:
3754
self._kafka_task: asyncio.Future[Any] | None = None
3855
self._statsd: DogStatsd | None = None
3956
self._healthcheck_file: str | None = None
57+
self._service_config: Dict[str, Any] | None = None
4058

4159
async def setup(self) -> None:
4260
"""Set up the service components."""
43-
service_config = get_service_config()
44-
self._statsd = get_statsd(host=service_config["statsd_host"], port=service_config["statsd_port"])
61+
self._service_config = get_service_config()
62+
self._statsd = get_statsd(
63+
host=self._service_config["statsd_host"],
64+
port=self._service_config["statsd_port"],
65+
)
4566

4667
# Setup HTTP server with health check callback
4768
server_config = get_server_config()
@@ -64,6 +85,71 @@ async def setup(self) -> None:
6485

6586
logger.info("Service components initialized")
6687

88+
async def start(self) -> None:
89+
"""Start all service components."""
90+
if not self.server or not self.kafka_processor:
91+
raise RuntimeError("Service not properly initialized. Call setup() first.")
92+
93+
logger.info("Starting Launchpad service...")
94+
95+
# Set up signal handlers for graceful shutdown
96+
self._setup_signal_handlers()
97+
98+
# Start Kafka processor in a background thread
99+
loop = asyncio.get_event_loop()
100+
self._kafka_task = loop.run_in_executor(None, self.kafka_processor.run)
101+
102+
# Start HTTP server as a background task
103+
server_task = asyncio.create_task(self.server.start())
104+
logger.info("Launchpad service started successfully")
105+
106+
try:
107+
await self._shutdown_event.wait()
108+
finally:
109+
await self._cleanup(server_task)
110+
111+
def process_artifact_analysis(self, artifact_id: str, project_id: str, organization_id: str) -> None:
112+
"""
113+
Download artifact and perform size analysis.
114+
"""
115+
if not self._service_config:
116+
raise RuntimeError("Service not properly initialized. Call setup() first.")
117+
118+
sentry_client = SentryClient(base_url=self._service_config["sentry_base_url"])
119+
120+
file_content, _ = self._download_artifact(sentry_client, artifact_id, project_id, organization_id)
121+
temp_file = self._save_to_temp_file(file_content, artifact_id)
122+
123+
try:
124+
artifact = ArtifactFactory.from_path(Path(temp_file))
125+
logger.info(f"Running preprocessing on {temp_file}...")
126+
app_info = do_preprocess(Path(temp_file))
127+
logger.info(f"Preprocessing completed for artifact {artifact_id}")
128+
update_data = self._prepare_update_data(app_info, artifact)
129+
130+
logger.info(f"Sending preprocessed info to Sentry for artifact {artifact_id}...")
131+
132+
update_result = sentry_client.update_artifact(
133+
org=organization_id,
134+
project=project_id,
135+
artifact_id=artifact_id,
136+
data=update_data,
137+
)
138+
139+
if "error" in update_result:
140+
logger.error(f"Failed to send preprocessed info: {update_result['error']}")
141+
else:
142+
logger.info(f"Successfully sent preprocessed info for artifact {artifact_id}")
143+
144+
analyzer = self._create_analyzer(app_info)
145+
logger.info(f"Running full analysis on {temp_file}...")
146+
results = do_size(Path(temp_file), analyzer=analyzer)
147+
logger.info(f"Size analysis completed for artifact {artifact_id}")
148+
self._upload_results(sentry_client, results, artifact_id, project_id, organization_id)
149+
150+
finally:
151+
self._safe_cleanup(temp_file, "temporary file")
152+
67153
def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None:
68154
"""
69155
Handle incoming Kafka messages.
@@ -78,58 +164,147 @@ def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None:
78164
if self._statsd:
79165
self._statsd.increment("launchpad.artifact.processing.started")
80166

81-
# TODO: Implement actual analysis logic
82-
# This will need to:
83-
# 1. Fetch the artifact using artifact_id from storage/API
84-
# 2. Determine platform by examining the artifact
85-
# 3. Run appropriate analyzer (iOS/Android)
86-
# 4. Store results
167+
# Perform the actual artifact analysis
168+
self.process_artifact_analysis(artifact_id, project_id, organization_id)
87169

88-
# For now, just log
89-
logger.info(f"Analysis completed for artifact {artifact_id} (stub)")
170+
logger.info(f"Analysis completed for artifact {artifact_id}")
90171

91172
if self._statsd:
92173
self._statsd.increment("launchpad.artifact.processing.completed")
93174

94175
except Exception as e:
95-
logger.error(f"Analysis failed for artifact {artifact_id}: {e}", exc_info=True)
176+
# Log the full error for debugging
177+
logger.error(
178+
f"Failed to process artifact {artifact_id} (project: {project_id}, org: {organization_id}): {e}",
179+
exc_info=True,
180+
)
181+
96182
if self._statsd:
97183
self._statsd.increment("launchpad.artifact.processing.failed")
98-
# Re-raise to let Arroyo handle the error (can be configured for DLQ)
99-
raise
100-
101-
async def start(self) -> None:
102-
"""Start all service components."""
103-
if not self.server or not self.kafka_processor:
104-
raise RuntimeError("Service not properly initialized. Call setup() first.")
105-
106-
logger.info("Starting Launchpad service...")
107-
108-
# Set up signal handlers for graceful shutdown
109-
self._setup_signal_handlers()
110184

111-
# Start Kafka processor in a background thread
112-
loop = asyncio.get_event_loop()
113-
self._kafka_task = loop.run_in_executor(None, self.kafka_processor.run)
114-
115-
# Start HTTP server as a background task
116-
server_task = asyncio.create_task(self.server.start())
185+
# Don't re-raise - let the consumer continue processing other messages
186+
187+
def _download_artifact(
188+
self,
189+
sentry_client: SentryClient,
190+
artifact_id: str,
191+
project_id: str,
192+
organization_id: str,
193+
) -> tuple[bytes, int]:
194+
"""Download artifact from Sentry and validate response."""
195+
logger.info(f"Downloading artifact {artifact_id}...")
196+
download_result = sentry_client.download_artifact(
197+
org=organization_id, project=project_id, artifact_id=artifact_id
198+
)
117199

118-
logger.info("Launchpad service started successfully")
200+
if "error" in download_result:
201+
error_category, error_description = categorize_http_error(download_result)
202+
raise RuntimeError(f"Failed to download artifact ({error_category}): {error_description}")
203+
204+
if not download_result.get("success"):
205+
raise RuntimeError(f"Download was not successful: {download_result}")
206+
207+
file_content = download_result["file_content"]
208+
file_size = download_result["file_size_bytes"]
209+
210+
logger.info(f"Downloaded artifact {artifact_id}: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)")
211+
return file_content, file_size
212+
213+
def _save_to_temp_file(self, file_content: bytes, artifact_id: str) -> str:
214+
"""Save file content to temporary file and return path."""
215+
with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tf:
216+
tf.write(file_content)
217+
tf.flush()
218+
temp_file = tf.name
219+
220+
logger.info(f"Saved artifact to temporary file: {temp_file}")
221+
return temp_file
222+
223+
def _prepare_update_data(self, app_info: AppleAppInfo | AndroidAppInfo, artifact: Any) -> Dict[str, Any]:
224+
"""Prepare update data based on app platform and artifact type."""
225+
if isinstance(app_info, AppleAppInfo):
226+
return {
227+
"build_version": app_info.version,
228+
"build_number": (int(app_info.build) if str(app_info.build).isdigit() else app_info.build),
229+
"artifact_type": ARTIFACT_TYPE_XCARCHIVE,
230+
"apple_app_info": {
231+
"is_simulator": app_info.is_simulator,
232+
"codesigning_type": app_info.codesigning_type,
233+
"profile_name": app_info.profile_name,
234+
"is_code_signature_valid": app_info.is_code_signature_valid,
235+
"code_signature_errors": app_info.code_signature_errors,
236+
},
237+
}
238+
elif isinstance(app_info, AndroidAppInfo):
239+
artifact_type = ARTIFACT_TYPE_AAB if isinstance(artifact, (AAB, ZippedAAB)) else ARTIFACT_TYPE_APK
240+
return {
241+
"build_version": app_info.version,
242+
"build_number": (int(app_info.build) if app_info.build.isdigit() else None),
243+
"artifact_type": artifact_type,
244+
}
245+
else:
246+
raise ValueError(f"Unsupported app_info type: {type(app_info)}")
247+
248+
def _create_analyzer(self, app_info: AppleAppInfo | AndroidAppInfo) -> AndroidAnalyzer | AppleAppAnalyzer:
249+
"""Create analyzer with preprocessed app info."""
250+
if isinstance(app_info, AndroidAppInfo):
251+
analyzer = AndroidAnalyzer()
252+
analyzer.app_info = app_info
253+
return analyzer
254+
else: # AppleAppInfo
255+
analyzer = AppleAppAnalyzer()
256+
analyzer.app_info = app_info
257+
return analyzer
258+
259+
def _upload_results(
260+
self,
261+
sentry_client: SentryClient,
262+
results: Any,
263+
artifact_id: str,
264+
project_id: str,
265+
organization_id: str,
266+
) -> None:
267+
"""Upload analysis results to Sentry."""
268+
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as af:
269+
json.dump(results.to_dict(), af, indent=2)
270+
analysis_file = af.name
271+
272+
logger.info(f"Analysis results written to temporary file: {analysis_file}")
119273

120274
try:
121-
# Wait for shutdown signal
122-
await self._shutdown_event.wait()
275+
logger.info(f"Uploading analysis results for artifact {artifact_id}...")
276+
upload_result = sentry_client.upload_size_analysis_file(
277+
org=organization_id,
278+
project=project_id,
279+
artifact_id=artifact_id,
280+
file_path=analysis_file,
281+
)
282+
283+
if "error" in upload_result:
284+
logger.error(f"Failed to upload analysis results: {upload_result['error']}")
285+
else:
286+
logger.info(f"Successfully uploaded analysis results for artifact {artifact_id}")
287+
123288
finally:
124-
# Cleanup
125-
await self._cleanup(server_task)
289+
self._safe_cleanup(analysis_file, "analysis file")
290+
291+
def _safe_cleanup(self, file_path: str, description: str) -> None:
292+
"""Safely clean up a file with error handling."""
293+
if file_path and os.path.exists(file_path):
294+
try:
295+
os.remove(file_path)
296+
logger.debug(f"Cleaned up {description}: {file_path}")
297+
except Exception as e:
298+
logger.warning(f"Failed to clean up {description} {file_path}: {e}")
126299

127300
def _setup_signal_handlers(self) -> None:
128301
"""Set up signal handlers for graceful shutdown."""
129302

130303
def signal_handler(signum: int, frame: Any) -> None:
131304
if self._shutdown_event.is_set():
132-
logger.info(f"Received signal {signum} during shutdown, ignoring...")
305+
logger.info(f"Received signal {signum} during shutdown, forcing exit...")
306+
# Force exit if we get a second signal
307+
os._exit(1)
133308
return
134309

135310
logger.info(f"Received signal {signum}, initiating shutdown...")
@@ -248,6 +423,7 @@ def get_service_config() -> Dict[str, Any]:
248423
"""Get service configuration from environment."""
249424
statsd_host = os.getenv("STATSD_HOST", "127.0.0.1")
250425
statsd_port_str = os.getenv("STATSD_PORT", "8125")
426+
sentry_base_url = os.getenv("SENTRY_BASE_URL")
251427

252428
try:
253429
statsd_port = int(statsd_port_str)
@@ -257,6 +433,7 @@ def get_service_config() -> Dict[str, Any]:
257433
return {
258434
"statsd_host": statsd_host,
259435
"statsd_port": statsd_port,
436+
"sentry_base_url": sentry_base_url,
260437
}
261438

262439

0 commit comments

Comments
 (0)