Skip to content

Commit 3947ca8

Browse files
chore(deploy): download urls in parallel in config-processor (#6351)
Resolves #6350

Reduces the time to process the PPX config from 2m to 6s:

```
./generate_local_test_config.sh --values ~/code/pathoplexus/loculus_values/values.yaml --from-live --live-host preview-main.pathoplexus.org
```

Some of the savings also carry over to faster pod startup (via a shorter init container run) for the backend/website, where this script also does a lot of downloading. For example, the backend init container on Loculus now takes only 6s instead of 18s. On PPX the effect is even more pronounced; we might save almost a minute.

Before (on Loculus main):

```
../generate_local_test_config.sh --from-live 6.46s user 0.72s system 39% cpu 18.193 total
```

After:

```
../generate_local_test_config.sh --from-live 3.99s user 0.44s system 100% cpu 4.403 total
```

### Screenshot

<img width="1087" height="849" alt="image" src="https://github.com/user-attachments/assets/57ea838b-13bf-45e8-ba5f-a161abb8e92d" />

Also works in the init container on ArgoCD:

<img width="1319" height="396" alt="image" src="https://github.com/user-attachments/assets/dfe5edd4-44a3-4084-8b93-72da38a849be" />

🚀 Preview: https://speed-up-deploy.loculus.org
1 parent 39aaa21 commit 3947ca8

1 file changed

Lines changed: 57 additions & 8 deletions

File tree

kubernetes/config-processor/config-processor.py

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
import os
22
import re
33
import shutil
4+
import threading
5+
from concurrent.futures import ThreadPoolExecutor, as_completed
46

57
import requests
68

9+
# Initial number of download threads; download_urls halves this value and
# retries when a request fails with "Too many open files".
DEFAULT_MAX_WORKERS = 16
10+
# Per-thread storage: download_url lazily attaches one requests.Session to it
# for each thread that performs a download.
thread_local = threading.local()
11+
712

813
def copy_structure(input_dir, output_dir):
914
for root, dirs, files in os.walk(input_dir):
@@ -16,30 +21,74 @@ def copy_structure(input_dir, output_dir):
1621
os.makedirs(os.path.dirname(file_path), exist_ok=True)
1722
shutil.copy(os.path.join(root, file), file_path)
1823

19-
def replace_url_with_content(file_content):
24+
25+
def download_urls(urls):
    """Download every URL in *urls* and return a ``{url: text}`` mapping.

    Starts with ``DEFAULT_MAX_WORKERS`` threads. If a request fails with an
    error mentioning "Too many open files", the whole batch is retried with
    half as many workers, down to a single worker. Any other request error,
    or that error with one worker already, is re-raised unchanged.

    An empty *urls* collection yields an empty dict without any download.
    """
    if not urls:
        return {}

    workers = DEFAULT_MAX_WORKERS
    while True:
        try:
            return download_urls_with_workers(urls, workers)
        except requests.exceptions.RequestException as exc:
            descriptor_limit_hit = "Too many open files" in str(exc)
            if not descriptor_limit_hit or workers == 1:
                raise
        # Only reached after a descriptor-limit failure: back off and retry.
        workers = max(workers // 2, 1)
        print(f"Too many open files while downloading URLs, retrying with {workers} worker(s)")
38+
39+
40+
def download_urls_with_workers(urls, max_workers):
    """Download *urls* concurrently and return a ``{url: stripped text}`` dict.

    Args:
        urls: Iterable of URLs to fetch (each is submitted once).
        max_workers: Size of the thread pool used for the downloads.

    Returns:
        Mapping from each URL to its response body with surrounding
        whitespace stripped.

    Raises:
        ValueError: If any response has a status code other than 200.
        Exception: Whatever ``download_url`` raised for a URL is propagated
            unchanged, so callers can still inspect the original error.
    """
    print(f"Downloading {len(urls)} unique URL(s) with {max_workers} worker(s)")

    downloaded_content = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(download_url, url): url for url in urls}
        try:
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                response = future.result()
                if response.status_code != 200:
                    error_details = f"URL: {url}, Status Code: {response.status_code}, Reason: {response.reason}"
                    raise ValueError(f"Problem downloading {error_details}")
                downloaded_content[url] = response.text.strip()
        except BaseException:
            # Fail fast: leaving the ``with`` block waits for every queued
            # task, so without this a single failure would still run all
            # remaining downloads before the error reaches the caller.
            # cancel() is a no-op for futures already running or finished.
            for pending in future_to_url:
                pending.cancel()
            raise
    return downloaded_content
55+
56+
57+
def download_url(url, timeout=(10, 60)):
    """Fetch *url* with this thread's ``requests.Session`` and return the response.

    A session is created lazily per thread and stored on the module-level
    ``thread_local`` object, so each worker thread reuses its own session
    across downloads.

    Args:
        url: Address to GET.
        timeout: Passed to ``Session.get``; either a single number or a
            ``(connect, read)`` tuple in seconds. requests applies no timeout
            by default, which would let one stalled server block the whole
            run forever.

    Returns:
        The ``requests.Response``; the status code is not checked here.
    """
    session = getattr(thread_local, "session", None)
    if session is None:
        session = thread_local.session = requests.Session()
    return session.get(url, timeout=timeout)
61+
62+
63+
def replace_url_with_content(file_content, downloaded_content):
    """Replace each ``[[URL:<url>]]`` placeholder with its downloaded text.

    The substitution is done in a single pass over *file_content*, so text
    inserted from one download is never scanned again. A placeholder that
    happens to appear inside downloaded content is left as-is instead of
    being expanded depending on set iteration order.

    Args:
        file_content: Text that may contain ``[[URL:...]]`` placeholders.
        downloaded_content: Mapping from URL to the text to insert.

    Returns:
        *file_content* with every placeholder replaced.

    Raises:
        KeyError: If a placeholder's URL is missing from *downloaded_content*.
    """
    # A callable replacement inserts the text literally; a plain replacement
    # string would have its backslash escapes interpreted by re.sub.
    return re.sub(
        r'\[\[URL:([^\]]*)\]\]',
        lambda match: downloaded_content[match.group(1)],
        file_content,
    )
2968

3069
def make_substitutions(file_content, substitutions):
    """Return *file_content* with each ``[[key]]`` replaced by its value.

    Keys of *substitutions* are applied one after another in the mapping's
    iteration order.
    """
    result = file_content
    for name in substitutions:
        placeholder = f"[[{name}]]"
        result = result.replace(placeholder, substitutions[name])
    return result
3473

74+
def collect_urls(output_dir):
    """Return the set of URLs named by ``[[URL:...]]`` placeholders.

    Walks *output_dir* recursively, reads every file as text and gathers the
    URL part of each placeholder found.
    """
    found = set()
    for folder, _subdirs, filenames in os.walk(output_dir):
        for filename in filenames:
            with open(os.path.join(folder, filename)) as handle:
                text = handle.read()
            found.update(re.findall(r'\[\[URL:([^\]]*)\]\]', text))
    return found
82+
3583
def process_files(output_dir, substitutions):
84+
downloaded_content = download_urls(collect_urls(output_dir))
3685
for root, dirs, files in os.walk(output_dir):
3786
for file in files:
3887
file_path = os.path.join(root, file)
3988
with open(file_path, 'r+') as f:
4089
print(f"Processing {file_path}")
4190
content = f.read()
42-
new_content = replace_url_with_content(content)
91+
new_content = replace_url_with_content(content, downloaded_content)
4392
new_content = make_substitutions(new_content, substitutions)
4493
if new_content != content:
4594
f.seek(0)

0 commit comments

Comments
 (0)