Skip to content

Commit 17b8131

Browse files
authored
Saimon/direct io v83 portal (#298)
1 parent cb4691b commit 17b8131

5 files changed

Lines changed: 147 additions & 147 deletions

File tree

cterasdk/direct/client.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from . import filters
55
from .credentials import KeyPair, Bearer
66
from .lib import get_chunks, decrypt_encryption_key, process_chunks
7-
from .types import File, ByteRange, FileMetadata
7+
from .types import ByteRange
88
from .stream import Streamer
99

1010
from ..objects.endpoints import DefaultBuilder, EndpointBuilder
@@ -23,18 +23,24 @@ def __init__(self, baseurl, credentials):
2323
self._client = AsyncClient(DefaultBuilder(), settings=cterasdk.settings.io.direct.storage.settings, authenticator=lambda *_: True)
2424
self._credentials = credentials
2525

26-
async def _direct(self, file_id):
27-
server_object = await get_chunks(self._api, self._credentials, file_id)
28-
encryption_key = decrypt_encryption_key(file_id, server_object.wrapped_key, self._credentials.secret_access_key)
29-
return File(file_id, encryption_key, server_object.chunks)
26+
async def _chunks(self, file_id):
27+
metadata = await get_chunks(self._api, self._credentials, file_id)
28+
if metadata.encrypted:
29+
metadata.encryption_key = decrypt_encryption_key(
30+
metadata.file_id,
31+
metadata.encryption_key,
32+
self._credentials.secret_access_key
33+
)
34+
return metadata
3035

3136
async def metadata(self, file_id):
3237
"""
3338
Direct IO Metadata API.
3439
3540
:param int file_id: File ID.
3641
"""
37-
return FileMetadata(await self._direct(file_id))
42+
meta = await self._chunks(file_id)
43+
return meta.serialize()
3844

3945
async def blocks(self, file_id, blocks, max_workers):
4046
"""
@@ -47,8 +53,8 @@ async def blocks(self, file_id, blocks, max_workers):
4753
:returns: List of Blocks.
4854
:rtype: list[cterasdk.direct.types.Block]
4955
"""
50-
file = await self._direct(file_id)
51-
executor = await self.executor(filters.blocks(file, blocks), file.encryption_key, file_id, max_workers)
56+
meta = await self._chunks(file_id)
57+
executor = self.executor(filters.blocks(meta, blocks), meta.encryption_key, meta.file_id, max_workers)
5258
return await executor()
5359

5460
async def streamer(self, file_id, byte_range):
@@ -60,13 +66,13 @@ async def streamer(self, file_id, byte_range):
6066
:returns: Streamer Object
6167
:rtype: cterasdk.direct.stream.Streamer
6268
"""
63-
file = await self._direct(file_id)
69+
meta = await self._chunks(file_id)
6470
byte_range = byte_range if byte_range is not None else ByteRange.default()
6571
max_workers = cterasdk.settings.sessions.ctera_direct.streamer.max_workers
66-
executor = await self.executor(filters.span(file, byte_range), file.encryption_key, file_id, max_workers)
72+
executor = self.executor(filters.span(meta, byte_range), meta.encryption_key, file_id, max_workers)
6773
return Streamer(executor, byte_range)
6874

69-
async def executor(self, chunks, encryption_key, file_id=None, max_workers=None):
75+
def executor(self, chunks, encryption_key, file_id=None, max_workers=None):
7076
"""
7177
Get Blocks.
7278
@@ -142,6 +148,12 @@ async def streamer(self, file_id, byte_range=None):
142148
"""
143149
return await self._client.streamer(file_id, byte_range)
144150

151+
def executor(self, chunks, encryption_key, max_workers):
152+
"""
153+
Get download executor for download from chunk metadata
154+
"""
155+
return self._client.executor(chunks, encryption_key, None, max_workers)
156+
145157
async def close(self):
146158
await self._client.close()
147159

cterasdk/direct/exceptions.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,19 +45,19 @@ def __init__(self, filename):
4545
class BlocksNotFoundError(DirectIOAPIError):
4646

4747
def __init__(self, filename):
48-
super().__init__(errno.ENODATA, 'Blocks not found', filename)
48+
super().__init__(errno.ENODATA, f'Could not find blocks for file ID: {filename}', filename)
4949

5050

5151
class BlockListConnectionError(DirectIOAPIError):
5252

5353
def __init__(self, filename):
54-
super().__init__(errno.ENETRESET, 'Failed to list blocks. Connection error', filename)
54+
super().__init__(errno.ENETRESET, f'Failed to list blocks for file ID: {filename} due to a connection error', filename)
5555

5656

5757
class BlockListTimeout(DirectIOAPIError):
5858

5959
def __init__(self, filename):
60-
super().__init__(errno.ETIMEDOUT, 'Failed to list blocks. Timed out', filename)
60+
super().__init__(errno.ETIMEDOUT, f'Timed out while listing blocks for file ID: {filename}', filename)
6161

6262

6363
class DecryptKeyError(DirectIOError):

cterasdk/direct/lib.py

Lines changed: 60 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
import asyncio
33

4-
from .types import DirectIOResponse, Block
4+
from .types import Metadata, Block
55
from .credentials import KeyPair, Bearer
66
from .crypto import decrypt_key, decrypt_block
77
from .decompressor import decompress
@@ -32,7 +32,7 @@ async def retry(coro, retries=3, backoff=1):
3232
if attempts == retries:
3333
raise error
3434
wait = backoff * (2 ** (attempts - 1))
35-
logger.debug('Failed attempt number %s. Retrying in %s seconds.', attempts, wait)
35+
logger.debug('Download of block failed on attempt %s. Retrying in %s seconds...', attempts, wait)
3636
await asyncio.sleep(wait)
3737

3838

@@ -46,23 +46,51 @@ async def get_object(client, file_id, chunk):
4646
:rtype: bytes
4747
"""
4848
async def get_object_coro():
49-
parameters = {'file_id': file_id, 'number': chunk.index, 'offset': chunk.offset}
50-
logger.debug('Downloading Block. %s', parameters)
49+
50+
message = (
51+
f"Downloading block #{chunk.index} "
52+
f"(offset={chunk.offset}, length={chunk.length})"
53+
)
54+
55+
if file_id:
56+
message += f" for file ID {file_id}"
57+
58+
error_message, exception = None, None
59+
60+
logger.debug(message)
5161
try:
5262
response = await client.get(chunk.url)
5363
return await response.read()
5464
except ConnectionError:
55-
logger.error('Failed to download block. Connection error. %s', parameters)
56-
raise DownloadConnectionError(file_id, chunk)
65+
error_message = 'connection'
66+
exception = DownloadConnectionError(file_id, chunk)
5767
except asyncio.TimeoutError:
58-
logger.error('Failed to download block. Timed out. %s', parameters)
59-
raise DownloadTimeout(file_id, chunk)
68+
error_message = 'timeout'
69+
exception = DownloadTimeout(file_id, chunk)
6070
except IOError as error:
61-
logger.error('Failed to download block. IO Error. %s', parameters)
62-
raise DownloadError(error, file_id, chunk)
71+
error_message = 'io'
72+
exception = DownloadError(error, file_id, chunk)
6373
except ClientResponseException as error:
64-
logger.error('Failed to download block. Error. %s', parameters)
65-
raise DownloadError(error.response, file_id, chunk)
74+
error_message = 'unknown'
75+
exception = DownloadError(error.response, file_id, chunk)
76+
77+
error_messages = {
78+
"connection": "Connection error",
79+
"timeout": "Timed out",
80+
"io": "I/O error",
81+
"unknown": "Unknown error"
82+
}
83+
84+
message = (
85+
f"Failed to download block #{chunk.index} "
86+
f"(offset={chunk.offset}, length={chunk.length})"
87+
)
88+
if file_id:
89+
message = message + f" for file ID {file_id}"
90+
91+
message = message + f": {error_messages.get(error_message, 'Unknown error')}."
92+
logger.error(message)
93+
raise exception
6694

6795
return await retry(get_object_coro)
6896

@@ -118,9 +146,13 @@ async def process_chunk(client, file_id, chunk, encryption_key, semaphore):
118146
:rtype: cterasdk.direct.types.Block
119147
"""
120148
async def process(client, chunk, encryption_key):
121-
parameters = {'file_id': file_id, 'number': chunk.index, 'offset': chunk.offset}
122-
logger.debug('Processing Block. %s', parameters)
123-
149+
message = (
150+
f"Processing block {chunk.index} "
151+
f"(offset={chunk.offset}, length={chunk.length})"
152+
)
153+
if file_id:
154+
message = message + f" for file ID {file_id}"
155+
logger.debug(message)
124156
encrypted_object = await get_object(client, file_id, chunk)
125157
decrypted_object = await decrypt_object(file_id, encrypted_object, encryption_key, chunk)
126158
decompressed_object = await decompress_object(file_id, decrypted_object, chunk)
@@ -144,10 +176,12 @@ async def process_chunks(client, file_id, chunks, encryption_key, semaphore=None
144176
:returns: List of futures.
145177
:rtype: list[asyncio.Task]
146178
"""
147-
parameters = {'file_id': file_id, 'blocks': len(chunks)}
179+
message = [f"Processing {len(chunks)} blocks"]
180+
if file_id:
181+
message.append(f"for file ID {file_id}")
148182
if semaphore:
149-
parameters['max_workers'] = semaphore._value # pylint: disable=protected-access
150-
logger.debug('Processing Blocks. %s', parameters)
183+
message.append(f"using up to {semaphore._value} workers") # pylint: disable=protected-access
184+
logger.debug(' '.join(message))
151185
futures = []
152186
for chunk in chunks:
153187
futures.append(asyncio.create_task(process_chunk(client, file_id, chunk, encryption_key, semaphore)))
@@ -182,9 +216,11 @@ def create_authorization_header(credentials):
182216
authorization_header = None
183217

184218
if isinstance(credentials, Bearer):
219+
logger.debug('Initializing client using bearer token')
185220
authorization_header = f'Bearer {credentials.bearer}'
186221

187222
elif isinstance(credentials, KeyPair):
223+
logger.debug('Initializing client using key pair.')
188224
authorization_header = f'Bearer {credentials.access_key_id}'
189225

190226
return {'Authorization': authorization_header}
@@ -197,17 +233,16 @@ async def get_chunks(api, credentials, file_id):
197233
:param cterasdk.clients.clients.AsyncJSON api: Asynchronous JSON Client.
198234
:param int file_id: File ID.
199235
:returns: Wrapped key and file chunks.
200-
:rtype: cterasdk.direct.types.DirectIOResponse
236+
:rtype: cterasdk.direct.types.Metadata
201237
"""
202238
async def get_chunks_coro():
203-
parameters = {'file_id': file_id}
204-
logger.debug('Listing blocks. %s', parameters)
239+
logger.debug('Listing blocks for file ID: %s', file_id)
205240
try:
206241
response = await api.get(f'{file_id}', headers=create_authorization_header(credentials))
207242
if not response.chunks:
208-
logger.error('Blocks not found. %s', parameters)
243+
logger.error('Could not find blocks for file ID: %s.', file_id)
209244
raise BlocksNotFoundError(file_id)
210-
return DirectIOResponse(response)
245+
return Metadata(file_id, response)
211246
except ClientResponseException as error:
212247
if error.response.status == 400:
213248
raise NotFoundError(file_id)
@@ -217,10 +252,10 @@ async def get_chunks_coro():
217252
raise UnprocessableContent(file_id)
218253
raise error
219254
except ConnectionError:
220-
logger.error('Failed to list blocks. Connection error. %s', parameters)
255+
logger.error('Failed to list blocks for file ID: %s due to a connection error.', file_id)
221256
raise BlockListConnectionError(file_id)
222257
except asyncio.TimeoutError:
223-
logger.error('Failed to list blocks. Timed out. %s', parameters)
258+
logger.error('Timed out while listing blocks for file ID: %s.', file_id)
224259
raise BlockListTimeout(file_id)
225260

226261
return await retry(get_chunks_coro)

0 commit comments

Comments
 (0)