Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cterasdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# pylint: disable=wrong-import-position
import cterasdk.settings # noqa: E402, F401
import cterasdk.exceptions # noqa: E402, F401

from .common import Object, PolicyRule # noqa: E402, F401
from .convert import fromjsonstr, tojsonstr, fromxmlstr, toxmlstr # noqa: E402, F401
from .exceptions import CTERAException # noqa: E402, F401
from .core import query # noqa: E402, F401
from .edge import types as edge_types # noqa: E402, F401
from .edge import enum as edge_enum # noqa: E402, F401
Expand Down
10 changes: 5 additions & 5 deletions cterasdk/asynchronous/core/files/browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ async def versions(self, path):
"""
return await io.versions(self._core, self.normalize(path))

async def walk(self, path, include_deleted=False):
async def walk(self, path=None, include_deleted=False):
"""
Walk Directory Contents

:param str path: Path to walk
:param str,optional path: Path to walk, defaults to the root directory
:param bool,optional include_deleted: Include deleted files, defaults to False
"""
return io.walk(self._core, self._scope, path, include_deleted=include_deleted)
Expand Down Expand Up @@ -134,14 +134,14 @@ def normalize(self, entries):

class CloudDrive(FileBrowser):

async def upload(self, name, size, destination, handle):
async def upload(self, name, destination, handle, size=None):
"""
Upload from file handle.

:param str name: File name.
:param str size: File size.
:param str destination: Path to remote directory.
:param object handle: Handle.
:param str,optional size: File size, defaults to content length
"""
upload_function = io.upload(name, size, self.normalize(destination), handle)
return await upload_function(self._core)
Expand All @@ -155,7 +155,7 @@ async def upload_file(self, path, destination):
"""
with open(path, 'rb') as handle:
metadata = commonfs.properties(path)
response = await self.upload(metadata['name'], metadata['size'], destination, handle)
response = await self.upload(metadata['name'], destination, handle, metadata['size'])
return response

async def mkdir(self, path):
Expand Down
53 changes: 33 additions & 20 deletions cterasdk/asynchronous/core/files/io.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from ....cio.common import encode_request_parameter
from ....cio import core as fs
from ....cio import exceptions
from ....exceptions.io import ResourceNotFoundError, NotADirectory, ResourceExistsError
from .. import query
from ....lib import FetchResourcesResponse

Expand All @@ -17,18 +17,22 @@ async def listdir(core, path, depth=None, include_deleted=False, search_criteria


async def exists(core, path):
try:
await metadata(core, path)
return True
except exceptions.ResourceNotFoundError:
return False
present, *_ = await metadata(core, path, suppress_error=True)
return present


async def metadata(core, path):
async def metadata(core, path, suppress_error=False):
"""
Get item metadata.

:returns: A tuple indicating if a file exists, and its metadata
"""
response = await listdir(core, path, 0)
if response.root is None:
raise exceptions.ResourceNotFoundError(path.absolute)
return response.root
if not suppress_error:
raise ResourceNotFoundError(path.absolute)
return False, None
return True, response.root


async def versions(core, path):
Expand Down Expand Up @@ -59,7 +63,7 @@ async def makedirs(core, path):
path = fs.CorePath.instance(path.scope, '/'.join(directories[:i]))
try:
await mkdir(core, path)
except exceptions.ResourceExistsError:
except ResourceExistsError:
logger.debug('Resource already exists: %s', path.reference.as_posix())


Expand Down Expand Up @@ -88,11 +92,11 @@ async def move(core, *paths, destination=None):
return await core.v1.api.execute('', 'moveResources', param)


async def retrieve_remote_dir(core, directory):
resource = await metadata(core, directory)
if not resource.isFolder:
raise exceptions.RemoteStorageException('The destination path is not a directory', None, path=directory.absolute)
return str(resource.cloudFolderInfo.uid)
async def ensure_directory(core, directory, suppress_error=False):
present, resource = await metadata(core, directory, suppress_error=True)
if (not present or not resource.isFolder) and not suppress_error:
raise NotADirectory(directory.absolute)
return resource.isFolder if present else False, resource


def handle(path):
Expand Down Expand Up @@ -132,10 +136,19 @@ async def wrapper(core):
:param object handle: File handle.
"""
with fs.handle_many(directory, objects) as param:
return await core.io.download_zip(await retrieve_remote_dir(core, directory), encode_request_parameter(param))
_, resource = await ensure_directory(core, directory)
return await core.io.download_zip(str(resource.cloudFolderInfo.uid), encode_request_parameter(param))
return wrapper


async def _validate_destination(core, name, destination):
is_dir, resource = await ensure_directory(core, destination, suppress_error=True)
if not is_dir:
is_dir, resource = await ensure_directory(core, destination.parent)
return resource.cloudFolderInfo.uid, destination.name, destination.parent
return resource.cloudFolderInfo.uid, name, destination


def upload(name, size, destination, fd):
"""
Create upload function
Expand All @@ -150,11 +163,11 @@ async def wrapper(core):
"""
Upload file from metadata and file handle.

:param cterasdk.objects.synchronous.core.Portal core: POrtal object.
:param cterasdk.objects.synchronous.core.Portal core: Portal object.
"""
target = await retrieve_remote_dir(core, destination)
with fs.upload(core, name, destination, size, fd) as param:
return await core.io.upload(target, param)
uid, filename, directory = await _validate_destination(core, name, destination)
with fs.upload(core, filename, directory, size, fd) as param:
return await core.io.upload(str(uid), param)
return wrapper


Expand Down
5 changes: 4 additions & 1 deletion cterasdk/asynchronous/core/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from abc import abstractmethod


logger = logging.getLogger('cterasdk.common')


class BaseAsyncIterator:
"""Abstract Asynchronous Iterator"""

Expand All @@ -23,7 +26,7 @@ async def __anext__(self):
self._objects.extend(page)
if self._objects:
return self.object
logging.getLogger('cterasdk.common').debug('Stopping iteration.')
logger.debug('Stopping iteration.')
raise StopAsyncIteration

@property
Expand Down
46 changes: 24 additions & 22 deletions cterasdk/asynchronous/core/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
from .base_command import BaseCommand
from ...common import Object
from ...lib import CursorResponse
from ...exceptions import HTTPError, NotificationsError
from ...exceptions.transport import HTTPError
from ...exceptions.notifications import NotificationsError


logger = logging.getLogger('cterasdk.notifications')


class Notifications(BaseCommand):
Expand All @@ -30,11 +34,11 @@ async def get(self, cloudfolders=None, cursor=None, max_results=None):
"""
param = await self._create_parameter(cloudfolders, cursor)
param.max_results = max_results if max_results is not None else 2000
logging.getLogger('cterasdk.metadata.connector').debug('Listing updates.')
logger.debug('Listing updates.')
response = await self._core.v2.api.post('/metadata/list', param)
if response is not None:
return CursorResponse(response)
logging.getLogger('cterasdk.metadata.connector').error('An error occurred while trying to retrieve notifications.')
logger.error('An error occurred while trying to retrieve notifications.')
raise NotificationsError(cloudfolders, cursor)

async def _create_parameter(self, cloudfolders, cursor):
Expand Down Expand Up @@ -64,7 +68,7 @@ async def changes(self, cursor, cloudfolders=None, timeout=None):
param = Object()
param = await self._create_parameter(cloudfolders, cursor)
param.timeout = timeout if timeout else 10000
logging.getLogger('cterasdk.metadata.connector').debug('Checking for updates. %s', {'timeout': param.timeout})
logger.debug('Checking for updates. %s', {'timeout': param.timeout})
return (await self._core.v2.api.post('/metadata/longpoll', param)).changes

async def ancestors(self, descendant):
Expand All @@ -78,12 +82,11 @@ async def ancestors(self, descendant):
param = Object()
param.folder_id = descendant.folder_id
param.guid = descendant.guid
logging.getLogger('cterasdk.metadata.connector').debug('Getting ancestors. %s', {'guid': param.guid, 'folder_id': param.folder_id})
logger.debug('Getting ancestors. %s', {'guid': param.guid, 'folder_id': param.folder_id})
try:
return await self._core.v2.api.post('/metadata/ancestors', param)
except HTTPError:
logging.getLogger('cterasdk.metadata.connector').error('Could not retrieve ancestors. %s',
{'folder_id': param.folder_id, 'guid': param.guid})
logger.error('Could not retrieve ancestors. %s', {'folder_id': param.folder_id, 'guid': param.guid})
raise


Expand Down Expand Up @@ -128,7 +131,7 @@ async def retrieve_events(server_queue, core, cloudfolders, cursor):
:param list[CloudFSFolderFindingHelper] cloudfolders: List of Cloud Drive folders.
:param str cursor: Cursor
"""
logging.getLogger('cterasdk.metadata.connector').debug('Event Retrieval Service.')
logger.debug('Event Retrieval Service.')
last_response = LastResponse(cursor)
try:
while True:
Expand All @@ -142,9 +145,9 @@ async def retrieve_events(server_queue, core, cloudfolders, cursor):
except ConnectionError as error:
await on_connection_error(error)
except TimeoutError:
logging.getLogger('cterasdk.metadata.connector').debug('Request timed out. Retrying.')
logger.debug('Request timed out. Retrying.')
except asyncio.CancelledError:
logging.getLogger('cterasdk.metadata.connector').debug('Cancelling Event Retrieval.')
logger.debug('Cancelling Event Retrieval.')


async def forward_events(server_queue, client_queue, save_cursor):
Expand All @@ -155,15 +158,15 @@ async def forward_events(server_queue, client_queue, save_cursor):
:param asyncio.Queue client_queue: Client queue.
:param callback save_cursor: Callback function to persist the cursor.
"""
logging.getLogger('cterasdk.metadata.connector').debug('Event Forwarder Service.')
logger.debug('Event Forwarder Service.')
try:
while True:
batch = await server_queue.get()
await enqueue_events(batch.objects, client_queue)
await process_events(client_queue)
await persist_cursor(save_cursor, batch.cursor)
except asyncio.CancelledError:
logging.getLogger('cterasdk.metadata.connector').debug('Cancelling Event Forwarding.')
logger.debug('Cancelling Event Forwarding.')


async def enqueue_events(events, queue):
Expand All @@ -174,9 +177,9 @@ async def enqueue_events(events, queue):
:param cterasdk.asynchronous.core.iterator.CursorAsyncIterator events: Event Iterator.
"""
for event in events:
logging.getLogger('cterasdk.metadata.connector').debug('Enqueuing Event.')
logger.debug('Enqueuing Event.')
await queue.put(Event.from_server_object(event))
logging.getLogger('cterasdk.metadata.connector').debug('Enqueued Event.')
logger.debug('Enqueued Event.')


async def process_events(queue):
Expand All @@ -185,9 +188,9 @@ async def process_events(queue):

:param asyncio.Queue queue: Queue.
"""
logging.getLogger('cterasdk.metadata.connector').debug('Joining Queue.')
logger.debug('Joining Queue.')
await queue.join()
logging.getLogger('cterasdk.metadata.connector').debug('Completed Processing.')
logger.debug('Completed Processing.')


async def persist_cursor(save_cursor, cursor):
Expand All @@ -197,19 +200,18 @@ async def persist_cursor(save_cursor, cursor):
:param callback save_cursor: Asynchronous callback function to persist the cursor.
:param str cursor: Cursor
"""
logging.getLogger('cterasdk.metadata.connector').debug("Persisting Cursor. Calling function: '%s'", save_cursor)
logger.debug("Persisting Cursor. Calling function: '%s'", save_cursor)
try:
await save_cursor(cursor)
logging.getLogger('cterasdk.metadata.connector').debug("Called Persist Cursor Function.")
logger.debug("Called Persist Cursor Function.")
except Exception: # pylint: disable=broad-exception-caught
logging.getLogger('cterasdk.metadata.connector').error("An error occurred while trying to persist cursor. Function: '%s'",
save_cursor)
logger.error("An error occurred while trying to persist cursor. Function: '%s'", save_cursor)


async def on_connection_error(error):
seconds = 5
logging.getLogger('cterasdk.metadata.connector').error('Connection error. Reason: %s.', str(error))
logging.getLogger('cterasdk.metadata.connector').debug("Retrying in %s seconds.", seconds)
logger.error('Connection error. Reason: %s.', str(error))
logger.debug("Retrying in %s seconds.", seconds)
await asyncio.sleep(seconds)


Expand Down
7 changes: 4 additions & 3 deletions cterasdk/asynchronous/core/users.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .base_command import BaseCommand
from ...common import union, Object
from ...exceptions import ObjectNotFoundException, ContextError
from ...exceptions import ObjectNotFoundException
from ...exceptions.session import ContextError


class Users(BaseCommand):
Expand All @@ -25,7 +26,7 @@ async def get(self, user_account, include=None):
include = ['/' + attr for attr in include]
user_object = await self._core.v1.api.get_multi(baseurl, include)
if user_object.name is None:
raise ObjectNotFoundException('Could not find user', baseurl, user_directory=user_account.directory, username=user_account.name)
raise ObjectNotFoundException(baseurl)
return user_object

async def generate_ticket(self, username, tenant):
Expand All @@ -36,7 +37,7 @@ async def generate_ticket(self, username, tenant):
:param str portal: Tenant
"""
if self.session().in_tenant_context():
raise ContextError('Context error: Browse the Global Administration to invoke this API.')
raise ContextError('Browse the Global Administration to invoke this API')
param = Object()
param.username = username
param.portal = tenant
Expand Down
4 changes: 2 additions & 2 deletions cterasdk/asynchronous/edge/files/browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ async def listdir(self, path):
"""
return await io.listdir(self._edge, self.normalize(path))

async def walk(self, path):
async def walk(self, path=None):
"""
Walk Directory Contents

:param str path: Path to walk
:param str, defaults to the root directory path: Path to walk
"""
return io.walk(self._edge, path)

Expand Down
Loading