diff --git a/cterasdk/direct/client.py b/cterasdk/direct/client.py index 0933e3a7..5e63c447 100644 --- a/cterasdk/direct/client.py +++ b/cterasdk/direct/client.py @@ -4,7 +4,7 @@ from . import filters from .credentials import KeyPair, Bearer from .lib import get_chunks, decrypt_encryption_key, process_chunks -from .types import File, ByteRange, FileMetadata +from .types import ByteRange from .stream import Streamer from ..objects.endpoints import DefaultBuilder, EndpointBuilder @@ -23,10 +23,15 @@ def __init__(self, baseurl, credentials): self._client = AsyncClient(DefaultBuilder(), settings=cterasdk.settings.io.direct.storage.settings, authenticator=lambda *_: True) self._credentials = credentials - async def _direct(self, file_id): - server_object = await get_chunks(self._api, self._credentials, file_id) - encryption_key = decrypt_encryption_key(file_id, server_object.wrapped_key, self._credentials.secret_access_key) - return File(file_id, encryption_key, server_object.chunks) + async def _chunks(self, file_id): + metadata = await get_chunks(self._api, self._credentials, file_id) + if metadata.encrypted: + metadata.encryption_key = decrypt_encryption_key( + metadata.file_id, + metadata.encryption_key, + self._credentials.secret_access_key + ) + return metadata async def metadata(self, file_id): """ @@ -34,7 +39,8 @@ async def metadata(self, file_id): :param int file_id: File ID. """ - return FileMetadata(await self._direct(file_id)) + meta = await self._chunks(file_id) + return meta.serialize() async def blocks(self, file_id, blocks, max_workers): """ @@ -47,8 +53,8 @@ async def blocks(self, file_id, blocks, max_workers): :returns: List of Blocks. :rtype: list[cterasdk.direct.types.Block] """ - file = await self._direct(file_id) - executor = await self.executor(filters.blocks(file, blocks), file.encryption_key, file_id, max_workers) + meta = await self._chunks(file_id) + executor = self.executor(filters.blocks(meta, blocks), meta.encryption_key, meta.file_id, max_workers) return await executor() async def streamer(self, file_id, byte_range): @@ -60,13 +66,13 @@ async def streamer(self, file_id, byte_range): :returns: Streamer Object :rtype: cterasdk.direct.stream.Streamer """ - file = await self._direct(file_id) + meta = await self._chunks(file_id) byte_range = byte_range if byte_range is not None else ByteRange.default() max_workers = cterasdk.settings.sessions.ctera_direct.streamer.max_workers - executor = await self.executor(filters.span(file, byte_range), file.encryption_key, file_id, max_workers) + executor = self.executor(filters.span(meta, byte_range), meta.encryption_key, file_id, max_workers) return Streamer(executor, byte_range) - async def executor(self, chunks, encryption_key, file_id=None, max_workers=None): + def executor(self, chunks, encryption_key, file_id=None, max_workers=None): """ Get Blocks. @@ -142,6 +148,12 @@ async def streamer(self, file_id, byte_range=None): """ return await self._client.streamer(file_id, byte_range) + def executor(self, chunks, encryption_key, max_workers): + """ + Get download executor for download from chunk metadata + """ + return self._client.executor(chunks, encryption_key, None, max_workers) + async def close(self): await self._client.close() diff --git a/cterasdk/direct/exceptions.py b/cterasdk/direct/exceptions.py index 2e1a556f..c2c5f1ef 100644 --- a/cterasdk/direct/exceptions.py +++ b/cterasdk/direct/exceptions.py @@ -45,19 +45,19 @@ def __init__(self, filename): class BlocksNotFoundError(DirectIOAPIError): def __init__(self, filename): - super().__init__(errno.ENODATA, 'Blocks not found', filename) + super().__init__(errno.ENODATA, f'Could not find blocks for file ID: {filename}', filename) class BlockListConnectionError(DirectIOAPIError): def __init__(self, filename): - super().__init__(errno.ENETRESET, 'Failed to list blocks. Connection error', filename) + super().__init__(errno.ENETRESET, f'Failed to list blocks for file ID: {filename} due to a connection error', filename) class BlockListTimeout(DirectIOAPIError): def __init__(self, filename): - super().__init__(errno.ETIMEDOUT, 'Failed to list blocks. Timed out', filename) + super().__init__(errno.ETIMEDOUT, f'Timed out while listing blocks for file ID: {filename}', filename) class DecryptKeyError(DirectIOError): diff --git a/cterasdk/direct/lib.py b/cterasdk/direct/lib.py index 7adeeb15..a7e71b05 100644 --- a/cterasdk/direct/lib.py +++ b/cterasdk/direct/lib.py @@ -1,7 +1,7 @@ import logging import asyncio -from .types import DirectIOResponse, Block +from .types import Metadata, Block from .credentials import KeyPair, Bearer from .crypto import decrypt_key, decrypt_block from .decompressor import decompress @@ -32,7 +32,7 @@ async def retry(coro, retries=3, backoff=1): if attempts == retries: raise error wait = backoff * (2 ** (attempts - 1)) - logger.debug('Failed attempt number %s. Retrying in %s seconds.', attempts, wait) + logger.debug('Download of block failed on attempt %s. Retrying in %s seconds...', attempts, wait) await asyncio.sleep(wait) @@ -46,23 +46,51 @@ async def get_object(client, file_id, chunk): :rtype: bytes """ async def get_object_coro(): - parameters = {'file_id': file_id, 'number': chunk.index, 'offset': chunk.offset} - logger.debug('Downloading Block. %s', parameters) + + message = ( + f"Downloading block #{chunk.index} " + f"(offset={chunk.offset}, length={chunk.length})" + ) + + if file_id: + message += f" for file ID {file_id}" + + error_message, exception = None, None + + logger.debug(message) try: response = await client.get(chunk.url) return await response.read() except ConnectionError: - logger.error('Failed to download block. Connection error. %s', parameters) - raise DownloadConnectionError(file_id, chunk) + error_message = 'connection' + exception = DownloadConnectionError(file_id, chunk) except asyncio.TimeoutError: - logger.error('Failed to download block. Timed out. %s', parameters) - raise DownloadTimeout(file_id, chunk) + error_message = 'timeout' + exception = DownloadTimeout(file_id, chunk) except IOError as error: - logger.error('Failed to download block. IO Error. %s', parameters) - raise DownloadError(error, file_id, chunk) + error_message = 'io' + exception = DownloadError(error, file_id, chunk) except ClientResponseException as error: - logger.error('Failed to download block. Error. %s', parameters) - raise DownloadError(error.response, file_id, chunk) + error_message = 'unknown' + exception = DownloadError(error.response, file_id, chunk) + + error_messages = { + "connection": "Connection error", + "timeout": "Timed out", + "io": "I/O error", + "unknown": "Unknown error" + } + + message = ( + f"Failed to download block #{chunk.index} " + f"(offset={chunk.offset}, length={chunk.length})" + ) + if file_id: + message = message + f" for file ID {file_id}" + + message = message + f": {error_messages.get(error_message, 'Unknown error')}." + logger.error(message) + raise exception return await retry(get_object_coro) @@ -118,9 +146,13 @@ async def process_chunk(client, file_id, chunk, encryption_key, semaphore): :rtype: cterasdk.direct.types.Block """ async def process(client, chunk, encryption_key): - parameters = {'file_id': file_id, 'number': chunk.index, 'offset': chunk.offset} - logger.debug('Processing Block. %s', parameters) - + message = ( + f"Processing block {chunk.index} " + f"(offset={chunk.offset}, length={chunk.length})" + ) + if file_id: + message = message + f" for file ID {file_id}" + logger.debug(message) encrypted_object = await get_object(client, file_id, chunk) decrypted_object = await decrypt_object(file_id, encrypted_object, encryption_key, chunk) decompressed_object = await decompress_object(file_id, decrypted_object, chunk) @@ -144,10 +176,12 @@ async def process_chunks(client, file_id, chunks, encryption_key, semaphore=None :returns: List of futures. :rtype: list[asyncio.Task] """ - parameters = {'file_id': file_id, 'blocks': len(chunks)} + message = [f"Processing {len(chunks)} blocks"] + if file_id: + message.append(f"for file ID {file_id}") if semaphore: - parameters['max_workers'] = semaphore._value # pylint: disable=protected-access - logger.debug('Processing Blocks. %s', parameters) + message.append(f"using up to {semaphore._value} workers") # pylint: disable=protected-access + logger.debug(' '.join(message)) futures = [] for chunk in chunks: futures.append(asyncio.create_task(process_chunk(client, file_id, chunk, encryption_key, semaphore))) @@ -182,9 +216,11 @@ def create_authorization_header(credentials): authorization_header = None if isinstance(credentials, Bearer): + logger.debug('Initializing client using bearer token') authorization_header = f'Bearer {credentials.bearer}' elif isinstance(credentials, KeyPair): + logger.debug('Initializing client using key pair.') authorization_header = f'Bearer {credentials.access_key_id}' return {'Authorization': authorization_header} @@ -197,17 +233,16 @@ async def get_chunks(api, credentials, file_id): :param cterasdk.clients.clients.AsyncJSON api: Asynchronous JSON Client. :param int file_id: File ID. :returns: Wrapped key and file chunks. - :rtype: cterasdk.direct.types.DirectIOResponse + :rtype: cterasdk.direct.types.Metadata """ async def get_chunks_coro(): - parameters = {'file_id': file_id} - logger.debug('Listing blocks. %s', parameters) + logger.debug('Listing blocks for file ID: %s', file_id) try: response = await api.get(f'{file_id}', headers=create_authorization_header(credentials)) if not response.chunks: - logger.error('Blocks not found. %s', parameters) + logger.error('Could not find blocks for file ID: %s.', file_id) raise BlocksNotFoundError(file_id) - return DirectIOResponse(response) + return Metadata(file_id, response) except ClientResponseException as error: if error.response.status == 400: raise NotFoundError(file_id) @@ -217,10 +252,10 @@ async def get_chunks_coro(): raise UnprocessableContent(file_id) raise error except ConnectionError: - logger.error('Failed to list blocks. Connection error. %s', parameters) + logger.error('Failed to list blocks for file ID: %s due to a connection error.', file_id) raise BlockListConnectionError(file_id) except asyncio.TimeoutError: - logger.error('Failed to list blocks. Timed out. %s', parameters) + logger.error('Timed out while listing blocks for file ID: %s.', file_id) raise BlockListTimeout(file_id) return await retry(get_chunks_coro) diff --git a/cterasdk/direct/types.py b/cterasdk/direct/types.py index b6fc3167..d7061a4c 100644 --- a/cterasdk/direct/types.py +++ b/cterasdk/direct/types.py @@ -1,3 +1,4 @@ +import copy import base64 from ..common import Object, utils @@ -40,20 +41,63 @@ def default(): return ByteRange(0) -class DirectIOResponse: +class CompressionLib: + """ + Compression Library + + :ivar str Snappy: Snappy + :ivar int Gzip: Gzip + :ivar int Off: No Compression + """ + Snappy = 'SNAPPY' + Gzip = 'GZIP' + Off = 'NONE' + + +class Chunk(Object): + + def __init__(self, index, offset, url, length): + """ + Initialize a Chunk. + + :param int index: Chunk index. + :param int offset: Chunk offset. + :param str url: Signed URL. + :param int length: Object length. + """ + super().__init__( + index=index, + offset=offset, + url=url, + length=length + ) + + +class Metadata(Object): + """ + CTERA Direct IO File Metadata + """ - def __init__(self, server_object): + def __init__(self, file_id, server_object): """ - Initialize a Get Response Object. + Initialize a Direct IO metadata response object. :param int file_id: File ID. :param cterasdk.common.object.Object server_object: Response Object. """ - self._wrapped_key = server_object.wrapped_key - self._chunks = DirectIOResponse._create_chunks(server_object.chunks) + super().__init__( + file_id=file_id, + encrypted=server_object.encrypt_info.data_encrypted, + compressed=server_object.compression_type != CompressionLib.Off, + chunks=Metadata._format_chunks(server_object.chunks) + ) + self.encryption_key = server_object.encrypt_info.wrapped_key if self.encrypted else None + self.compression_library = server_object.compression_type if self.compressed else None + last_chunk = self.chunks[-1] + self.size = last_chunk.offset + last_chunk.length @staticmethod - def _create_chunks(server_object): + def _format_chunks(server_object): """ Create Chunks. @@ -70,79 +114,14 @@ def _create_chunks(server_object): offset = offset + chunk.len return chunks - @property - def wrapped_key(self): - return self._wrapped_key - - @property - def chunks(self): - return self._chunks - - -class Chunk: - """Chunk to Retrieve""" - - def __init__(self, index, offset, url, length): - """ - Initialize a Chunk. - - :param int index: Chunk index. - :param int offset: Chunk offset. - :param str url: Signed URL. - :param int length: Object length. - """ - self._index = index - self._offset = offset - self._url = url - self._length = length - - @property - def index(self): - return self._index - - @property - def offset(self): - return self._offset - - @property - def url(self): - return self._url - - @property - def length(self): - return self._length - - -class File: - - def __init__(self, file_id, encryption_key, chunks): + def serialize(self): """ - Initialize a File Object. - - :param int file_id: File ID. - :param str encryption_key: Encryption Key. - :param cterasdk.direct.types.Chunk chunks: List of Chunks. + Serialize Direct IO metadata to a dictionary. """ - self._file_id = file_id - self._encryption_key = encryption_key - self._chunks = chunks - - @property - def file_id(self): - return self._file_id - - @property - def encryption_key(self): - return self._encryption_key - - @property - def chunks(self): - return self._chunks - - @property - def size(self): - last_chunk = self._chunks[-1] - return last_chunk.offset + last_chunk.length + x = copy.deepcopy(self) + if self.encrypted: + x.encryption_key = utils.utf8_decode(base64.b64encode(self.encryption_key)) + return x class Block: @@ -197,31 +176,3 @@ def fragment(self, byte_range): data = self._data[start:end] return Block(self._file_id, self._number, self._offset + start if start else self._offset, data, len(data)) - - -class ChunkMetadata(Object): - """ - Direct IO File Chunk Metadata Object - - :ivar str url: Part URL - :ivar int index: Part Index - :ivar int offset: Part Offset - :ivar int length: Part Length - """ - def __init__(self, url, index, offset, length): - super().__init__() - self.url = url - self.index = index - self.offset = offset - self.length = length - - -class FileMetadata(Object): - """ - Direct IO File Metadata Object - """ - - def __init__(self, f): - super().__init__() - self.encryption_key = utils.utf8_decode(base64.b64encode(f.encryption_key)) - self.chunks = [ChunkMetadata(chunk.url, chunk.index, chunk.offset, chunk.length) for chunk in f.chunks] diff --git a/tests/ut/aio/direct/test_get_metadata.py b/tests/ut/aio/direct/test_get_metadata.py index 14efb80d..1f5f6b2a 100644 --- a/tests/ut/aio/direct/test_get_metadata.py +++ b/tests/ut/aio/direct/test_get_metadata.py @@ -31,7 +31,7 @@ async def test_get_file_metadata_not_found(self): with self.assertRaises(ctera_direct.exceptions.BlocksNotFoundError) as error: await self._direct.metadata(self._file_id) self.assertEqual(error.exception.errno, errno.ENODATA) - self.assertEqual(error.exception.strerror, 'Blocks not found') + self.assertEqual(error.exception.strerror, f'Could not find blocks for file ID: {self._file_id}') self.assertEqual(error.exception.filename, self._file_id) async def test_get_file_metadata_error_400(self): @@ -82,7 +82,8 @@ async def test_get_file_metadata_connection_error(self): with self.assertRaises(ctera_direct.exceptions.BlockListConnectionError) as error: await self._direct.metadata(self._file_id) self.assertEqual(error.exception.errno, errno.ENETRESET) - self.assertEqual(error.exception.strerror, 'Failed to list blocks. Connection error') + self.assertEqual(error.exception.strerror, + f'Failed to list blocks for file ID: {self._file_id} due to a connection error') self.assertEqual(error.exception.filename, self._file_id) async def test_get_file_metadata_timeout(self): @@ -91,7 +92,8 @@ async def test_get_file_metadata_timeout(self): with self.assertRaises(ctera_direct.exceptions.BlockListTimeout) as error: await self._direct.metadata(self._file_id) self.assertEqual(error.exception.errno, errno.ETIMEDOUT) - self.assertEqual(error.exception.strerror, 'Failed to list blocks. Timed out') + self.assertEqual(error.exception.strerror, + f'Timed out while listing blocks for file ID: {self._file_id}') self.assertEqual(error.exception.filename, self._file_id) @staticmethod