Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .envrc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export LAUNCHPAD_ENV="development"
export LAUNCHPAD_HOST="0.0.0.0"
export LAUNCHPAD_PORT="2218"
export LAUNCHPAD_RPC_SHARED_SECRET="launchpad-also-very-long-value-haha"
export SENTRY_BASE_URL="http://localhost:8000"
# STATSD_HOST=... # defaults to 127.0.0.1
# STATSD_PORT=... # defaults to 8125

Expand Down
3 changes: 2 additions & 1 deletion src/launchpad/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def create_kafka_consumer(
consumer_config = {
"bootstrap.servers": config["bootstrap_servers"],
"group.id": config["group_id"],
"auto.offset.reset": "latest",
"auto.offset.reset": config["auto_offset_reset"],
"enable.auto.commit": False,
"enable.auto.offset.store": False,
}
Expand Down Expand Up @@ -136,4 +136,5 @@ def get_kafka_config() -> Dict[str, Any]:
"concurrency": int(os.getenv("KAFKA_CONCURRENCY", "4")),
"max_pending_futures": int(os.getenv("KAFKA_MAX_PENDING_FUTURES", "100")),
"healthcheck_file": os.getenv("KAFKA_HEALTHCHECK_FILE"),
"auto_offset_reset": os.getenv("KAFKA_AUTO_OFFSET_RESET", "latest"), # latest = skip old messages
}
52 changes: 26 additions & 26 deletions src/launchpad/sentry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,6 @@ def __init__(self, base_url: str) -> None:
if not self.shared_secret:
raise RuntimeError("LAUNCHPAD_RPC_SHARED_SECRET must be provided or set as environment variable")

def assemble_size_analysis(
self,
org: str | int,
project: str | int,
artifact_id: str | int,
checksum: str,
chunks: list[str],
) -> Dict[str, Any]:
"""Call the assemble size analysis endpoint."""
# Validate hex strings
if not re.match(r"^[a-fA-F0-9]+$", checksum):
raise ValueError("Invalid checksum format")
for chunk in chunks:
if not re.match(r"^[a-fA-F0-9]+$", chunk):
raise ValueError("Invalid chunk format")

data = {
"checksum": checksum,
"chunks": chunks,
"assemble_type": "size_analysis",
}

endpoint = f"/api/0/internal/{org}/{project}/files/preprodartifacts/{artifact_id}/assemble-generic/"
return self._make_json_request("POST", endpoint, data, operation="Assemble request")

def download_artifact(self, org: str, project: str, artifact_id: str) -> Dict[str, Any]:
"""Download preprod artifact."""
endpoint = f"/api/0/internal/{org}/{project}/files/preprodartifacts/{artifact_id}/"
Expand Down Expand Up @@ -136,7 +111,7 @@ def upload_size_analysis_file(
for attempt in range(max_retries):
logger.debug(f"Assembly attempt {attempt + 1}/{max_retries}")

result = self.assemble_size_analysis(
result = self._assemble_size_analysis(
org=org,
project=project,
artifact_id=artifact_id,
Expand Down Expand Up @@ -285,6 +260,31 @@ def _upload_chunk(self, org: str, chunk: Dict[str, Any]) -> bool:
logger.error(f"Chunk upload error: {e}")
return False

def _assemble_size_analysis(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not called anywhere outside of this file, so make it clear it's internally used only

self,
org: str | int,
project: str | int,
artifact_id: str | int,
checksum: str,
chunks: list[str],
) -> Dict[str, Any]:
"""Call the assemble size analysis endpoint."""
# Validate hex strings
if not re.match(r"^[a-fA-F0-9]+$", checksum):
raise ValueError("Invalid checksum format")
for chunk in chunks:
if not re.match(r"^[a-fA-F0-9]+$", chunk):
raise ValueError("Invalid chunk format")

data = {
"checksum": checksum,
"chunks": chunks,
"assemble_type": "size_analysis",
}

endpoint = f"/api/0/internal/{org}/{project}/files/preprodartifacts/{artifact_id}/assemble-generic/"
return self._make_json_request("POST", endpoint, data, operation="Assemble request")

def _create_multipart_body(self, boundary: str, filename: str, data: bytes) -> bytes:
"""Create multipart/form-data body."""
lines = [
Expand Down