Wire up end to end call path (kafka -> monolith endpoints) #142

Merged
NicoHinderling merged 6 commits into main from
07-08-wire_up_end_to_end_call_path_kafka_-_monolith_endpoints_
Jul 11, 2025

Conversation

NicoHinderling (Contributor) commented Jul 8, 2025

A Kafka message will now trigger the following flow:

  1. try to download the corresponding artifact
  2. preprocess it
  3. send the preprocessed data to the monolith
  4. run the full size analysis
  5. send the full size analysis file to the monolith
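The five steps above can be sketched as a single handler. This is a minimal illustration only; the function and endpoint names here are hypothetical stand-ins, not the PR's actual identifiers.

```python
import logging

logger = logging.getLogger(__name__)

# Hypothetical step implementations; the real names in the PR may differ.
def download_artifact(artifact_id: str) -> bytes:
    return b"fake-artifact-bytes"

def preprocess(artifact: bytes) -> dict:
    return {"app_name": "demo", "size": len(artifact)}

def run_full_size_analysis(artifact: bytes) -> dict:
    return {"total_size_bytes": len(artifact)}

def send_to_monolith(endpoint: str, payload: dict) -> None:
    logger.info("POST %s: %s", endpoint, payload)

def handle_kafka_message(artifact_id: str) -> list:
    """Run the five steps in order; returns the step names for illustration."""
    steps = []
    artifact = download_artifact(artifact_id)          # 1. download
    steps.append("download")
    app_info = preprocess(artifact)                    # 2. preprocess
    steps.append("preprocess")
    send_to_monolith("/preprocess-results", app_info)  # 3. send preprocessed data
    steps.append("send_preprocess")
    results = run_full_size_analysis(artifact)         # 4. full size analysis
    steps.append("analyze")
    send_to_monolith("/size-analysis-results", results)  # 5. send analysis
    steps.append("send_analysis")
    return steps
```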


NicoHinderling force-pushed the 07-08-wire_up_end_to_end_call_path_kafka_-_monolith_endpoints_ branch from bc83d63 to 986ef83 on July 8, 2025 20:26
NicoHinderling force-pushed the 07-08-chore_launchpad_minor_cleanup_and_config_additions branch 2 times, most recently from 704b165 to 608a506 on July 8, 2025 21:14
NicoHinderling force-pushed the 07-08-wire_up_end_to_end_call_path_kafka_-_monolith_endpoints_ branch from 986ef83 to 5623c49 on July 8, 2025 21:14
Base automatically changed from 07-08-chore_launchpad_minor_cleanup_and_config_additions to main on July 8, 2025 21:20
NicoHinderling force-pushed the 07-08-wire_up_end_to_end_call_path_kafka_-_monolith_endpoints_ branch from 5623c49 to 3827165 on July 8, 2025 21:41
    return b"".join(parts)


def categorize_http_error(error_result: Dict[str, Any]) -> tuple[str, str]:
Contributor Author

debatably helpful, can remove if ppl feel strongly

NicoHinderling force-pushed the 07-08-wire_up_end_to_end_call_path_kafka_-_monolith_endpoints_ branch from 3827165 to 2b7ccae on July 8, 2025 21:42
        if self._statsd:
            self._statsd.increment("launchpad.artifact.processing.failed")
        # Re-raise to let Arroyo handle the error (can be configured for DLQ)
        raise
Contributor Author

Don't re-raise - let the consumer continue processing other messages
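The "don't re-raise" choice can be sketched as follows. This is a minimal illustration under assumptions: `FakeStatsd` stands in for the real statsd client, and `process_message` is a hypothetical name, not the PR's actual handler.

```python
import logging

logger = logging.getLogger(__name__)

class FakeStatsd:
    """Stand-in for the statsd client used in the service."""
    def __init__(self):
        self.counters = {}
    def increment(self, metric: str) -> None:
        self.counters[metric] = self.counters.get(metric, 0) + 1

def process_message(statsd, payload) -> bool:
    """Swallow per-message failures so the consumer keeps going."""
    try:
        if payload is None:  # stand-in for a real processing failure
            raise ValueError("bad payload")
        return True
    except Exception:
        logger.exception("Artifact processing failed; continuing")
        if statsd:
            statsd.increment("launchpad.artifact.processing.failed")
        return False  # do not re-raise: the next message still gets processed

statsd = FakeStatsd()
results = [process_message(statsd, p) for p in [{"ok": 1}, None, {"ok": 2}]]
```

The trade-off versus re-raising is that Arroyo's DLQ routing no longer sees the error; the metric and log line become the only failure signal.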

codecov bot commented Jul 8, 2025

Codecov Report

Attention: Patch coverage is 68.40149% with 170 lines in your changes missing coverage. Please review.

Project coverage is 73.81%. Comparing base (40b3763) to head (f06ca0c).
Report is 30 commits behind head on main.

Files with missing lines                   Patch %   Lines
src/launchpad/service.py                    37.28%   109 Missing and 2 partials ⚠️
src/launchpad/sentry_client.py              41.57%   49 Missing and 3 partials ⚠️
src/launchpad/size/runner.py                60.00%   4 Missing and 2 partials ⚠️
src/launchpad/size/analyzers/android.py     85.71%   0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #142      +/-   ##
==========================================
+ Coverage   71.63%   73.81%   +2.17%     
==========================================
  Files          93      106      +13     
  Lines        7320     8399    +1079     
  Branches      860      958      +98     
==========================================
+ Hits         5244     6200     +956     
- Misses       1724     1828     +104     
- Partials      352      371      +19     


        if not self.app_info:
            self.app_info = self.preprocess(artifact)

        app_info = self.app_info
NicoHinderling (Contributor Author) Jul 8, 2025

Now matches analyzers/apple.py and allows us to preprocess separately from the full size analysis
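The lazy-caching pattern from the snippet above can be sketched like this. A minimal illustration only: the class and field names are stand-ins modeled on the excerpt, not the PR's actual `analyzers/android.py` code.

```python
class AndroidAnalyzer:
    """Sketch of the lazy-preprocess pattern; names are illustrative."""

    def __init__(self) -> None:
        self.app_info = None
        self.preprocess_calls = 0

    def preprocess(self, artifact: bytes) -> dict:
        self.preprocess_calls += 1
        return {"package": "com.example", "size": len(artifact)}

    def analyze(self, artifact: bytes) -> dict:
        # Reuse cached preprocess results when preprocess() already ran,
        # so preprocessing and full analysis can be invoked separately.
        if not self.app_info:
            self.app_info = self.preprocess(artifact)
        app_info = self.app_info
        return {"app": app_info, "total_size": len(artifact)}

analyzer = AndroidAnalyzer()
analyzer.analyze(b"apk-bytes")
analyzer.analyze(b"apk-bytes")  # second call does not re-run preprocess
```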

from launchpad.size.models.apple import AppleAppInfo
from launchpad.size.models.common import BaseAnalysisResults


Contributor Author

Allow us to treat preprocess & full size analysis separately

NicoHinderling force-pushed the 07-08-wire_up_end_to_end_call_path_kafka_-_monolith_endpoints_ branch from 2b7ccae to dd3bea7 on July 8, 2025 21:47
NicoHinderling marked this pull request as ready for review July 8, 2025 21:48
    async def start(self) -> None:
        """Start all service components."""
        if not self.server or not self.kafka_processor:

    def process_artifact_analysis(self, artifact_id: str, project_id: str, organization_id: str) -> None:
Contributor Author

Once we have more than size analysis, we can move a lot of this code into product-specific files (e.g. size.py, snapshots.py, etc.), but for now I figure it's OK to just have it all in this file

NicoHinderling force-pushed the 07-08-wire_up_end_to_end_call_path_kafka_-_monolith_endpoints_ branch from fb4e622 to fbf1baf on July 9, 2025 01:51
        # Set up signal handlers for graceful shutdown
        self._setup_signal_handlers()

        try:
            file_content, _ = self._download_artifact(sentry_client, artifact_id, project_id, organization_id)
Contributor

I wonder if it's a problem that we are keeping this large file in memory and then writing it, and reading it all again (in ArtifactFactory). Ideally the download would write to the file without storing the entire thing in memory. Maybe we should at least make sure this file_content reference isn't held when we read it the second time?

            )
            raise

    def _save_to_temp_file(self, file_content: bytes, artifact_id: str) -> str:
Contributor

The handling of tempfile is a little odd. Ideally we would use the built in cleanup behaviour of tempfile rather than manually cleaning up ourselves. See comment at use.
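The built-in cleanup the reviewer refers to can be sketched like this; a minimal illustration, with `analyze_from_temp` as a hypothetical name rather than the PR's actual helper.

```python
import os
import tempfile

def analyze_from_temp(file_content: bytes) -> int:
    # NamedTemporaryFile deletes itself when the context manager exits,
    # so no manual cleanup step (or try/finally unlink) is needed.
    with tempfile.NamedTemporaryFile(suffix=".artifact") as tf:
        tf.write(file_content)
        tf.flush()
        path = tf.name
        size = os.path.getsize(path)
        # ... hand `path` to the analyzer here ...
    # the file is gone once the with-block exits
    assert not os.path.exists(path)
    return size
```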

            "file_content": content,
            "file_size_bytes": len(content),
            "headers": dict(response.headers),
        }
Contributor

The DX of dicts (both typing and access) is quite bad in Python compared to TypeScript, so what is very idiomatic in TypeScript (making the return type an anonymous object with some fields) is not the most common in Python.

If there are only ~2 fields it's more common to use a tuple:

def foo():
    ...
    success = True
    content = "bar"
    return success, content

However, if there are many fields the fastest thing is to use a NamedTuple:

class DownloadResult(NamedTuple):
    success: bool
    file_content: str
    file_size_bytes: int

which is a concise way to define an object with some fields (it also handles a bunch of the operators). See:

https://typing.python.org/en/latest/spec/namedtuples.html#named-tuples
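To show what this buys at the call site, here is the reviewer's `DownloadResult` in use; the field values are illustrative.

```python
from typing import NamedTuple

class DownloadResult(NamedTuple):
    success: bool
    file_content: bytes
    file_size_bytes: int

result = DownloadResult(success=True, file_content=b"data", file_size_bytes=4)

# Fields are accessed by name (with type-checker support)...
by_name = result.file_size_bytes

# ...and the value still unpacks like a plain tuple:
ok, content, size = result
```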

                content += chunk
                if len(content) > 5 * 1024 * 1024 * 1024:  # 5GB limit
                    logger.warning("Download truncated at 5GB")
                    break
Contributor

I think we should avoid reading the whole file into one in-memory object if possible, since these files can be quite large.
Python has a nice abstraction for file-like objects, which is what I would use here.

We could either take the file-like object as an out parameter:

def download_artifact(..., out):
    for chunk in response.iter_content(chunk_size=8192):
        out.write(chunk)

Or you can have download_artifact make a tempfile or an in-memory file:

def download_artifact(...):
    f = tempfile.TemporaryFile()
    for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)
    return f

def download_artifact(...):
    f = io.BytesIO()  # iter_content yields bytes, so BytesIO rather than StringIO
    for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)
    return f
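Combining this suggestion with the 5GB cap from the snippet above gives something like the following sketch. `download_to` is a hypothetical helper, not code from the PR; the chunk iterator stands in for `response.iter_content`.

```python
import io

def download_to(out, chunks, max_bytes=5 * 1024 * 1024 * 1024) -> int:
    """Stream chunks into a file-like `out` instead of one big bytes object.

    Stops writing once the cap would be exceeded, mirroring the 5GB
    truncation in the original snippet.
    """
    written = 0
    for chunk in chunks:
        if written + len(chunk) > max_bytes:
            break  # truncate rather than buffer an unbounded payload
        out.write(chunk)
        written += len(chunk)
    return written

buf = io.BytesIO()
n = download_to(buf, iter([b"aa", b"bb", b"c"]))

capped = io.BytesIO()
m = download_to(capped, iter([b"aaaa"] * 3), max_bytes=8)  # third chunk dropped
```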

            return file_content, file_size

        except Exception as e:
            if not isinstance(e, RuntimeError):
Contributor

If you want to catch only some kinds of errors you can do:

try:
    # ...
except RuntimeError as e:
    # handle RuntimeError here
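A small runnable version of that pattern, with hypothetical helper names; the point is that only `RuntimeError` is handled and everything else still propagates.

```python
def run_step(action):
    """Catch only RuntimeError; any other exception propagates to the caller."""
    try:
        return action()
    except RuntimeError as e:
        return f"handled: {e}"

def failing_step():
    raise RuntimeError("boom")

def unrelated_failure():
    raise ValueError("not ours")

handled = run_step(failing_step)

try:
    run_step(unrelated_failure)
    value_error_propagated = False
except ValueError:
    value_error_propagated = True  # ValueError was not swallowed
```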

        try:
            # Wait for shutdown signal
            await self._shutdown_event.wait()

        download_result = sentry_client.download_artifact(
Contributor

Here it might be better to do:

with tempfile.TemporaryFile() as tf:
    sentry_client.download_artifact(org=organization_id, project=project_id, artifact_id=artifact_id, out=tf)
    artifact = ArtifactFactory.fromFile(tf)
    ...

        )

        if not download_result.get("success"):
            self._handle_download_error(
Contributor

The control flow here seems odd: _handle_download_error ends up raising, if I understand correctly. Can download_artifact just raise an error directly?
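The reviewer's suggestion of raising at the failure site can be sketched like this. `ArtifactDownloadError` and the dict-backed lookup are hypothetical stand-ins for the real client, used only to make the control flow concrete.

```python
class ArtifactDownloadError(RuntimeError):
    """Hypothetical error type; the PR may use a different one."""

def download_artifact(artifact_id: str, responses: dict) -> bytes:
    # Raise at the failure site instead of returning {"success": False, ...}
    # and routing the caller through a separate _handle_download_error helper.
    content = responses.get(artifact_id)
    if content is None:
        raise ArtifactDownloadError(f"artifact {artifact_id} not found")
    return content

try:
    download_artifact("missing", {"ok-id": b"artifact-bytes"})
    raised = False
except ArtifactDownloadError:
    raised = True
```

The caller then needs no success-flag check: a plain `try/except ArtifactDownloadError` replaces the dict inspection.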

NicoHinderling merged commit 14c90db into main on Jul 11, 2025
17 of 18 checks passed
NicoHinderling deleted the 07-08-wire_up_end_to_end_call_path_kafka_-_monolith_endpoints_ branch July 11, 2025 21:51