Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions cterasdk/direct/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from . import filters
from .credentials import KeyPair, Bearer
from .lib import get_chunks, decrypt_encryption_key, process_chunks
from .types import File, ByteRange, FileMetadata
from .types import ByteRange
from .stream import Streamer

from ..objects.endpoints import DefaultBuilder, EndpointBuilder
Expand All @@ -23,18 +23,24 @@ def __init__(self, baseurl, credentials):
self._client = AsyncClient(DefaultBuilder(), settings=cterasdk.settings.io.direct.storage.settings, authenticator=lambda *_: True)
self._credentials = credentials

async def _direct(self, file_id):
server_object = await get_chunks(self._api, self._credentials, file_id)
encryption_key = decrypt_encryption_key(file_id, server_object.wrapped_key, self._credentials.secret_access_key)
return File(file_id, encryption_key, server_object.chunks)
async def _chunks(self, file_id):
metadata = await get_chunks(self._api, self._credentials, file_id)
if metadata.encrypted:
metadata.encryption_key = decrypt_encryption_key(
metadata.file_id,
metadata.encryption_key,
self._credentials.secret_access_key
)
return metadata

async def metadata(self, file_id):
"""
Direct IO Metadata API.

:param int file_id: File ID.
"""
return FileMetadata(await self._direct(file_id))
meta = await self._chunks(file_id)
return meta.serialize()

async def blocks(self, file_id, blocks, max_workers):
"""
Expand All @@ -47,8 +53,8 @@ async def blocks(self, file_id, blocks, max_workers):
:returns: List of Blocks.
:rtype: list[cterasdk.direct.types.Block]
"""
file = await self._direct(file_id)
executor = await self.executor(filters.blocks(file, blocks), file.encryption_key, file_id, max_workers)
meta = await self._chunks(file_id)
executor = self.executor(filters.blocks(meta, blocks), meta.encryption_key, meta.file_id, max_workers)
return await executor()

async def streamer(self, file_id, byte_range):
Expand All @@ -60,13 +66,13 @@ async def streamer(self, file_id, byte_range):
:returns: Streamer Object
:rtype: cterasdk.direct.stream.Streamer
"""
file = await self._direct(file_id)
meta = await self._chunks(file_id)
byte_range = byte_range if byte_range is not None else ByteRange.default()
max_workers = cterasdk.settings.sessions.ctera_direct.streamer.max_workers
executor = await self.executor(filters.span(file, byte_range), file.encryption_key, file_id, max_workers)
executor = self.executor(filters.span(meta, byte_range), meta.encryption_key, file_id, max_workers)
return Streamer(executor, byte_range)

async def executor(self, chunks, encryption_key, file_id=None, max_workers=None):
def executor(self, chunks, encryption_key, file_id=None, max_workers=None):
"""
Get Blocks.

Expand Down Expand Up @@ -142,6 +148,12 @@ async def streamer(self, file_id, byte_range=None):
"""
return await self._client.streamer(file_id, byte_range)

def executor(self, chunks, encryption_key, max_workers):
"""
Get download executor for download from chunk metadata
"""
return self._client.executor(chunks, encryption_key, None, max_workers)

async def close(self):
await self._client.close()

Expand Down
6 changes: 3 additions & 3 deletions cterasdk/direct/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,19 @@ def __init__(self, filename):
class BlocksNotFoundError(DirectIOAPIError):

def __init__(self, filename):
super().__init__(errno.ENODATA, 'Blocks not found', filename)
super().__init__(errno.ENODATA, f'Could not find blocks for file ID: {filename}', filename)


class BlockListConnectionError(DirectIOAPIError):

def __init__(self, filename):
super().__init__(errno.ENETRESET, 'Failed to list blocks. Connection error', filename)
super().__init__(errno.ENETRESET, f'Failed to list blocks for file ID: {filename} due to a connection error', filename)


class BlockListTimeout(DirectIOAPIError):

def __init__(self, filename):
super().__init__(errno.ETIMEDOUT, 'Failed to list blocks. Timed out', filename)
super().__init__(errno.ETIMEDOUT, f'Timed out while listing blocks for file ID: {filename}', filename)


class DecryptKeyError(DirectIOError):
Expand Down
85 changes: 60 additions & 25 deletions cterasdk/direct/lib.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import asyncio

from .types import DirectIOResponse, Block
from .types import Metadata, Block
from .credentials import KeyPair, Bearer
from .crypto import decrypt_key, decrypt_block
from .decompressor import decompress
Expand Down Expand Up @@ -32,7 +32,7 @@ async def retry(coro, retries=3, backoff=1):
if attempts == retries:
raise error
wait = backoff * (2 ** (attempts - 1))
logger.debug('Failed attempt number %s. Retrying in %s seconds.', attempts, wait)
logger.debug('Download of block failed on attempt %s. Retrying in %s seconds...', attempts, wait)
await asyncio.sleep(wait)


Expand All @@ -46,23 +46,51 @@ async def get_object(client, file_id, chunk):
:rtype: bytes
"""
async def get_object_coro():
parameters = {'file_id': file_id, 'number': chunk.index, 'offset': chunk.offset}
logger.debug('Downloading Block. %s', parameters)

message = (
f"Downloading block #{chunk.index} "
f"(offset={chunk.offset}, length={chunk.length})"
)

if file_id:
message += f" for file ID {file_id}"

error_message, exception = None, None

logger.debug(message)
try:
response = await client.get(chunk.url)
return await response.read()
except ConnectionError:
logger.error('Failed to download block. Connection error. %s', parameters)
raise DownloadConnectionError(file_id, chunk)
error_message = 'connection'
exception = DownloadConnectionError(file_id, chunk)
except asyncio.TimeoutError:
logger.error('Failed to download block. Timed out. %s', parameters)
raise DownloadTimeout(file_id, chunk)
error_message = 'timeout'
exception = DownloadTimeout(file_id, chunk)
except IOError as error:
logger.error('Failed to download block. IO Error. %s', parameters)
raise DownloadError(error, file_id, chunk)
error_message = 'io'
exception = DownloadError(error, file_id, chunk)
except ClientResponseException as error:
logger.error('Failed to download block. Error. %s', parameters)
raise DownloadError(error.response, file_id, chunk)
error_message = 'unknown'
exception = DownloadError(error.response, file_id, chunk)

error_messages = {
"connection": "Connection error",
"timeout": "Timed out",
"io": "I/O error",
"unknown": "Unknown error"
}

message = (
f"Failed to download block #{chunk.index} "
f"(offset={chunk.offset}, length={chunk.length})"
)
if file_id:
message = message + f" for file ID {file_id}"

message = message + f": {error_messages.get(error_message, 'Unknown error')}."
logger.error(message)
raise exception

return await retry(get_object_coro)

Expand Down Expand Up @@ -118,9 +146,13 @@ async def process_chunk(client, file_id, chunk, encryption_key, semaphore):
:rtype: cterasdk.direct.types.Block
"""
async def process(client, chunk, encryption_key):
parameters = {'file_id': file_id, 'number': chunk.index, 'offset': chunk.offset}
logger.debug('Processing Block. %s', parameters)

message = (
f"Processing block {chunk.index} "
f"(offset={chunk.offset}, length={chunk.length})"
)
if file_id:
message = message + f" for file ID {file_id}"
logger.debug(message)
encrypted_object = await get_object(client, file_id, chunk)
decrypted_object = await decrypt_object(file_id, encrypted_object, encryption_key, chunk)
decompressed_object = await decompress_object(file_id, decrypted_object, chunk)
Expand All @@ -144,10 +176,12 @@ async def process_chunks(client, file_id, chunks, encryption_key, semaphore=None
:returns: List of futures.
:rtype: list[asyncio.Task]
"""
parameters = {'file_id': file_id, 'blocks': len(chunks)}
message = [f"Processing {len(chunks)} blocks"]
if file_id:
message.append(f"for file ID {file_id}")
if semaphore:
parameters['max_workers'] = semaphore._value # pylint: disable=protected-access
logger.debug('Processing Blocks. %s', parameters)
message.append(f"using up to {semaphore._value} workers") # pylint: disable=protected-access
logger.debug(' '.join(message))
futures = []
for chunk in chunks:
futures.append(asyncio.create_task(process_chunk(client, file_id, chunk, encryption_key, semaphore)))
Expand Down Expand Up @@ -182,9 +216,11 @@ def create_authorization_header(credentials):
authorization_header = None

if isinstance(credentials, Bearer):
logger.debug('Initializing client using bearer token')
authorization_header = f'Bearer {credentials.bearer}'

elif isinstance(credentials, KeyPair):
logger.debug('Initializing client using key pair.')
authorization_header = f'Bearer {credentials.access_key_id}'

return {'Authorization': authorization_header}
Expand All @@ -197,17 +233,16 @@ async def get_chunks(api, credentials, file_id):
:param cterasdk.clients.clients.AsyncJSON api: Asynchronous JSON Client.
:param int file_id: File ID.
:returns: Wrapped key and file chunks.
:rtype: cterasdk.direct.types.DirectIOResponse
:rtype: cterasdk.direct.types.Metadata
"""
async def get_chunks_coro():
parameters = {'file_id': file_id}
logger.debug('Listing blocks. %s', parameters)
logger.debug('Listing blocks for file ID: %s', file_id)
try:
response = await api.get(f'{file_id}', headers=create_authorization_header(credentials))
if not response.chunks:
logger.error('Blocks not found. %s', parameters)
logger.error('Could not find blocks for file ID: %s.', file_id)
raise BlocksNotFoundError(file_id)
return DirectIOResponse(response)
return Metadata(file_id, response)
except ClientResponseException as error:
if error.response.status == 400:
raise NotFoundError(file_id)
Expand All @@ -217,10 +252,10 @@ async def get_chunks_coro():
raise UnprocessableContent(file_id)
raise error
except ConnectionError:
logger.error('Failed to list blocks. Connection error. %s', parameters)
logger.error('Failed to list blocks for file ID: %s due to a connection error.', file_id)
raise BlockListConnectionError(file_id)
except asyncio.TimeoutError:
logger.error('Failed to list blocks. Timed out. %s', parameters)
logger.error('Timed out while listing blocks for file ID: %s.', file_id)
raise BlockListTimeout(file_id)

return await retry(get_chunks_coro)
Loading