Skip to content

Commit 1e9da96

Browse files
Optimize Pulp upload: parallel chunks, direct artifact creation, configurable concurrency
- Replace sequential filesplit-based disk splitting with parallel ThreadPoolExecutor chunk uploads using byte-range reads via temp files
- Add direct artifact creation via ArtifactsApi.create() for files smaller than chunk_size, bypassing the 3-step upload+commit flow
- Add configurable upload_workers setting (default 4) to SignNodeConfig, wired through Signer to PulpBaseUploader
- Add per-file upload timing logs at INFO level for all three upload paths (direct artifact, single chunk, multi-chunk)
- Remove filesplit dependency from requirements.txt
- Add 6 new tests: parallel chunk upload, chunk error propagation, direct artifact creation, configurable workers, default workers, and upload timing log verification
1 parent 4bb3107 commit 1e9da96

5 files changed

Lines changed: 435 additions & 25 deletions

File tree

requirements.txt

Lines changed: 0 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -1,6 +1,5 @@
11
plumbum==1.9.0
22
requests>=2.31.0
3-
filesplit==3.0.2
43
pulpcore-client==3.68.0
54
scikit_build==0.18.1
65
cerberus==1.3.5

sign_node/config.py

Lines changed: 3 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -18,6 +18,7 @@
1818
DEFAULT_PULP_USER = "pulp"
1919
DEFAULT_PULP_PASSWORD = "test_pwd"
2020
DEFAULT_PULP_CHUNK_SIZE = 8388608 # 8 MiB
21+
DEFAULT_UPLOAD_WORKERS = 4
2122
# Max file size to allow parallel upload for
2223
DEFAULT_PARALLEL_FILE_UPLOAD_SIZE = 524288000 # 500 MB
2324
DEFAULT_PGP_PASSWORD = "test_pwd"
@@ -64,6 +65,7 @@ def __init__(self, config_file=None, **cmd_args):
6465
"pulp_user": DEFAULT_PULP_USER,
6566
"pulp_password": DEFAULT_PULP_PASSWORD,
6667
"pulp_chunk_size": DEFAULT_PULP_CHUNK_SIZE,
68+
"upload_workers": DEFAULT_UPLOAD_WORKERS,
6769
"parallel_upload_file_size": DEFAULT_PARALLEL_FILE_UPLOAD_SIZE,
6870
"dev_pgp_key_password": DEFAULT_PGP_PASSWORD,
6971
'sentry_dsn': DEFAULT_SENTRY_DSN,
@@ -90,6 +92,7 @@ def __init__(self, config_file=None, **cmd_args):
9092
"pulp_user": {"type": "string", "nullable": False},
9193
"pulp_password": {"type": "string", "nullable": False},
9294
"pulp_chunk_size": {"type": "integer", "nullable": False},
95+
"upload_workers": {"type": "integer", "nullable": False},
9396
"parallel_upload_file_size": {"type": "integer", "nullable": False},
9497
"jwt_token": {"type": "string", "required": True},
9598
"dev_pgp_key_password": {"type": "string", "nullable": False},

sign_node/signer.py

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -63,6 +63,7 @@ def __init__(self, config, password_db, gpg):
6363
self.__config.pulp_user,
6464
self.__config.pulp_password,
6565
self.__config.pulp_chunk_size,
66+
upload_workers=self.__config.upload_workers,
6667
)
6768
self.__working_dir_path = Path(self.__config.working_dir)
6869
self.__download_credentials = {

sign_node/uploaders/pulp.py

Lines changed: 75 additions & 24 deletions
Original file line number · Diff line number · Diff line change
@@ -1,13 +1,11 @@
1-
import csv
21
import logging
2+
import math
33
import os
44
import tempfile
55
import time
6-
import shutil
76
import typing
7+
from concurrent.futures import ThreadPoolExecutor, as_completed
88
from typing import List
9-
10-
from fsplit.filesplit import Filesplit
119
from pulpcore.client.pulpcore.configuration import Configuration
1210
from pulpcore.client.pulpcore.api_client import ApiClient
1311
from pulpcore.client.pulpcore.api.tasks_api import TasksApi
@@ -31,7 +29,8 @@ class PulpBaseUploader(BaseUploader):
3129
Handles uploads to Pulp server.
3230
"""
3331

34-
def __init__(self, host: str, username: str, password: str, chunk_size: int):
32+
def __init__(self, host: str, username: str, password: str, chunk_size: int,
33+
upload_workers: int = 4):
3534
"""
3635
Initiate uploader.
3736
@@ -45,13 +44,15 @@ def __init__(self, host: str, username: str, password: str, chunk_size: int):
4544
User password.
4645
chunk_size : int
4746
Size of chunk to split files during the upload.
47+
upload_workers : int
48+
Maximum number of concurrent workers for chunked uploads.
4849
"""
4950
api_client = self._prepare_api_client(host, username, password)
5051
self._uploads_client = UploadsApi(api_client=api_client)
5152
self._tasks_client = TasksApi(api_client=api_client)
5253
self._artifacts_client = ArtifactsApi(api_client=api_client)
53-
self._file_splitter = Filesplit()
5454
self._chunk_size = chunk_size
55+
self._upload_workers = upload_workers
5556
self._logger = logging.getLogger(__file__)
5657

5758
@staticmethod
@@ -137,38 +138,88 @@ def _commit_upload(self, reference: str, file_sha256: str) -> str:
137138
task_result = self._wait_for_task_completion(response.task)
138139
return task_result.created_resources[0]
139140

141+
def _create_artifact_direct(self, file_path: str, file_sha256: str) -> str:
142+
response = self._artifacts_client.create(file_path, sha256=file_sha256)
143+
return response.pulp_href
144+
140145
def _put_large_file(self, file_path: str, reference: str):
141-
temp_dir = tempfile.mkdtemp(prefix="pulp_uploader_")
142-
try:
143-
lower_bytes_limit = 0
144-
total_size = os.path.getsize(file_path)
145-
self._file_splitter.split(file_path, self._chunk_size, output_dir=temp_dir)
146-
manifest_path = os.path.join(temp_dir, 'fs_manifest.csv')
147-
for meta in csv.DictReader(open(manifest_path, 'r')):
148-
split_file_path = os.path.join(temp_dir, meta['filename'])
149-
upper_bytes_limit = lower_bytes_limit + int(meta['filesize']) - 1
146+
total_size = os.path.getsize(file_path)
147+
148+
# Build list of (offset, length) chunk descriptors
149+
chunks = []
150+
offset = 0
151+
while offset < total_size:
152+
length = min(self._chunk_size, total_size - offset)
153+
chunks.append((offset, length))
154+
offset += length
155+
156+
def _upload_chunk(chunk_offset: int, chunk_length: int):
157+
"""Read a byte range from source and upload via a temp file."""
158+
tmp_path = None
159+
try:
160+
with open(file_path, 'rb') as src:
161+
src.seek(chunk_offset)
162+
data = src.read(chunk_length)
163+
tmp = tempfile.NamedTemporaryFile(delete=False,
164+
prefix='pulp_chunk_')
165+
tmp_path = tmp.name
166+
tmp.write(data)
167+
tmp.close()
168+
end_byte = chunk_offset + chunk_length - 1
169+
content_range = (
170+
f'bytes {chunk_offset}-{end_byte}/{total_size}'
171+
)
150172
self._uploads_client.update(
151-
f'bytes {lower_bytes_limit}-{upper_bytes_limit}/'
152-
f'{total_size}',
153-
reference, split_file_path)
154-
lower_bytes_limit += int(meta['filesize'])
155-
finally:
156-
if temp_dir and os.path.exists(temp_dir):
157-
shutil.rmtree(temp_dir)
173+
content_range, reference, tmp_path,
174+
)
175+
finally:
176+
if tmp_path and os.path.exists(tmp_path):
177+
os.unlink(tmp_path)
178+
179+
with ThreadPoolExecutor(max_workers=self._upload_workers) as executor:
180+
futures = {
181+
executor.submit(_upload_chunk, off, length): (off, length)
182+
for off, length in chunks
183+
}
184+
for future in as_completed(futures):
185+
future.result() # propagates any exception
158186

159187
def _send_file(self, file_path: str) -> typing.Tuple[str, str]:
160188
file_sha256 = hash_file(file_path, hash_type="sha256")
161189
reference = self.check_if_artifact_exists(file_sha256)
162190
if reference:
163191
return file_sha256, reference
164-
reference, file_size = self._create_upload(file_path)
192+
file_size = os.path.getsize(file_path)
193+
file_name = os.path.basename(file_path)
194+
start_time = time.time()
195+
if file_size < self._chunk_size:
196+
artifact_href = self._create_artifact_direct(file_path, file_sha256)
197+
elapsed = time.time() - start_time
198+
self._logger.info(
199+
'Upload complete: %s (%d bytes) via direct artifact in %.2fs',
200+
file_name, file_size, elapsed,
201+
)
202+
return file_sha256, artifact_href
203+
reference, _ = self._create_upload(file_path)
165204
if file_size > self._chunk_size:
166205
self._put_large_file(file_path, reference)
206+
num_chunks = math.ceil(file_size / self._chunk_size)
207+
artifact_href = self._commit_upload(reference, file_sha256)
208+
elapsed = time.time() - start_time
209+
self._logger.info(
210+
'Upload complete: %s (%d bytes) via %d chunks in %.2fs',
211+
file_name, file_size, num_chunks, elapsed,
212+
)
167213
else:
168214
self._uploads_client.update(
169215
f"bytes 0-{file_size - 1}/{file_size}", reference, file_path
170216
)
171-
artifact_href = self._commit_upload(reference, file_sha256)
217+
artifact_href = self._commit_upload(reference, file_sha256)
218+
elapsed = time.time() - start_time
219+
self._logger.info(
220+
'Upload complete: %s (%d bytes) via single chunk in %.2fs',
221+
file_name, file_size, elapsed,
222+
)
172223
return file_sha256, artifact_href
173224

174225
def check_if_artifact_exists(self, sha256: str) -> str:

0 commit comments

Comments (0)