Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
plumbum==1.9.0
requests>=2.31.0
filesplit==3.0.2
pulpcore-client==3.68.0
scikit_build==0.18.1
cerberus==1.3.5
Expand Down
3 changes: 3 additions & 0 deletions sign_node/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
DEFAULT_PULP_USER = "pulp"
DEFAULT_PULP_PASSWORD = "test_pwd"
DEFAULT_PULP_CHUNK_SIZE = 8388608 # 8 MiB
DEFAULT_UPLOAD_WORKERS = 4
# Max file size to allow parallel upload for
DEFAULT_PARALLEL_FILE_UPLOAD_SIZE = 524288000 # 500 MB
DEFAULT_PGP_PASSWORD = "test_pwd"
Expand Down Expand Up @@ -64,6 +65,7 @@ def __init__(self, config_file=None, **cmd_args):
"pulp_user": DEFAULT_PULP_USER,
"pulp_password": DEFAULT_PULP_PASSWORD,
"pulp_chunk_size": DEFAULT_PULP_CHUNK_SIZE,
"upload_workers": DEFAULT_UPLOAD_WORKERS,
"parallel_upload_file_size": DEFAULT_PARALLEL_FILE_UPLOAD_SIZE,
"dev_pgp_key_password": DEFAULT_PGP_PASSWORD,
'sentry_dsn': DEFAULT_SENTRY_DSN,
Expand All @@ -90,6 +92,7 @@ def __init__(self, config_file=None, **cmd_args):
"pulp_user": {"type": "string", "nullable": False},
"pulp_password": {"type": "string", "nullable": False},
"pulp_chunk_size": {"type": "integer", "nullable": False},
"upload_workers": {"type": "integer", "nullable": False},
"parallel_upload_file_size": {"type": "integer", "nullable": False},
"jwt_token": {"type": "string", "required": True},
"dev_pgp_key_password": {"type": "string", "nullable": False},
Expand Down
1 change: 1 addition & 0 deletions sign_node/signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, config, password_db, gpg):
self.__config.pulp_user,
self.__config.pulp_password,
self.__config.pulp_chunk_size,
upload_workers=self.__config.upload_workers,
)
self.__working_dir_path = Path(self.__config.working_dir)
self.__download_credentials = {
Expand Down
99 changes: 75 additions & 24 deletions sign_node/uploaders/pulp.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import csv
import logging
import math
import os
import tempfile
import time
import shutil
import typing
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List

from fsplit.filesplit import Filesplit
from pulpcore.client.pulpcore.configuration import Configuration
from pulpcore.client.pulpcore.api_client import ApiClient
from pulpcore.client.pulpcore.api.tasks_api import TasksApi
Expand All @@ -31,7 +29,8 @@ class PulpBaseUploader(BaseUploader):
Handles uploads to Pulp server.
"""

def __init__(self, host: str, username: str, password: str, chunk_size: int):
def __init__(self, host: str, username: str, password: str, chunk_size: int,
upload_workers: int = 4):
"""
Initiate uploader.

Expand All @@ -45,13 +44,15 @@ def __init__(self, host: str, username: str, password: str, chunk_size: int):
User password.
chunk_size : int
Size of chunk to split files during the upload.
upload_workers : int
Maximum number of concurrent workers for chunked uploads.
"""
api_client = self._prepare_api_client(host, username, password)
self._uploads_client = UploadsApi(api_client=api_client)
self._tasks_client = TasksApi(api_client=api_client)
self._artifacts_client = ArtifactsApi(api_client=api_client)
self._file_splitter = Filesplit()
self._chunk_size = chunk_size
self._upload_workers = upload_workers
self._logger = logging.getLogger(__file__)

@staticmethod
Expand Down Expand Up @@ -137,38 +138,88 @@ def _commit_upload(self, reference: str, file_sha256: str) -> str:
task_result = self._wait_for_task_completion(response.task)
return task_result.created_resources[0]

def _create_artifact_direct(self, file_path: str, file_sha256: str) -> str:
response = self._artifacts_client.create(file_path, sha256=file_sha256)
return response.pulp_href

def _put_large_file(self, file_path: str, reference: str):
temp_dir = tempfile.mkdtemp(prefix="pulp_uploader_")
try:
lower_bytes_limit = 0
total_size = os.path.getsize(file_path)
self._file_splitter.split(file_path, self._chunk_size, output_dir=temp_dir)
manifest_path = os.path.join(temp_dir, 'fs_manifest.csv')
for meta in csv.DictReader(open(manifest_path, 'r')):
split_file_path = os.path.join(temp_dir, meta['filename'])
upper_bytes_limit = lower_bytes_limit + int(meta['filesize']) - 1
total_size = os.path.getsize(file_path)

# Build list of (offset, length) chunk descriptors
chunks = []
offset = 0
while offset < total_size:
length = min(self._chunk_size, total_size - offset)
chunks.append((offset, length))
offset += length

def _upload_chunk(chunk_offset: int, chunk_length: int):
"""Read a byte range from source and upload via a temp file."""
tmp_path = None
try:
with open(file_path, 'rb') as src:
src.seek(chunk_offset)
data = src.read(chunk_length)
tmp = tempfile.NamedTemporaryFile(delete=False,
prefix='pulp_chunk_')
tmp_path = tmp.name
tmp.write(data)
tmp.close()
end_byte = chunk_offset + chunk_length - 1
content_range = (
f'bytes {chunk_offset}-{end_byte}/{total_size}'
)
self._uploads_client.update(
f'bytes {lower_bytes_limit}-{upper_bytes_limit}/'
f'{total_size}',
reference, split_file_path)
lower_bytes_limit += int(meta['filesize'])
finally:
if temp_dir and os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
content_range, reference, tmp_path,
)
finally:
if tmp_path and os.path.exists(tmp_path):
os.unlink(tmp_path)

with ThreadPoolExecutor(max_workers=self._upload_workers) as executor:
futures = {
executor.submit(_upload_chunk, off, length): (off, length)
for off, length in chunks
}
for future in as_completed(futures):
future.result() # propagates any exception

def _send_file(self, file_path: str) -> typing.Tuple[str, str]:
file_sha256 = hash_file(file_path, hash_type="sha256")
reference = self.check_if_artifact_exists(file_sha256)
if reference:
return file_sha256, reference
reference, file_size = self._create_upload(file_path)
file_size = os.path.getsize(file_path)
file_name = os.path.basename(file_path)
start_time = time.time()
if file_size < self._chunk_size:
artifact_href = self._create_artifact_direct(file_path, file_sha256)
elapsed = time.time() - start_time
self._logger.info(
'Upload complete: %s (%d bytes) via direct artifact in %.2fs',
file_name, file_size, elapsed,
)
return file_sha256, artifact_href
reference, _ = self._create_upload(file_path)
if file_size > self._chunk_size:
self._put_large_file(file_path, reference)
num_chunks = math.ceil(file_size / self._chunk_size)
artifact_href = self._commit_upload(reference, file_sha256)
elapsed = time.time() - start_time
self._logger.info(
'Upload complete: %s (%d bytes) via %d chunks in %.2fs',
file_name, file_size, num_chunks, elapsed,
)
else:
self._uploads_client.update(
f"bytes 0-{file_size - 1}/{file_size}", reference, file_path
)
artifact_href = self._commit_upload(reference, file_sha256)
artifact_href = self._commit_upload(reference, file_sha256)
elapsed = time.time() - start_time
self._logger.info(
'Upload complete: %s (%d bytes) via single chunk in %.2fs',
file_name, file_size, elapsed,
)
return file_sha256, artifact_href

def check_if_artifact_exists(self, sha256: str) -> str:
Expand Down
Loading
Loading