diff --git a/cterasdk/asynchronous/core/files/browser.py b/cterasdk/asynchronous/core/files/browser.py index 5364ea45..865c30d5 100644 --- a/cterasdk/asynchronous/core/files/browser.py +++ b/cterasdk/asynchronous/core/files/browser.py @@ -105,15 +105,18 @@ async def public_link(self, path, access='RO', expire_in=30): """ return await io.public_link(self._core, self.normalize(path), access, expire_in) - async def copy(self, *paths, destination=None): + async def copy(self, *paths, destination=None, wait=False): """ Copy one or more files or folders :param list[str] paths: List of paths :param str destination: Destination + :param bool,optional wait: ``True`` Wait for task to complete, or ``False`` to return an awaitable task object. + :returns: Task status object, or an awaitable task object + :rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask` """ try: - return await io.copy(self._core, *[self.normalize(path) for path in paths], destination=self.normalize(destination)) + return await io.copy(self._core, *[self.normalize(path) for path in paths], destination=self.normalize(destination), wait=wait) except ValueError: raise ValueError('Copy destination was not specified.') @@ -175,39 +178,51 @@ async def makedirs(self, path): """ return await io.makedirs(self._core, self.normalize(path)) - async def rename(self, path, name): + async def rename(self, path, name, *, wait=False): """ Rename a file :param str path: Path of the file or directory to rename :param str name: The name to rename to + :param bool,optional wait: ``True`` Wait for task to complete, or ``False`` to return an awaitable task object. + :returns: Task status object, or an awaitable task object + :rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask` """ - return await io.rename(self._core, self.normalize(path), name) + return await io.rename(self._core, self.normalize(path), name, wait=wait) - async def delete(self, *paths): + async def delete(self, *paths, wait=False): """ Delete one or more files or folders :param str path: Path + :param bool,optional wait: ``True`` Wait for task to complete, or ``False`` to return an awaitable task object. + :returns: Task status object, or an awaitable task object + :rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask` """ - return await io.remove(self._core, *[self.normalize(path) for path in paths]) + return await io.remove(self._core, *[self.normalize(path) for path in paths], wait=wait) - async def undelete(self, *paths): + async def undelete(self, *paths, wait=False): """ Recover one or more files or folders :param str path: Path + :param bool,optional wait: ``True`` Wait for task to complete, or ``False`` to return an awaitable task object. + :returns: Task status object, or an awaitable task object + :rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask` """ - return await io.recover(self._core, *[self.normalize(path) for path in paths]) + return await io.recover(self._core, *[self.normalize(path) for path in paths], wait=wait) - async def move(self, *paths, destination=None): + async def move(self, *paths, destination=None, wait=False): """ Move one or more files or folders :param list[str] paths: List of paths :param str destination: Destination + :param bool,optional wait: ``True`` Wait for task to complete, or ``False`` to return an awaitable task object. + :returns: Task status object, or an awaitable task object + :rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask` """ try: - return await io.move(self._core, *[self.normalize(path) for path in paths], destination=self.normalize(destination)) + return await io.move(self._core, *[self.normalize(path) for path in paths], destination=self.normalize(destination), wait=wait) except ValueError: raise ValueError('Move destination was not specified.') diff --git a/cterasdk/asynchronous/core/files/io.py b/cterasdk/asynchronous/core/files/io.py index a7920794..c41b5968 100644 --- a/cterasdk/asynchronous/core/files/io.py +++ b/cterasdk/asynchronous/core/files/io.py @@ -1,9 +1,8 @@ import logging -from ....cio.common import encode_request_parameter +from ....cio.common import encode_request_parameter, a_await_or_future from ....cio import core as fs from ....exceptions.io import ResourceNotFoundError, NotADirectory, ResourceExistsError from .. import query -from ....lib import FetchResourcesResponse logger = logging.getLogger('cterasdk.core') @@ -12,7 +11,7 @@ async def listdir(core, path, depth=None, include_deleted=False, search_criteria=None, limit=None): with fs.fetch_resources(path, depth, include_deleted, search_criteria, limit) as param: if param.depth > 0: - return query.iterator(core, '', param, 'fetchResources', callback_response=FetchResourcesResponse) + return query.iterator(core, '', param, 'fetchResources', callback_response=fs.FetchResourcesResponse) return await core.v1.api.execute('', 'fetchResources', param) @@ -54,7 +53,7 @@ async def walk(core, scope, path, include_deleted=False): async def mkdir(core, path): with fs.makedir(path) as param: response = await core.v1.api.execute('', 'makeCollection', param) - fs.accept_response(response, path.reference.as_posix()) + fs.accept_response(response) async def makedirs(core, path): @@ -67,29 +66,34 @@ async def makedirs(core, path): logger.debug('Resource already exists: %s', path.reference.as_posix()) -async def rename(core, path, name): +async def rename(core, path, name, *, wait=False): with fs.rename(path, name) as param: - return await core.v1.api.execute('', 'moveResources', param) + ref = await core.v1.api.execute('', 'moveResources', param) + return await a_await_or_future(core, ref, wait) -async def remove(core, *paths): +async def remove(core, *paths, wait=False): with fs.delete(*paths) as param: - return await core.v1.api.execute('', 'deleteResources', param) + ref = await core.v1.api.execute('', 'deleteResources', param) + return await a_await_or_future(core, ref, wait) -async def recover(core, *paths): +async def recover(core, *paths, wait=False): with fs.recover(*paths) as param: - return await core.v1.api.execute('', 'restoreResources', param) + ref = await core.v1.api.execute('', 'restoreResources', param) + return await a_await_or_future(core, ref, wait) -async def copy(core, *paths, destination=None): +async def copy(core, *paths, destination=None, wait=False): with fs.copy(*paths, destination=destination) as param: - return await core.v1.api.execute('', 'copyResources', param) + ref = await core.v1.api.execute('', 'copyResources', param) + return await a_await_or_future(core, ref, wait) -async def move(core, *paths, destination=None): +async def move(core, *paths, destination=None, wait=False): with fs.move(*paths, destination=destination) as param: - return await core.v1.api.execute('', 'moveResources', param) + ref = await core.v1.api.execute('', 'moveResources', param) + return await a_await_or_future(core, ref, wait) async def ensure_directory(core, directory, suppress_error=False): @@ -159,6 +163,8 @@ def upload(name, size, destination, fd): :returns: Callable function to start the upload. :rtype: callable """ + fs.destination_prerequisite_conditions(destination, name) + async def wrapper(core): """ Upload file from metadata and file handle. diff --git a/cterasdk/asynchronous/core/tasks.py b/cterasdk/asynchronous/core/tasks.py new file mode 100644 index 00000000..16cb4e6a --- /dev/null +++ b/cterasdk/asynchronous/core/tasks.py @@ -0,0 +1,17 @@ +import logging +from ...lib.tasks import AwaitablePortalTask +from .base_command import BaseCommand + + +logger = logging.getLogger('cterasdk.core') + + +class Tasks(BaseCommand): + """ Portal Background Task APIs """ + + def awaitable_task(self, ref): + return AwaitablePortalTask(self._core, ref) + + async def wait(self, ref, timeout=None, poll_interval=None): + awaitable_task = AwaitablePortalTask(self._core, ref) + return await awaitable_task.a_wait(timeout, poll_interval) diff --git a/cterasdk/cio/common.py b/cterasdk/cio/common.py index 0b517e74..92cfce0a 100644 --- a/cterasdk/cio/common.py +++ b/cterasdk/cio/common.py @@ -77,3 +77,27 @@ def encode_request_parameter(param): return dict( inputXML=utf8_decode(toxmlstr(param)) ) + + +def await_or_future(ctera, ref, wait): + """ + Wait for task completion, or return an awaitable task object. + + :param str ref: Task reference + :param bool wait: ``True`` to wait for task completion, ``False`` to return an awaitable task object + """ + if wait: + return ctera.tasks.wait(ref) + return ctera.tasks.awaitable_task(ref) + + +async def a_await_or_future(ctera, ref, wait): + """ + Wait for task completion, or return an awaitable task object. + + :param str ref: Task reference + :param bool wait: ``True`` to wait for task completion, ``False`` to return an awaitable task object + """ + if wait: + return await ctera.tasks.wait(ref) + return ctera.tasks.awaitable_task(ref) diff --git a/cterasdk/cio/core.py b/cterasdk/cio/core.py index af494266..77e6537f 100644 --- a/cterasdk/cio/core.py +++ b/cterasdk/cio/core.py @@ -3,9 +3,10 @@ from contextlib import contextmanager from ..objects.uri import quote, unquote from ..common import Object, DateTimeUtils -from ..core.enum import ProtectionLevel, CollaboratorType, SearchType, PortalAccountType, FileAccessMode +from ..core.enum import ProtectionLevel, CollaboratorType, SearchType, PortalAccountType, FileAccessMode, FileAccessError from ..core.types import PortalAccount, UserAccount, GroupAccount -from ..exceptions.io import ResourceExistsError, PathValidationError, NameSyntaxError, ReservedNameError +from ..exceptions.io import ResourceExistsError, PathValidationError, NameSyntaxError, ReservedNameError, RestrictedRoot +from ..lib.iterator import DefaultResponse from . import common @@ -177,6 +178,17 @@ def build(self): return self.param +class FetchResourcesResponse(DefaultResponse): + + def __init__(self, response): + accept_response(response.errorType) + super().__init__(response) + + @property + def objects(self): + return self._response.items + + @contextmanager def fetch_resources(path, depth, include_deleted, search_criteria, limit): """ @@ -278,6 +290,13 @@ def handle(path): yield path.reference +def destination_prerequisite_conditions(destination, name): + if not destination.reference.root: + raise RestrictedRoot() + if any(c in name for c in ['\\', '/', ':', '?', '&', '<', '>', '"', '|']): + raise NameSyntaxError() + + @contextmanager def upload(core, name, destination, size, fd): fd, size = common.encode_stream(fd, size) @@ -489,28 +508,28 @@ def obtain_current_accounts(param): return current_accounts -def accept_response(response, reference): +def accept_response(error_type): """ Check if response contains an error. """ error = { - "FileWithTheSameNameExist": ResourceExistsError(), - "DestinationNotExists": PathValidationError(), - "InvalidName": NameSyntaxError(), - "ReservedName": ReservedNameError() - }.get(response, None) + FileAccessError.FileWithTheSameNameExist: ResourceExistsError(), + FileAccessError.DestinationNotExists: PathValidationError(), + FileAccessError.InvalidName: NameSyntaxError(), + FileAccessError.ReservedName: ReservedNameError() + }.get(error_type, None) try: if error: raise error except ResourceExistsError as error: - logger.info('Resource already exists: a file or folder with this name already exists. %s', {'path': reference}) + logger.info('Resource already exists: a file or folder with this name already exists.') raise error except PathValidationError as error: - logger.error('Path validation failed: the specified destination path does not exist. %s', {'path': reference}) + logger.error('Path validation failed: the specified destination path does not exist.') raise error except NameSyntaxError as error: - logger.error('Invalid name: the name contains characters that are not allowed. %s', {'name': reference}) + logger.error('Invalid name: the name contains characters that are not allowed.') raise error except ReservedNameError as error: - logger.error('Reserved name error: the name is reserved and cannot be used. %s', {'name': reference}) + logger.error('Reserved name error: the name is reserved and cannot be used.') raise error diff --git a/cterasdk/common/enum.py b/cterasdk/common/enum.py index f90f69db..dd927af7 100644 --- a/cterasdk/common/enum.py +++ b/cterasdk/common/enum.py @@ -96,3 +96,28 @@ class ScheduleType: Monthly = 'monthly' Interval = 'interval' Window = 'window' + + +class TaskRunningStatus: + """ + Task Status + + :ivar str Disabled: Task is disabled. + :ivar str Running: Task is currently running. + :ivar str Completed: Task completed successfully. + :ivar str Failed: Task failed. + :ivar str Retrying: Task is retrying after a failure. + :ivar str Waiting: Task is waiting to be run. + :ivar str Stopping: Task is in the process of stopping. + :ivar str Stopped: Task has been stopped. + :ivar str Warnings: Task completed with warnings. + """ + Disabled = 'disabled' + Running = 'running' + Completed = 'completed' + Failed = 'failed' + Retrying = 'retrying' + Waiting = 'waiting' + Stopping = 'stopping' + Stopped = 'stopped' + Warnings = 'completed with warnings' diff --git a/cterasdk/core/cloudfs.py b/cterasdk/core/cloudfs.py index b0b2424a..e18b7080 100644 --- a/cterasdk/core/cloudfs.py +++ b/cterasdk/core/cloudfs.py @@ -135,14 +135,24 @@ def delete(self, name): class CloudDrives(BaseCommand): - """ Cloud Drive Folder APIs """ + """ Cloud Drive Folder APIs + + :ivar cterasdk.core.cloudfs.Locks locks: Global file locking APIs + """ default = ['name', 'group', 'owner'] + default_extensions = ["ppt", "pptx", "xls", "xlsx", "doc", "docx", "indd", "idlk", "dwl", "dwl2", "dwt", "dwg", "rvt", "dat"] + + def __init__(self, core): + super().__init__(core) + self.locks = Locks(self._core) + def _get_entire_object(self, name, owner): return self._core.api.get(f'{self.find(name, owner, include=["baseObjectRef"]).baseObjectRef}') - def add(self, name, group, owner, winacls=True, description=None, quota=None, compliance_settings=None, xattrs=None): + def add(self, name, group, owner, winacls=True, description=None, # pylint: disable=too-many-arguments + quota=None, compliance_settings=None, xattrs=None, gfl=False, lock_extensions=None): """ Create a new Cloud Drive Folder (Cloud Volume) @@ -156,6 +166,10 @@ def add(self, name, group, owner, winacls=True, description=None, quota=None, co Use :func:`cterasdk.core.types.ComplianceSettingsBuilder` to build the compliance settings object :param cterasdk.common.object.Object,optional xattrs: Extended attributes, defaults to MacOS. Use :func:`cterasdk.core.types.ExtendedAttributesBuilder` to build the extended attributes object + :param bool,optional gfl: Enable global file locking + :param list[str],optional lock_extensions: List of file extensions (without leading dot) for which global file locking is enforced. + :returns: Path to the Cloud Drive folder + :rtype: str """ param = Object() param.name = name @@ -179,22 +193,31 @@ def add(self, name, group, owner, winacls=True, description=None, quota=None, co else: param.extendedAttributes = ExtendedAttributesBuilder.default().build() + if gfl: + param.globalFileLockSettings = Object() + param.globalFileLockSettings._classname = 'GlobalFileLockSettings' # pylint: disable=protected-access + param.globalFileLockSettings.enabled = True + param.globalFileLockSettings.globalFileLockExtensions = ( + lock_extensions if lock_extensions else CloudDrives.default_extensions + ) + try: response = self._core.api.execute('', 'addCloudDrive', param) logger.info( 'Cloud drive folder created. %s', {'name': name, 'owner': param.owner, 'folder_group': group, 'winacls': winacls} ) - return response + return re.search(r'/Users\/(.+)', response).group() except CTERAException as error: logger.error( 'Cloud drive folder creation failed. %s', - {'name': name, 'folder_group': group, 'owner': owner, 'win_acls': winacls} + {'name': name, 'folder_group': group, 'owner': str(owner), 'win_acls': winacls} ) raise error - def modify(self, current_name, owner, new_name=None, new_owner=None, new_group=None, # pylint: disable=too-many-arguments - description=None, winacls=None, quota=None, compliance_settings=None, xattrs=None): + def modify(self, current_name, owner, new_name=None, # pylint: disable=too-many-arguments, too-many-locals + new_owner=None, new_group=None, description=None, winacls=None, quota=None, compliance_settings=None, xattrs=None, + gfl=None, lock_extensions=None): """ Modify a Cloud Drive Folder (Cloud Volume) @@ -210,6 +233,8 @@ def modify(self, current_name, owner, new_name=None, new_owner=None, new_group=N Use :func:`cterasdk.core.types.ComplianceSettingsBuilder` to build the compliance settings object :param cterasdk.common.object.Object,optional xattrs: Extended attributes. Use :func:`cterasdk.core.types.ExtendedAttributesBuilder` to build the extended attributes object + :param bool,optional gfl: Enable global file locking + :param list[str],optional lock_extensions: List of file extensions (without leading dot) for which global file locking is enforced. """ param = self._get_entire_object(current_name, owner) if new_name: @@ -228,6 +253,13 @@ def modify(self, current_name, owner, new_name=None, new_owner=None, new_group=N param.wormSettings = compliance_settings if xattrs: param.extendedAttributes = xattrs + if gfl is not None: + param.globalFileLockSettings.enabled = gfl + if lock_extensions: + param.globalFileLockSettings.globalFileLockExtensions = ( + lock_extensions if lock_extensions else CloudDrives.default_extensions + ) + try: response = self._core.api.put(f'/{param.baseObjectRef}', param) logger.info('Cloud drive folder updated. %s', {'name': current_name}) @@ -727,3 +759,16 @@ def delete(self, name): response = self._core.api.delete(f'/buckets/{name}') logger.info('Bucket deleted. %s', {'name': name}) return response + + +class Locks(BaseCommand): + + def all(self): + """ + List Locked Files + + :returns: Iterator for all locked files + """ + builder = query.QueryParamBuilder().include_classname() + param = builder.build() + return query.iterator(self._core, '', param, 'getLockedFilesInfo') diff --git a/cterasdk/core/enum.py b/cterasdk/core/enum.py index 70c69173..027e68a7 100644 --- a/cterasdk/core/enum.py +++ b/cterasdk/core/enum.py @@ -644,3 +644,50 @@ class Reports: Portals = 'portalsStatisticsReport' Folders = 'foldersStatisticsReport' FolderGroups = 'folderGroupsStatisticsReport' + + +class FileAccessError: + """ + File Access Error + + :ivar Conflict: Conflict occurred during file operation. + :ivar PermissionDenied: Operation denied due to insufficient permissions. + :ivar MoveDeletedFile: Attempted to move a file that was deleted. + :ivar CopyToSubFolder: Cannot copy a folder into one of its subfolders. + :ivar QuotaViolation: Operation exceeds allowed storage quota. + :ivar DestinationNotExists: Destination folder does not exist. + :ivar CancelledByUser: Operation was cancelled by the user. + :ivar InternalError: An internal error occurred. + :ivar UserPasswordRequired: User password is required to proceed. + :ivar PassphraseRequire: A passphrase is required. + :ivar CopyFileToRoot: Attempted to copy a file to the root directory. + :ivar FileWithTheSameNameExist: A file with the same name already exists. + :ivar RejectedByPolicy: Operation was rejected by a policy rule. + :ivar RejectedByWormSettings: Operation not allowed by WORM (Write Once Read Many) settings. + :ivar TooManyFailedAuthenticationAttemps: Too many failed authentication attempts. + :ivar UserActionTimeout: Timed out waiting for user action. + :ivar InvalidName: Provided name is invalid. + :ivar ReservedName: The name provided is reserved and cannot be used. + :ivar CannotRunPermanentDeleteWhenFsckIsRunning: FSCK is running, cannot perform permanent delete. + :ivar ResourceLocked: The resource is currently locked. + """ + Conflict = "Conflict" + PermissionDenied = "PermissionDenied" + MoveDeletedFile = "MoveDeletedFile" + CopyToSubFolder = "CopyToSubFolder" + QuotaViolation = "QuotaViolation" + DestinationNotExists = "DestinationNotExists" + CancelledByUser = "CancelledByUser" + InternalError = "InternalError" + UserPasswordRequired = "UserPasswordRequired" + PassphraseRequire = "PassphraseRequire" + CopyFileToRoot = "CopyFileToRoot" + FileWithTheSameNameExist = "FileWithTheSameNameExist" + RejectedByPolicy = "RejectedByPolicy" + RejectedByWormSettings = "RejectedByWormSettings" + TooManyFailedAuthenticationAttemps = "TooManyFailedAuthenticationAttemps" + UserActionTimeout = "UserActionTimeout" + InvalidName = "InvalidName" + ReservedName = "ReservedName" + CannotRunPermanentDeleteWhenFsckIsRunning = "CannotRunPermanentDeleteWhenFsckIsRunning" + ResourceLocked = "ResourceLocked" diff --git a/cterasdk/core/files/browser.py b/cterasdk/core/files/browser.py index 37fbef1b..234c2d3e 100644 --- a/cterasdk/core/files/browser.py +++ b/cterasdk/core/files/browser.py @@ -111,15 +111,18 @@ def public_link(self, path, access='RO', expire_in=30): """ return io.public_link(self._core, self.normalize(path), access, expire_in) - def copy(self, *paths, destination=None): + def copy(self, *paths, destination=None, wait=True): """ Copy one or more files or folders :param list[str] paths: List of paths :param str destination: Destination + :param bool,optional wait: ``True`` Wait for task to complete, or ``False`` to return an awaitable task object. + :returns: Task status object, or an awaitable task object + :rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask` """ try: - return io.copy(self._core, *[self.normalize(path) for path in paths], destination=self.normalize(destination)) + return io.copy(self._core, *[self.normalize(path) for path in paths], destination=self.normalize(destination), wait=wait) except ValueError: raise ValueError('Copy destination was not specified.') @@ -181,40 +184,52 @@ def makedirs(self, path): """ return io.makedirs(self._core, self.normalize(path)) - def rename(self, path, name): + def rename(self, path, name, *, wait=True): """ Rename a file :param str path: Path of the file or directory to rename :param str name: The name to rename to + :param bool,optional wait: ``True`` Wait for task to complete, or ``False`` to return an awaitable task object. + :returns: Task status object, or an awaitable task object + :rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask` """ - return io.rename(self._core, self.normalize(path), name) + return io.rename(self._core, self.normalize(path), name, wait=wait) - def delete(self, *paths): + def delete(self, *paths, wait=True): """ Delete one or more files or folders :param str path: Path + :param bool,optional wait: ``True`` Wait for task to complete, or ``False`` to return an awaitable task object. + :returns: Task status object, or an awaitable task object + :rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask` """ - return io.remove(self._core, *[self.normalize(path) for path in paths]) + return io.remove(self._core, *[self.normalize(path) for path in paths], wait=wait) - def undelete(self, *paths): + def undelete(self, *paths, wait=True): """ Recover one or more files or folders :param str path: Path + :param bool,optional wait: ``True`` Wait for task to complete, or ``False`` to return an awaitable task object. + :returns: Task status object, or an awaitable task object + :rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask` """ - return io.recover(self._core, *[self.normalize(path) for path in paths]) + return io.recover(self._core, *[self.normalize(path) for path in paths], wait=wait) - def move(self, *paths, destination=None): + def move(self, *paths, destination=None, wait=True): """ Move one or more files or folders :param list[str] paths: List of paths :param str destination: Destination + :param bool,optional wait: ``True`` Wait for task to complete, or ``False`` to return an awaitable task object. + :returns: Task status object, or an awaitable task object + :rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask` """ try: - return io.move(self._core, *[self.normalize(path) for path in paths], destination=self.normalize(destination)) + return io.move(self._core, *[self.normalize(path) for path in paths], destination=self.normalize(destination), wait=wait) except ValueError: raise ValueError('Move destination was not specified.') diff --git a/cterasdk/core/files/io.py b/cterasdk/core/files/io.py index 805eaa1c..ff660e13 100644 --- a/cterasdk/core/files/io.py +++ b/cterasdk/core/files/io.py @@ -1,10 +1,9 @@ import logging -from ...cio.common import encode_request_parameter +from ...cio.common import encode_request_parameter, await_or_future from ...cio import core as fs from ...exceptions.io import ResourceNotFoundError, ResourceExistsError, NotADirectory from ...core import query from ..enum import CollaboratorType -from ...lib import FetchResourcesResponse logger = logging.getLogger('cterasdk.core') @@ -13,7 +12,7 @@ def listdir(core, path, depth=None, include_deleted=False, search_criteria=None, limit=None): with fs.fetch_resources(path, depth, include_deleted, search_criteria, limit) as param: if param.depth > 0: - return query.iterator(core, '', param, 'fetchResources', callback_response=FetchResourcesResponse) + return query.iterator(core, '', param, 'fetchResources', callback_response=fs.FetchResourcesResponse) return core.api.execute('', 'fetchResources', param) @@ -42,6 +41,7 @@ def versions(core, path): def walk(core, scope, path, include_deleted=False): + ensure_directory(core, path) paths = [fs.CorePath.instance(scope, path)] while len(paths) > 0: path = paths.pop(0) @@ -55,7 +55,7 @@ def walk(core, scope, path, include_deleted=False): def mkdir(core, path): with fs.makedir(path) as param: response = core.api.execute('', 'makeCollection', param) - fs.accept_response(response, path.reference.as_posix()) + fs.accept_response(response) def makedirs(core, path): @@ -68,29 +68,34 @@ def makedirs(core, path): logger.debug('Resource already exists: %s', path.reference.as_posix()) -def rename(core, path, name): +def rename(core, path, name, *, wait=True): with fs.rename(path, name) as param: - return core.api.execute('', 'moveResources', param) + ref = core.api.execute('', 'moveResources', param) + return await_or_future(core, ref, wait) -def remove(core, *paths): +def remove(core, *paths, wait=True): with fs.delete(*paths) as param: - return core.api.execute('', 'deleteResources', param) + ref = core.api.execute('', 'deleteResources', param) + return await_or_future(core, ref, wait) -def recover(core, *paths): +def recover(core, *paths, wait=True): with fs.recover(*paths) as param: - return core.api.execute('', 'restoreResources', param) + ref = core.api.execute('', 'restoreResources', param) + return await_or_future(core, ref, wait) -def copy(core, *paths, destination=None): +def copy(core, *paths, destination=None, wait=True): with fs.copy(*paths, destination=destination) as param: - return core.api.execute('', 'copyResources', param) + ref = core.api.execute('', 'copyResources', param) + return await_or_future(core, ref, wait) -def move(core, *paths, destination=None): +def move(core, *paths, destination=None, wait=True): with fs.move(*paths, destination=destination) as param: - return core.api.execute('', 'moveResources', param) + ref = core.api.execute('', 'moveResources', param) + return await_or_future(core, ref, wait) def ensure_directory(core, directory, suppress_error=False): @@ -160,6 +165,8 @@ def upload(name, size, destination, fd): :returns: Callable function to start the upload. :rtype: callable """ + fs.destination_prerequisite_conditions(destination, name) + def wrapper(core): """ Upload file from metadata and file handle. diff --git a/cterasdk/core/login.py b/cterasdk/core/login.py index 23543cc0..3482c0f4 100644 --- a/cterasdk/core/login.py +++ b/cterasdk/core/login.py @@ -1,7 +1,8 @@ import logging from .base_command import BaseCommand -from ..exceptions import CTERAException +from ..exceptions.transport import Forbidden +from ..exceptions.auth import AuthenticationError logger = logging.getLogger('cterasdk.core') @@ -14,18 +15,19 @@ class Login(BaseCommand): def login(self, username, password): """ - Log into the portal + Log in to CTERA Portal - :param str username: User name to log in + :param str username: User name :param str password: User password + :raises: :class:`cterasdk.exceptions.auth.AuthenticationError` """ host = self._core.host() try: self._core.api.form_data('/login', {'j_username': username, 'j_password': password}) logger.info("User logged in. %s", {'host': host, 'user': username}) - except CTERAException: + except Forbidden as error: logger.error('Login failed. %s', {'host': host, 'user': username}) - raise + raise AuthenticationError() from error def sso(self, ctera_ticket): """ @@ -38,7 +40,7 @@ def sso(self, ctera_ticket): def logout(self): """ - Log out of the portal + Log out of CTERA Portal """ username = self._core.session().account.name self._core.api.form_data('/logout', {}) diff --git a/cterasdk/core/portals.py b/cterasdk/core/portals.py index 5babbd8f..182169ce 100644 --- a/cterasdk/core/portals.py +++ b/cterasdk/core/portals.py @@ -146,12 +146,14 @@ def apply_changes(self, wait=False): Apply provisioning changes.\n :param bool,optional wait: Wait for all changes to apply + :returns: Task status object, or an awaitable task object + :rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask` """ param = Object() param.objectId = None param.type = 'portals' logger.info('Applying provisioning changes.') - task = self._core.api.execute('', 'updatePortals', param) + ref = self._core.api.execute('', 'updatePortals', param) if wait: - task = self._core.tasks.wait(task) - return task + return self._core.tasks.wait(ref) + return self._core.tasks.awaitable_task(ref) diff --git a/cterasdk/core/taskmgr.py b/cterasdk/core/taskmgr.py deleted file mode 100644 index ce547c13..00000000 --- a/cterasdk/core/taskmgr.py +++ /dev/null @@ -1,48 +0,0 @@ -import re -import logging - -from ..lib.task_manager_base import TaskBase -from ..exceptions import InputError -from .base_command import BaseCommand - - -logger = logging.getLogger('cterasdk.core') - - -class Task(TaskBase): - - def _get_task_id(self, ref): - match = re.search('servers/[^/]*/bgTasks/[1-9][0-9]*$', ref) - if not match: - logger.error('Invalid task id. %s', {'ref': ref}) - raise InputError('Invalid task id', ref, ['servers/server/bgTasks/107781']) - return match.group(0) - - def get_task_status(self): - if self.CTERAHost.session().in_tenant_context(): - return self.CTERAHost.api.execute('', 'getTaskStatus', self.path) - return self.CTERAHost.api.get('/' + self.path) - - -class Tasks(BaseCommand): - """ Portal Background Task APIs """ - - def status(self, ref): - """ - Get background task status - - :param str ref: Task reference - """ - task = Task(self._core, ref) - return task.get_task_status() - - def wait(self, ref, retries=100, seconds=1): - """ - Wait for background task to complete - - :param str ref: Task reference - :param int,optional retries: Number of retries when sampling the task status, defaults to 100 - :param int,optional seconds: Number of seconds to wait between retries, defaults to 1 - """ - task = Task(self._core, ref, retries, seconds) - return task.wait() diff --git a/cterasdk/core/tasks.py b/cterasdk/core/tasks.py new file mode 100644 index 00000000..46368112 --- /dev/null +++ b/cterasdk/core/tasks.py @@ -0,0 +1,17 @@ +import logging +from ..lib.tasks import AwaitablePortalTask +from .base_command import BaseCommand + + +logger = logging.getLogger('cterasdk.core') + + +class Tasks(BaseCommand): + """ Portal Background Task APIs """ + + def awaitable_task(self, ref): + return AwaitablePortalTask(self._core, ref) + + def wait(self, ref, timeout=None, poll_interval=None): + awaitable_task = AwaitablePortalTask(self._core, ref) + return awaitable_task.wait(timeout, poll_interval) diff --git a/cterasdk/core/templates.py b/cterasdk/core/templates.py index c59aa021..672d3795 100644 --- a/cterasdk/core/templates.py +++ b/cterasdk/core/templates.py @@ -298,7 +298,7 @@ def apply_changes(self, wait=False): :param bool,optional wait: Wait for all changes to apply, defaults to `False` """ - task = self._core.api.execute('', 'applyAutoAssignmentRules') + ref = self._core.api.execute('', 'applyAutoAssignmentRules') if wait: - task = self._core.tasks.wait(task) - return task + return self._core.tasks.wait(ref) + return self._core.tasks.awaitable_task(ref) diff --git a/cterasdk/core/users.py b/cterasdk/core/users.py index fa6e0f7a..f20ab3d1 100644 --- a/cterasdk/core/users.py +++ b/cterasdk/core/users.py @@ -158,15 +158,17 @@ def apply_changes(self, wait=False): Apply provisioning changes.\n :param bool,optional wait: Wait for all changes to apply + :returns: Task status object, or an awaitable task object + :rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask` """ param = Object() param.objectId = None param.type = 'users' logger.info('Applying provisioning changes.') - task = self._core.api.execute('', 'updateAccounts', param) + ref = self._core.api.execute('', 'updateAccounts', param) if wait: - task = self._core.tasks.wait(task) - return task + return self._core.tasks.wait(ref) + return self._core.tasks.awaitable_task(ref) def delete(self, user): """ diff --git a/cterasdk/direct/client.py b/cterasdk/direct/client.py index 1dc6591f..01d92faf 100644 --- a/cterasdk/direct/client.py +++ b/cterasdk/direct/client.py @@ -2,7 +2,7 @@ import cterasdk.settings from . import filters -from .credentials import KeyPair, Bearer +from .credentials import KeyPair, Bearer, create_bearer_token from .lib import get_chunks, decrypt_encryption_key, process_chunks from .types import ByteRange from .stream import Streamer @@ -11,20 +11,28 @@ from ..clients.clients import AsyncClient, AsyncJSON -class Client: +class DirectIO: + + async def __aenter__(self): + return self - def __init__(self, baseurl, credentials): + def __init__(self, baseurl=None, access_key_id=None, secret_access_key=None, bearer=None): """ + Initialize a CTERA Direct IO Client. + :param str baseurl: Portal URL - :param cterasdk.direct.credentials.BaseCredentials credentials: Credentials object + :param str,optional access_key_id: Access key + :param str,optional secret_access_key: Secret key + :param str,optional bearer: Bearer token """ self._api = AsyncJSON(EndpointBuilder.new(baseurl, '/directio'), settings=cterasdk.settings.io.direct.api.settings, authenticator=lambda *_: True) self._client = AsyncClient(DefaultBuilder(), settings=cterasdk.settings.io.direct.storage.settings, authenticator=lambda *_: True) - self._credentials = credentials + self._credentials = Bearer(bearer) if bearer else KeyPair(access_key_id, secret_access_key) + self._bearer = create_bearer_token(self._credentials) async def _chunks(self, file_id): - metadata = await get_chunks(self._api, self._credentials, file_id) + metadata = await get_chunks(self._api, self._bearer, file_id) if metadata.encrypted: metadata.encryption_key = decrypt_encryption_key( metadata.file_id, @@ -35,34 +43,34 @@ async def _chunks(self, file_id): async def metadata(self, file_id): """ - Direct IO Metadata API. + Get File Metadata. :param int file_id: File ID. """ meta = await self._chunks(file_id) return meta.serialize() - async def blocks(self, file_id, blocks, max_workers): + async def blocks(self, file_id, byte_range=None, max_workers=None): """ Blocks API. :param int file_id: File ID. - :param list[cterasdk.direct.exceptions.BlockInfo] blocks: List of BlockInfo objects, - or list of integers identifying the block position. - :param int max_workers: Max concurrent tasks. A task will be dispatched for each block if no limited was specified. + :param cterasdk.direct.types.ByteRange,optional byte_range: Byte Range. + :param int max_workers: Max concurrent tasks :returns: List of Blocks. :rtype: list[cterasdk.direct.types.Block] """ meta = await self._chunks(file_id) - executor = self.executor(filters.blocks(meta, blocks), meta.encryption_key, meta.file_id, max_workers) + byte_range = byte_range if byte_range is not None else ByteRange.default() + executor = self.executor(filters.span(meta, byte_range), meta.encryption_key, meta.file_id, max_workers) return await executor() - async def streamer(self, file_id, byte_range): + async def streamer(self, file_id, byte_range=None): """ Stream API. :param int file_id: File ID. - :param cterasdk.direct.types.ByteRange byte_range: Byte Range. + :param cterasdk.direct.types.ByteRange,optional byte_range: Byte Range. :returns: Streamer Object :rtype: cterasdk.direct.stream.Streamer """ @@ -74,7 +82,7 @@ async def streamer(self, file_id, byte_range): def executor(self, chunks, encryption_key, file_id=None, max_workers=None): """ - Get Blocks. + Download Executor. :param list[cterasdk.direct.types.Chunk] chunks: List of Chunks. :param str encryption_key: Decryption Key. @@ -98,64 +106,5 @@ async def close(self): await self._api.close() await self._client.close() - -class DirectIO: - - async def __aenter__(self): - return self - - def __init__(self, baseurl=None, access_key_id=None, secret_access_key=None, bearer=None): - """ - Initialize a CTERA Direct IO Client. - - :param str baseurl: Portal URL - :param str,optional access_key_id: Access key - :param str,optional secret_access_key: Secret key - :param str,optional bearer: Bearer token - """ - self._client = Client(baseurl, Bearer(bearer) if bearer else KeyPair(access_key_id, secret_access_key)) - - async def metadata(self, file_id): - """ - Get Metadata. - - :param int file_id: File ID - """ - return await self._client.metadata(file_id) - - async def blocks(self, file_id, blocks=None, max_workers=None): - """ - Get Blocks. - - :param int file_id: File ID - :param list[cterasdk.direct.exceptions.BlockInfo],optional blocks: List of BlockInfo objects, - or list of integers identifying the block position. - :param int,optional max_workers: Max allowed concurrent tasks. - A task will be dispatched for each block if no limited was specified. - :returns: Blocks - :rtype: list[cterasdk.direct.types.Block] - """ - return await self._client.blocks(file_id, blocks, max_workers) - - async def streamer(self, file_id, byte_range=None): - """ - Iterates over data chunks. - - :param int file_id: File ID. - :param cterasdk.direct.types.ByteRange,optional byte_range: Byte Range. - :returns: Stream Object - :rtype: cterasdk.direct.stream.Streamer - """ - return await self._client.streamer(file_id, byte_range) - - def executor(self, chunks, encryption_key, max_workers): - """ - Get download executor for download from chunk metadata - """ - return self._client.executor(chunks, encryption_key, None, max_workers) - - async def close(self): - await self._client.close() - async def __aexit__(self, exc_type, exc, tb): await self.close() diff --git a/cterasdk/direct/credentials.py b/cterasdk/direct/credentials.py index 6f580375..65476d4e 100644 --- a/cterasdk/direct/credentials.py +++ b/cterasdk/direct/credentials.py @@ -1,3 +1,9 @@ +import logging + + +logger = logging.getLogger('cterasdk.direct') + + class BaseCredentials: """Base Credentials for CTERA Direct IO""" @@ -15,3 +21,24 @@ class Bearer(BaseCredentials): def __init__(self, bearer): self.bearer = bearer + + +def create_bearer_token(credentials): + """ + Create Authorization Header. + + :param cterasdk.direct.credentials.BaseCredentials credentials: Credentials + :returns: Authorization header as a dictionary. + :rtype: dict + """ + token = None + + if isinstance(credentials, Bearer): + logger.debug('Initializing client using Bearer token') + token = f'Bearer {credentials.bearer}' + + elif isinstance(credentials, KeyPair): + logger.debug('Initializing client using Key Pair.') + token = f'Bearer {credentials.access_key_id}' + + return token diff --git a/cterasdk/direct/filters.py b/cterasdk/direct/filters.py index a8adc561..d8407534 100644 --- a/cterasdk/direct/filters.py +++ b/cterasdk/direct/filters.py @@ -1,26 +1,9 @@ import logging -from ..exceptions.direct import BlockInfo logger = logging.getLogger('cterasdk.direct') -def blocks(file, array): - """ - Filter Blocks by Block Number. - - :param list[cterasdk.direct.types.File] file: File Object. - :param list[cterasdk.direct.types.BlockInfo] array: List of BlockInfo objects, - or list of integers identifying the block position. - :returns: List of Chunks. - :rtype: list[cterasdk.direct.types.Chunk] - """ - if array is not None: - numbers = [block.number if isinstance(block, BlockInfo) else block for block in array] - return [file.chunks[number - 1] for number in numbers] - return file.chunks - - def span(file, byte_range): """ Filter Blocks by Byte Range. diff --git a/cterasdk/direct/lib.py b/cterasdk/direct/lib.py index 42e37371..3343f82c 100644 --- a/cterasdk/direct/lib.py +++ b/cterasdk/direct/lib.py @@ -1,42 +1,22 @@ import logging import asyncio +from ..lib.retries import execute_with_retries from .types import Metadata, Block -from .credentials import KeyPair, Bearer from .crypto import decrypt_key, decrypt_block from .decompressor import decompress -from ..exceptions.transport import HTTPError +from ..exceptions.transport import BadRequest, Unauthorized, Unprocessable, InternalServerError, HTTPError from ..exceptions.direct import ( - UnAuthorized, UnprocessableContent, BlocksNotFoundError, DownloadError, DownloadTimeout, BlockListTimeout, - DownloadConnectionError, DecryptKeyError, DecryptBlockError, NotFoundError, DecompressBlockError, - BlockValidationException, BlockListConnectionError, DirectIOError + AuthorizationError, BlockListConnectionError, BlockListTimeout, BlockValidationException, BlocksNotFoundError, + DecompressBlockError, DecryptBlockError, DecryptKeyError, DirectIOError, DownloadConnectionError, + DownloadError, DownloadTimeout, InvalidRequest, ObjectNotFoundError, UnsupportedStorageError ) logger = logging.getLogger('cterasdk.direct') -async def retry(coro, retries=3, backoff=1): - """ - Retry Coroutine. - - :param coroutine coro: Asynchronous Coroutine. - :param int retries: Retries. - :param int backoff: Seconds. - """ - attempts = 0 - while attempts < retries: - try: - return await coro() - except DirectIOError as error: - attempts = attempts + 1 - if attempts == retries: - raise error - wait = backoff * (2 ** (attempts - 1)) - logger.debug('Download of block failed on attempt %s. Retrying in %s seconds...', attempts, wait) - await asyncio.sleep(wait) - - +@execute_with_retries(retries=3, backoff=1, max_backoff=10) async def get_object(client, file_id, chunk): """ Get Object from a Signed URL. @@ -46,54 +26,50 @@ async def get_object(client, file_id, chunk): :returns: Object :rtype: bytes """ - async def get_object_coro(): + message = ( + f"Downloading block #{chunk.number} " + f"(offset={chunk.offset}, length={chunk.length})" + ) - message = ( - f"Downloading block #{chunk.index} " - f"(offset={chunk.offset}, length={chunk.length})" - ) - - if file_id: - message += f" for file ID {file_id}" - - error_message, exception = None, None - - logger.debug(message) - try: - response = await client.get(chunk.url) - return await response.read() - except ConnectionError: - error_message = 'connection' - exception = DownloadConnectionError(file_id, chunk) - except asyncio.TimeoutError: - error_message = 'timeout' - exception = DownloadTimeout(file_id, chunk) - except IOError as error: - error_message = 'io' - exception = DownloadError(error, file_id, chunk) - except HTTPError as error: - error_message = 'unknown' - exception = DownloadError(error.error, file_id, chunk) - - error_messages = { - "connection": "Connection error", - "timeout": "Timed out", - "io": "I/O error", - "unknown": "Unknown error" - } + if file_id: + message += f" for file ID {file_id}" - message = ( - f"Failed to download block #{chunk.index} " - f"(offset={chunk.offset}, length={chunk.length})" - ) - if file_id: - message = message + f" for file ID {file_id}" + error_message, exception = None, None - message = message + f": {error_messages.get(error_message, 'Unknown error')}." - logger.error(message) - raise exception + logger.debug(message) + try: + response = await client.get(chunk.url) + return await response.read() + except ConnectionError: + error_message = 'connection' + exception = DownloadConnectionError(file_id, chunk) + except asyncio.TimeoutError: + error_message = 'timeout' + exception = DownloadTimeout(file_id, chunk) + except IOError as error: + error_message = 'io' + exception = DownloadError(error, file_id, chunk) + except HTTPError as error: + error_message = 'unknown' + exception = DownloadError(error.error, file_id, chunk) + + error_messages = { + "connection": "Connection error", + "timeout": "Timed out", + "io": "I/O error", + "unknown": "Unknown error" + } + + message = ( + f"Failed to download block #{chunk.number} " + f"(offset={chunk.offset}, length={chunk.length})" + ) + if file_id: + message = message + f" for file ID {file_id}" - return await retry(get_object_coro) + message = message + f": {error_messages.get(error_message, 'Unknown error')}." + logger.error(message) + raise exception async def decrypt_object(file_id, encrypted_object, encryption_key, chunk): @@ -148,7 +124,7 @@ async def process_chunk(client, file_id, chunk, encryption_key, semaphore): """ async def process(client, chunk, encryption_key): message = ( - f"Processing block {chunk.index} " + f"Processing block #{chunk.number} " f"(offset={chunk.offset}, length={chunk.length})" ) if file_id: @@ -157,7 +133,7 @@ async def process(client, chunk, encryption_key): encrypted_object = await get_object(client, file_id, chunk) decrypted_object = await decrypt_object(file_id, encrypted_object, encryption_key, chunk) decompressed_object = await decompress_object(file_id, decrypted_object, chunk) - return Block(file_id, chunk.index, chunk.offset, decompressed_object, chunk.length) + return Block(file_id, chunk.number, chunk.offset, decompressed_object, chunk.length) if semaphore is not None: async with semaphore: @@ -206,57 +182,35 @@ def decrypt_encryption_key(file_id, wrapped_key, secret_access_key): raise DecryptKeyError(file_id) -def create_authorization_header(credentials): - """ - Create Authorization Header. - - :param cterasdk.direct.credentials.BaseCredentials credentials: Credentials - :returns: Authorization header as a dictionary. - :rtype: dict - """ - authorization_header = None - - if isinstance(credentials, Bearer): - logger.debug('Initializing client using bearer token') - authorization_header = f'Bearer {credentials.bearer}' - - elif isinstance(credentials, KeyPair): - logger.debug('Initializing client using key pair.') - authorization_header = f'Bearer {credentials.access_key_id}' - - return {'Authorization': authorization_header} - - -async def get_chunks(api, credentials, file_id): +@execute_with_retries(retries=3, backoff=1, max_backoff=10) +async def get_chunks(api, bearer, file_id): """ Get Chunks. :param cterasdk.clients.clients.AsyncJSON api: Asynchronous JSON Client. + :param str bearer: Bearer Token. :param int file_id: File ID. :returns: Wrapped key and file chunks. :rtype: cterasdk.direct.types.Metadata """ - async def get_chunks_coro(): - logger.debug('Listing blocks for file ID: %s', file_id) - try: - response = await api.get(f'{file_id}', headers=create_authorization_header(credentials)) - if not response.chunks: - logger.error('Could not find blocks for file ID: %s.', file_id) - raise BlocksNotFoundError(file_id) - return Metadata(file_id, response) - except HTTPError as error: - if error.code == 400: - raise NotFoundError(file_id) - if error.code == 401: - raise UnAuthorized(file_id) - if error.code == 422: - raise UnprocessableContent(file_id) - raise error - except ConnectionError: - logger.error('Failed to list blocks for file ID: %s due to a connection error.', file_id) - raise BlockListConnectionError(file_id) - except asyncio.TimeoutError: - logger.error('Timed out while listing blocks for file ID: %s.', file_id) - raise BlockListTimeout(file_id) - - return await retry(get_chunks_coro) + logger.debug('Listing blocks for file ID: %s', file_id) + try: + response = await api.get(f'{file_id}', headers={'Authorization': bearer}) + if not response.chunks: + logger.error('Could not find blocks for file ID: %s.', file_id) + raise BlocksNotFoundError(file_id) + return Metadata(file_id, response) + except BadRequest as error: + raise ObjectNotFoundError(file_id) from error + except Unauthorized as error: + raise AuthorizationError(file_id) from error + except Unprocessable as error: + raise UnsupportedStorageError(file_id) from error + except InternalServerError as error: + raise InvalidRequest(file_id) from error + except ConnectionError: + logger.error('Failed to list blocks for file ID: %s due to a connection error.', file_id) + raise BlockListConnectionError(file_id) + except asyncio.TimeoutError: + logger.error('Timed out while listing blocks for file ID: %s.', file_id) + raise BlockListTimeout(file_id) diff --git a/cterasdk/direct/stream.py b/cterasdk/direct/stream.py index 14d437c7..10a4401e 100644 --- a/cterasdk/direct/stream.py +++ b/cterasdk/direct/stream.py @@ -38,7 +38,7 @@ async def start(self): for download in self._downloads: block = await download fragment = block.fragment(self._byte_range) - logger.debug('Streamer Fragment. %s', {'offset': fragment.offset, 'length': fragment.length}) + logger.debug('Streamer fragment. %s', {'offset': fragment.offset, 'length': fragment.length}) yield fragment self._offset = fragment.offset + fragment.length except DirectIOAPIError as error: diff --git a/cterasdk/direct/types.py b/cterasdk/direct/types.py index d7061a4c..5efa2a7d 100644 --- a/cterasdk/direct/types.py +++ b/cterasdk/direct/types.py @@ -56,17 +56,17 @@ class CompressionLib: class Chunk(Object): - def __init__(self, index, offset, url, length): + def __init__(self, number, offset, url, length): """ Initialize a Chunk. - :param int index: Chunk index. + :param int number: Chunk number. :param int offset: Chunk offset. :param str url: Signed URL. :param int length: Object length. """ super().__init__( - index=index, + number=number, offset=offset, url=url, length=length @@ -109,8 +109,8 @@ def _format_chunks(server_object): """ offset = 0 chunks = [] - for index, chunk in enumerate(server_object, 1): - chunks.append(Chunk(index, offset, chunk.url, chunk.len)) + for number, chunk in enumerate(server_object, 1): + chunks.append(Chunk(number, offset, chunk.url, chunk.len)) offset = offset + chunk.len return chunks diff --git a/cterasdk/edge/backup.py b/cterasdk/edge/backup.py index 9e23b60f..801f151a 100644 --- a/cterasdk/edge/backup.py +++ b/cterasdk/edge/backup.py @@ -100,8 +100,8 @@ def _attach(self, sharedSecret): return settings def _attach_folder(self): - task = self._edge.api.execute('/status/services', 'attachFolder') - return self._attach_response(task) + ref = self._edge.api.execute('/status/services', 'attachFolder') + return self._attach_response(ref) def _attach_encrypted_folder(self, encryptedFolderKey, passPhraseSalt, sharedSecret): param = Object() @@ -109,8 +109,8 @@ def _attach_encrypted_folder(self, encryptedFolderKey, passPhraseSalt, sharedSec param.passPhraseSalt = passPhraseSalt param.sharedSecret = sharedSecret - task = self._edge.api.execute('/status/services', 'attachEncryptedFolder', param) - return self._attach_response(task) + ref = self._edge.api.execute('/status/services', 'attachEncryptedFolder', param) + return self._attach_response(ref) def _attach_response(self, task): response = self._wait(task) @@ -161,14 +161,14 @@ def _create_folder(self, passphrase): logger.debug('Creating a backup folder.') param.encryptionMode = EncryptionMode.Recoverable - task = self._edge.api.execute('/status/services', 'createFolder', param) - settings = self._create_response(task) + ref = self._edge.api.execute('/status/services', 'createFolder', param) + settings = self._create_response(ref) settings.encryptionMode = param.encryptionMode return settings - def _create_response(self, task): - response = self._wait(task) + def _create_response(self, ref): + response = self._wait(ref) return Backup._process_create_response(response) @staticmethod @@ -190,8 +190,8 @@ def _process_create_response(response): return None raise CTERAException(f'Failed to create backup folder (rc={rc})') - def _wait(self, task): - task = self._edge.tasks.wait(task) + def _wait(self, ref): + task = self._edge.tasks.wait(ref) return task.result def _configure_backup_settings(self, param): diff --git a/cterasdk/edge/ctera_migrate.py b/cterasdk/edge/ctera_migrate.py index 9c82598c..1ee7e49f 100644 --- a/cterasdk/edge/ctera_migrate.py +++ b/cterasdk/edge/ctera_migrate.py @@ -1,8 +1,10 @@ import logging +from datetime import datetime from .base_command import BaseCommand from ..common import Object from .enum import TaskType, SourceType +from ..lib.storage import commonfs, synfs logger = logging.getLogger('cterasdk.edge') @@ -93,46 +95,75 @@ def _update_task(self, task, action): def details(self, task): """ - Get task details + Retrieve the jobs of a discovery or migration task. + + :param cterasdk.common.object.Object task: Task object. """ response = self._edge.migrate.get('/tasks/history', params={'id': task.id}) # pylint: disable=protected-access if response.history: - return Jobs(response.history) + return response.history logger.error('Task not found. %s', {'task_id': task.id}) return None - def results(self, task): + @staticmethod + def _format_export_date(timestamp): + date_format = '%Y%m%d-%H%M%S' + return datetime.strftime(datetime.fromtimestamp(timestamp), date_format) + + @staticmethod + def _format_export(task, job, share=None): if task.type == 'discovery': - return self._edge.migrate.get('/discovery/results', # pylint: disable=protected-access - params={'id': task.id}).discovery + if share is not None: + return f'discovery-{task.name}-share-{share}.csv' + return ( + f'discovery-{task.name}-{task.source}' + f'-{CTERAMigrate._format_export_date(job.start_time)}-{CTERAMigrate._format_export_date(job.finish_time)}.csv' + ) if task.type == 'migration': - return self._edge.migrate.get('/migration/results', # pylint: disable=protected-access - params={'id': task.id}).migration - logger.error('Could not determine task type. %s', {'id': task.id, 'type': task.type, 'name': task.name}) + return f'{CTERAMigrate._format_export_date(job.finish_time)}-migration-migrate_{task.source}-{task.id}.log' return None + @staticmethod + def _save_export(name, handle, destination=None): + directory, filename = commonfs.generate_file_destination(destination, name) + return synfs.write(directory, filename, handle) -class Jobs: - """Class representing task jobs""" - - def __init__(self, jobs): - self._all = jobs + def results(self, task, job, *, export=False, destination=None): + """ + Retrieve the results of a job execution. - @property - def latest(self): + :param cterasdk.common.object.Object task: Task object. + :param cterasdk.common.object.Object job: Job object. + :param bool,optional export: Export to file, defaults to ``False`` + :param str,optional destination: File destination, defaults to the default downloads directory """ - Get the latest job of a task + params = {'id': job.job_id} + if export: + params['format'] = 'csv' + handle = self._edge.migrate.handle(f'/{task.type}/results', params=params) + name = CTERAMigrate._format_export(task, job) + return CTERAMigrate._save_export(name, handle, destination) + response = self._edge.migrate.get(f'/{task.type}/results', params={'id': job.job_id}) # pylint: disable=protected-access + return getattr(response, task.type) + + def log(self, task, job, share=None, destination=None): """ - if self._all: - return self._all[0] - return None + Download the log of a job execution. - @property - def all(self): + :param cterasdk.common.object.Object task: Task object. + :param cterasdk.common.object.Object job: Job object. + :param str,optional share: Share name, required for discovery jobs. + :param str,optional destination: File destination, defaults to the default downloads directory """ - Get all jobs of a task - """ - return self._all + name = None + params = {'id': job.job_id} + if task.type == 'discovery': + if share is None: + raise TypeError('Share name parameter is required for discovery jobs.') + params['share'] = share + handle = self._edge.migrate.handle(f'/{task.type}/log', params=params) + name = CTERAMigrate._format_export(task, job, share) + return CTERAMigrate._save_export(name, handle, destination) class TaskManager: diff --git a/cterasdk/edge/dedup.py b/cterasdk/edge/dedup.py index b7b073f4..ba8b7dc5 100644 --- a/cterasdk/edge/dedup.py +++ b/cterasdk/edge/dedup.py @@ -58,7 +58,7 @@ def status(self): """ size = self._edge.api.execute('/config/cloudsync/cloudExtender', 'allFilesTotalUsedBytes') usage = self._edge.api.execute('/config/cloudsync/cloudExtender', 'storageUsedBytes') - return DeduplicationStatus(size, usage) + return DeduplicationStatus(self.is_enabled(), size, usage) def _wait_for_reboot(self, reboot, wait): if reboot: diff --git a/cterasdk/edge/enum.py b/cterasdk/edge/enum.py index 276617b9..057dd501 100644 --- a/cterasdk/edge/enum.py +++ b/cterasdk/edge/enum.py @@ -419,7 +419,7 @@ class SourceType: class TaskType: """ - Migration Tool Task Type + CTERA Migrate Task Type :ivar str Discovery: Discovery :ivar str Migration: Migration diff --git a/cterasdk/edge/login.py b/cterasdk/edge/login.py index 92e346da..cbf482f6 100644 --- a/cterasdk/edge/login.py +++ b/cterasdk/edge/login.py @@ -1,6 +1,7 @@ import logging -from ..exceptions import CTERAException +from ..exceptions.transport import InternalServerError +from ..exceptions.auth import AuthenticationError from .base_command import BaseCommand @@ -16,13 +17,21 @@ def info(self): return self._edge.api.get('/nosession/logininfo') def login(self, username, password): + """ + Log in to CTERA Edge Filer + + :param str username: User name + :param str password: User password + :raises: :class:`cterasdk.exceptions.auth.AuthenticationError` + """ host = self._edge.host() try: self._edge.api.form_data('/login', {'username': username, 'password': password}) logger.info("User logged in. %s", {'host': host, 'user': username}) self._edge.ctera_migrate.login() - except CTERAException: - logger.error("Login failed. %s", {'host': host, 'user': username}) + except InternalServerError as e: + if e.error.response.error.msg == 'Wrong username or password': + raise AuthenticationError() from e raise def sso(self, ticket): @@ -38,9 +47,5 @@ def sso(self, ticket): def logout(self): host = self._edge.host() user = self._edge.session().account.name - try: - self._edge.api.form_data('/logout', {'foo': 'bar'}) - logger.info("User logged out. %s", {'host': host, 'user': user}) - except CTERAException: - logger.error("Logout failed. %s", {'host': host, 'user': user}) - raise + self._edge.api.form_data('/logout', {'foo': 'bar'}) + logger.info("User logged out. %s", {'host': host, 'user': user}) diff --git a/cterasdk/edge/network.py b/cterasdk/edge/network.py index 959c2191..0ac72bd6 100644 --- a/cterasdk/edge/network.py +++ b/cterasdk/edge/network.py @@ -1,9 +1,9 @@ import logging from ..exceptions import CTERAException +from ..exceptions.common import TaskException from .enum import Mode, IPProtocol, Traffic from .types import TCPConnectResult -from ..lib.task_manager_base import TaskError from ..common import Object, parse_to_ipaddress from .base_command import BaseCommand @@ -127,20 +127,20 @@ def tcp_connect(self, service): logger.info("Testing connection. %s", {'host': service.host, 'port': service.port}) - task = self._edge.api.execute("/status/network", "tcpconnect", param) + ref = self._edge.api.execute("/status/network", "tcpconnect", param) try: - task = self._edge.tasks.wait(task) - logger.debug("Obtained connection status. %s", {'status': task.result.rc}) + task = self._edge.tasks.wait(ref) + logger.debug("Connection status: %s", task.result.rc) if task.result.rc == "Open": return TCPConnectResult(service.host, service.port, True) - except TaskError: + except TaskException: pass logger.warning("Couldn't establish TCP connection. %s", {'address': service.host, 'port': service.port}) return TCPConnectResult(service.host, service.port, False) - def iperf(self, address, port=5201, threads=1, protocol=IPProtocol.TCP, direction=Traffic.Upload, retries=120, seconds=1): + def iperf(self, address, port=5201, threads=1, protocol=IPProtocol.TCP, direction=Traffic.Upload, timeout=None): """ Invoke a network throughput test @@ -149,8 +149,7 @@ def iperf(self, address, port=5201, threads=1, protocol=IPProtocol.TCP, directio :param int,optional threads: The number of threads, defaults to 1 :param cterasdk.edge.enum.IPProtocol,optional protocol: IP protocol, defaults to `'TCP'` :param cterasdk.edge.enum.Traffic,optional direction: Traffic direction, defaults to `'Upload'` - :param int,optional retries: Number of retries when sampling the iperf task status, defaults to 120 - :param int,optional seconds: Number of seconds to wait between retries, defaults to 1 + :param float,optional timeout: Timeout (in seconds). :returns: A string containing the iperf output :rtype: str """ @@ -161,11 +160,11 @@ def iperf(self, address, port=5201, threads=1, protocol=IPProtocol.TCP, directio param.threads = threads param.reverse = direction == Traffic.Download param.protocol = None if protocol == IPProtocol.TCP else IPProtocol.UDP - task = self._edge.api.execute("/status/network", "iperf", param) + ref = self._edge.api.execute("/status/network", "iperf", param) try: - task = self._edge.tasks.wait(task, retries, seconds) + task = self._edge.tasks.wait(ref, timeout) return task.result.res - except TaskError as error: + except TaskException as error: return error.task.result.res diff --git a/cterasdk/edge/power.py b/cterasdk/edge/power.py index 0e14feac..8c2b45bd 100644 --- a/cterasdk/edge/power.py +++ b/cterasdk/edge/power.py @@ -1,7 +1,7 @@ import logging import time -from ..exceptions import CTERAException +from ..exceptions.transport import HTTPError from .base_command import BaseCommand @@ -17,7 +17,7 @@ def reboot(self, wait=False): :param bool,optional wait: Wait for reboot to complete, defaults to False """ - logger.info("Rebooting device. %s", {'host': self._edge.host()}) + logger.info("Rebooting Edge Filer. %s", {'host': self._edge.host()}) self._edge.api.execute("/status/device", "reboot", None) if wait: Boot(self._edge).wait() @@ -33,7 +33,7 @@ def reset(self, wait=False): :param bool,optional wait: Wait for reset to complete, defaults to False """ self._edge.api.execute("/status/device", "reset2default", None) - logger.info("Resetting device to default settings. %s", {'host': self._edge.host()}) + logger.info("Resetting Edge Filer to default settings. %s", {'host': self._edge.host()}) if wait: Boot(self._edge).wait() @@ -50,22 +50,26 @@ def wait(self): while True: try: self._increment() - logger.debug('Checking if device is up and running. %s', {'attempt': self._attempt}) + logger.debug("Status check, (try %s)", self._attempt + 1) self._edge.test() - logger.info("Device is back up and running.") + logger.info("Edge Filer is up and running.") break - except (CTERAException, ConnectionError, TimeoutError) as e: - logger.debug('Exception. %s', {'exception': e.__class__.__name__, 'message': e.message}) + except ConnectionError: + logger.debug('Connection error while checking status.') + except TimeoutError: + logger.debug('Status check timed out.') + except HTTPError as e: + logger.debug("Status check failed with HTTP %s: %s", e.code, e.name) def _increment(self): self._attempt = self._attempt + 1 if self._attempt >= self._retries: self._unreachable() - logger.debug('Sleep. %s', {'seconds': self._seconds}) + logger.debug("Try %s failed; Sleeping for %s second(s).", self._attempt, self._seconds) time.sleep(self._seconds) def _unreachable(self): host = self._edge.host() port = self._edge.port() - logger.error('Timed out. Could not reach host. %s', {'host': host, 'port': port}) - raise ConnectionError(f'Timed out. Could not reach host {host}:{port}.') + logger.error("Connection timed out after retries. %s", {'host': host, 'port': port}) + raise ConnectionError(f'Connection to {host}:{port} timed out after retries.') diff --git a/cterasdk/edge/services.py b/cterasdk/edge/services.py index 5650fef4..53d23fbf 100644 --- a/cterasdk/edge/services.py +++ b/cterasdk/edge/services.py @@ -1,11 +1,11 @@ import logging import cterasdk.settings -from ..lib.task_manager_base import TaskError from .licenses import Licenses from ..lib import ask, track from ..common import Object from ..exceptions import CTERAException, InputError +from ..exceptions.common import TaskException from . import enum from .base_command import BaseCommand from .types import TCPService @@ -127,15 +127,15 @@ def _set_sso(self, sso_state): logger.info('Single sign-on %s.', ('enabled' if sso_state else 'disabled')) def _connect_to_services(self, param, ctera_license): - task = self._attach(param) + ref = self._attach(param) try: - self._edge.tasks.wait(task) + self._edge.tasks.wait(ref) track(self._edge, '/status/services/CTERAPortal/connectionState', [enum.ServicesConnectionState.Connected], [enum.ServicesConnectionState.ResolvingServers, enum.ServicesConnectionState.Connecting, enum.ServicesConnectionState.Attaching, enum.ServicesConnectionState.Authenticating], [], [enum.ServicesConnectionState.Disconnected], 20, 1) logger.info("Connected to Portal.") - except TaskError as error: + except TaskException as error: description = error.task.description logger.error("Connection failed. Reason: %s", description) raise CTERAException(f"Connection failed. Reason: {description}") diff --git a/cterasdk/edge/shares.py b/cterasdk/edge/shares.py index 78bee712..1cf6a5a0 100644 --- a/cterasdk/edge/shares.py +++ b/cterasdk/edge/shares.py @@ -274,8 +274,8 @@ def modify( try: self._edge.api.put('/config/fileservices/share/' + name, share) logger.info("Share modified. %s", {'name': name}) - except Exception as error: - message = f'Could not modify share: {name}' + except CTERAException as error: + message = f'Share modification failed: {name}' logger.error(message) raise CTERAException(message) from error @@ -289,7 +289,7 @@ def delete(self, name): try: self._edge.api.delete(ref) logger.info("Share deleted: %s", ref) - except Exception as error: + except CTERAException as error: logger.error("Share deletion failed: %s", ref) raise CTERAException(f'Share deletion failed: {ref}') from error diff --git a/cterasdk/edge/shell.py b/cterasdk/edge/shell.py index 0ad9fd25..5b32a9f5 100644 --- a/cterasdk/edge/shell.py +++ b/cterasdk/edge/shell.py @@ -1,7 +1,7 @@ import logging -from ..lib.task_manager_base import TaskError from ..exceptions import CTERAException +from ..exceptions.common import TaskException from .base_command import BaseCommand @@ -21,12 +21,12 @@ def run_command(self, shell_command, wait=True): """ logger.info("Executing shell command. %s", {'shell_command': shell_command}) - task = self._edge.api.execute("/config/device", "bgshell", shell_command) + ref = self._edge.api.execute("/config/device", "bgshell", shell_command) if not wait: - return task + return self._edge.tasks.awaitable_task(ref) try: - task = self._edge.tasks.wait(task) + task = self._edge.tasks.wait(ref) logger.info("Shell command executed. %s", {'shell_command': shell_command}) return task.result.result - except TaskError as error: - raise CTERAException('An error occurred while executing task') from error + except TaskException as error: + raise CTERAException('An error occurred while executing Shell command.') from error diff --git a/cterasdk/edge/sync.py b/cterasdk/edge/sync.py index 67f4609a..374942e4 100644 --- a/cterasdk/edge/sync.py +++ b/cterasdk/edge/sync.py @@ -1,7 +1,6 @@ import logging from ..lib import track, ErrorStatus -from .taskmgr import Task from .enum import Mode, SyncStatus, Acl from .base_command import BaseCommand from ..common import Object, ThrottlingRule, FilterBackupSet, FileFilterBuilder @@ -170,15 +169,16 @@ def evict(self, path, wait=False): :param str path: Directory path :param bool wait: Wait for eviction task to complete, defaults to ``False`` - :returns: A reference to the background task - :rtype: str + :returns: Task status object, or an awaitable task object + :rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitableEdgeTask` """ param = Object() + param._classname = 'evictFolderParam' # pylint: disable=protected-access param.path = path ref = self._edge.api.execute('/config/cloudsync', 'evictFolder', param) if wait: - Task(self._edge, ref).wait() - return ref + return self._edge.tasks.wait(ref) + return self._edge.tasks.awaitable_task(ref) class CloudSyncBandwidthThrottling(BaseCommand): diff --git a/cterasdk/edge/taskmgr.py b/cterasdk/edge/taskmgr.py deleted file mode 100644 index 689f3bc5..00000000 --- a/cterasdk/edge/taskmgr.py +++ /dev/null @@ -1,75 +0,0 @@ -import re -import logging - -from . import query -from ..common import Object -from ..lib.task_manager_base import TaskBase -from ..exceptions import InputError -from .base_command import BaseCommand - - -logger = logging.getLogger('cterasdk.edge') - - -class Task(TaskBase): - - def _get_task_id(self, ref): - uid = None - if isinstance(ref, int): - uid = str(ref) - elif isinstance(ref, str): - match = re.search('[1-9][0-9]*', ref) - if match is not None: - start, end = match.span() - uid = ref[start: end] - if uid is not None: - return '/proc/bgtasks/' + uid - logger.error('Could not parse task id. %s', {'ref': ref}) - raise InputError('Invalid task id', ref, [64, '64', '/proc/bgtasks/64']) - - def get_task_status(self): - return self.CTERAHost.api.get(self.path) - - -class Tasks(BaseCommand): - """ Edge Filer Background Task APIs """ - - def status(self, ref): - """ - Get background task status - - :param str ref: Task reference - """ - task = Task(self._edge, ref) - return task.get_task_status() - - def running(self): - """ - Get all running background tasks - """ - return self._query('status', 'running') - - def by_name(self, name): - """ - Get background tasks by name - - :param str name: Task name - """ - return self._query('name', name) - - def _query(self, key, value): - param = Object() - param.key = key - param.value = value - return query.iterator(self._edge, '/proc/bgtasks', param) - - def wait(self, ref, retries=100, seconds=1): - """ - Wait for background task to complete - - :param str ref: Task reference - :param int,optional retries: Number of retries when sampling the task status, defaults to 100 - :param int,optional seconds: Number of seconds to wait between retries, defaults to 1 - """ - task = Task(self._edge, ref, retries, seconds) - return task.wait() diff --git a/cterasdk/edge/tasks.py b/cterasdk/edge/tasks.py new file mode 100644 index 00000000..83915c23 --- /dev/null +++ b/cterasdk/edge/tasks.py @@ -0,0 +1,40 @@ +import logging + +from . import query +from ..lib.tasks import AwaitableEdgeTask +from ..common import Object +from .base_command import BaseCommand + + +logger = logging.getLogger('cterasdk.edge') + + +class Tasks(BaseCommand): + """ Edge Filer Background Task APIs """ + + def awaitable_task(self, ref): + return AwaitableEdgeTask(self._edge, ref) + + def wait(self, ref, timeout=None, poll_interval=None): + awaitable_task = AwaitableEdgeTask(self._edge, ref) + return awaitable_task.wait(timeout, poll_interval) + + def running(self): + """ + Get all running background tasks + """ + return self._query('status', 'running') + + def by_name(self, name): + """ + Get background tasks by name + + :param str name: Task name + """ + return self._query('name', name) + + def _query(self, key, value): + param = Object() + param.key = key + param.value = value + return query.iterator(self._edge, '/proc/bgtasks', param) diff --git a/cterasdk/edge/types.py b/cterasdk/edge/types.py index 3b19f768..9170683e 100644 --- a/cterasdk/edge/types.py +++ b/cterasdk/edge/types.py @@ -297,12 +297,14 @@ class DeduplicationStatus(Object): """ Edge Filer Local Deduplication Status Object + :ivar bool enabled: ``True`` if deduplication is enabled, ``False`` otherwise :ivar int size: Logical Size in Bytes :ivar int usage: Actual Size in Bytes """ - def __init__(self, size, usage): + def __init__(self, enabled, size, usage): super().__init__() + self.enabled = enabled self.size = size self.usage = usage diff --git a/cterasdk/edge/users.py b/cterasdk/edge/users.py index 18e49ec7..7307e974 100644 --- a/cterasdk/edge/users.py +++ b/cterasdk/edge/users.py @@ -105,6 +105,6 @@ def delete(self, username): response = self._edge.api.delete(ref) logger.info("User deleted: %s", ref) return response - except Exception as error: + except CTERAException as error: logger.error("User deletion failed: %s", ref) raise CTERAException(f'User deletion failed: {ref}') from error diff --git a/cterasdk/edge/volumes.py b/cterasdk/edge/volumes.py index 6742dc5e..528fe731 100644 --- a/cterasdk/edge/volumes.py +++ b/cterasdk/edge/volumes.py @@ -1,9 +1,9 @@ import logging -from ..lib.task_manager_base import TaskError from .enum import VolumeStatus from ..common import Object from ..exceptions import CTERAException, InputError +from ..exceptions.common import TaskException from ..lib import track from .base_command import BaseCommand @@ -203,5 +203,5 @@ def _wait_pending_mount(self, volume): def _wait_mount(self, tid): try: self._edge.tasks.wait(tid) - except TaskError: + except TaskException: logger.debug('Failed mounting volume. %s', {'tid': tid}) diff --git a/cterasdk/exceptions/__init__.py b/cterasdk/exceptions/__init__.py index 8fab2996..96a281b2 100644 --- a/cterasdk/exceptions/__init__.py +++ b/cterasdk/exceptions/__init__.py @@ -1,5 +1,7 @@ from . import ( # noqa: E402, F401 + auth, backup, + common, direct, io, notifications, diff --git a/cterasdk/exceptions/auth.py b/cterasdk/exceptions/auth.py new file mode 100644 index 00000000..7636aa06 --- /dev/null +++ b/cterasdk/exceptions/auth.py @@ -0,0 +1,10 @@ +from .base import CTERAException + + +class AuthenticationError(CTERAException): + """ + Exception raised for authentication failures. + """ + + def __init__(self): + super().__init__("Authentication failed: Invalid username or password") diff --git a/cterasdk/exceptions/common.py b/cterasdk/exceptions/common.py new file mode 100644 index 00000000..10ab0827 --- /dev/null +++ b/cterasdk/exceptions/common.py @@ -0,0 +1,32 @@ +from .base import CTERAException + + +class AwaitableTaskException(CTERAException): + + def __init__(self, message, awaitable_task): + """ + Awaitable Task Exception + + :param cterasdk.lib.tasks.AwaitableTask awaitable_task: Awaitable task object + """ + super().__init__(message) + self.awaitable_task = awaitable_task + + +class TaskException(CTERAException): + """ + Task Exception + + :param cterasdk.common.object.Object task: Task object + """ + def __init__(self, message, task): + super().__init__(message) + self.task = task + + +class TaskWaitTimeoutError(TaskException): + """ + Task Wait Timeout Error + """ + def __init__(self, duration, task): + super().__init__(f"Task {task.id} remains pending completion after {duration} second(s).", task) diff --git a/cterasdk/exceptions/direct.py b/cterasdk/exceptions/direct.py index be186af8..222be7ad 100644 --- a/cterasdk/exceptions/direct.py +++ b/cterasdk/exceptions/direct.py @@ -24,24 +24,30 @@ def __init__(self, error, strerror, filename): super().__init__(error, strerror, filename) -class NotFoundError(DirectIOAPIError): +class ObjectNotFoundError(DirectIOAPIError): def __init__(self, filename): - super().__init__(errno.EBADF, 'File not found', filename) + super().__init__(errno.ENOENT, 'File not found', filename) -class UnAuthorized(DirectIOAPIError): +class AuthorizationError(DirectIOAPIError): def __init__(self, filename): super().__init__(errno.EACCES, 'Unauthorized: You do not have the necessary permissions to access this resource', filename) -class UnprocessableContent(DirectIOAPIError): +class UnsupportedStorageError(DirectIOAPIError): def __init__(self, filename): super().__init__(errno.ENOTSUP, 'Not all blocks of the requested file are stored on a storage node set to Direct Mode', filename) +class InvalidRequest(DirectIOAPIError): + + def __init__(self, filename): + super().__init__(errno.EIO, 'Request failed due to internal error: invalid request', filename) + + class BlocksNotFoundError(DirectIOAPIError): def __init__(self, filename): @@ -129,6 +135,6 @@ def __init__(self, file_id, chunk): :param cterasdk.direct.types.Chunk chunk: Chunk. """ self.file_id = file_id - self.number = chunk.index + self.number = chunk.number self.offset = chunk.offset self.length = chunk.length diff --git a/cterasdk/exceptions/io.py b/cterasdk/exceptions/io.py index b47c6e02..e997f631 100644 --- a/cterasdk/exceptions/io.py +++ b/cterasdk/exceptions/io.py @@ -7,7 +7,7 @@ class RemoteStorageException(CTERAException): :ivar str path: Path """ - def __init__(self, message, path): + def __init__(self, message, path=None): super().__init__(message) self.path = path @@ -24,31 +24,37 @@ def __init__(self, path): super().__init__('Target validation error: Resource exists but it is not a directory.', path) -class ResourceExistsError(CTERAException): +class ResourceExistsError(RemoteStorageException): def __init__(self): super().__init__('Resource already exists: a file or folder with this name already exists.') -class PathValidationError(CTERAException): +class PathValidationError(RemoteStorageException): def __init__(self): super().__init__('Path validation failed: the specified destination path does not exist.') -class NameSyntaxError(CTERAException): +class NameSyntaxError(RemoteStorageException): def __init__(self): - super().__init__('Invalid name: the name contains characters that are not allowed.') + super().__init__('Invalid name: the name contains characters that are not allowed "\\ / : ? & < > \" |".') -class ReservedNameError(CTERAException): +class ReservedNameError(RemoteStorageException): def __init__(self): super().__init__('Reserved name error: the name is reserved and cannot be used.') -class RestrictedPathError(CTERAException): +class RestrictedPathError(RemoteStorageException): def __init__(self): super().__init__('Creating a folder in the specified location is forbidden.') + + +class RestrictedRoot(RemoteStorageException): + + def __init__(self): + super().__init__('Storing files to the root directory is forbidden.', '/') diff --git a/cterasdk/lib/__init__.py b/cterasdk/lib/__init__.py index c5696117..8266bc02 100644 --- a/cterasdk/lib/__init__.py +++ b/cterasdk/lib/__init__.py @@ -2,7 +2,7 @@ from .consent import ask # noqa: E402, F401 from .tempfile import TempfileServices # noqa: E402, F401 from .version import Version # noqa: E402, F401 -from .iterator import QueryIterator, BaseResponse, FetchResourcesResponse, \ +from .iterator import QueryIterator, BaseResponse, \ DefaultResponse, KeyValueQueryIterator, QueryLogsResponse, CursorResponse # noqa: E402, F401 from .tracker import track, ErrorStatus # noqa: E402, F401 from .crypto import CryptoServices, X509Certificate, PrivateKey, create_certificate_chain # noqa: E402, F401 diff --git a/cterasdk/lib/iterator.py b/cterasdk/lib/iterator.py index 84987229..9bc8258f 100644 --- a/cterasdk/lib/iterator.py +++ b/cterasdk/lib/iterator.py @@ -89,13 +89,6 @@ def objects(self): return self._response.logs -class FetchResourcesResponse(DefaultResponse): - - @property - def objects(self): - return self._response.items - - class CursorResponse(BaseResponse): @property diff --git a/cterasdk/lib/retries.py b/cterasdk/lib/retries.py new file mode 100644 index 00000000..d6ba48b8 --- /dev/null +++ b/cterasdk/lib/retries.py @@ -0,0 +1,34 @@ +import asyncio +import logging +import functools + + +logger = logging.getLogger('cterasdk.common') + + +def execute_with_retries(retries=None, backoff=None, max_backoff=None): + """ + A decorator that retries a function or coroutine upon exception, using exponential backoff. + + This decorator supports both synchronous and asynchronous functions. + + :param int retries: The maximum number of attempts before giving up. + :param int backoff: The initial backoff delay in seconds. + """ + def decorator(func): + @functools.wraps(func) + async def a_wrapper(*args, **kwargs): + delay = backoff + for try_num in range(retries): + try: + logger.debug("Try %s out of %s of function: '%s'", try_num + 1, retries, func.__name__) + return await func(*args, **kwargs) + except Exception as e: # pylint: disable=broad-exception-caught + logger.debug("Try %s of %s of function '%s' failed. Backing off for %s second(s).", try_num + 1, retries, + func.__name__, delay) + if try_num == retries - 1: + raise e + await asyncio.sleep(delay) + delay = min(max_backoff, delay * 2) + return a_wrapper + return decorator diff --git a/cterasdk/lib/task_manager_base.py b/cterasdk/lib/task_manager_base.py deleted file mode 100644 index dc59ed94..00000000 --- a/cterasdk/lib/task_manager_base.py +++ /dev/null @@ -1,72 +0,0 @@ -import time -import logging -from abc import ABC, abstractmethod -from ..exceptions import CTERAException -from ..convert import tojsonstr - - -logger = logging.getLogger('cterasdk.common') - - -class TaskRunningStatus: - Running = 'running' - Completed = 'completed' - Failed = 'failed' - Warnings = 'completed with warnings' - - -class TaskError(CTERAException): - - def __init__(self, task): - super().__init__() - self.task = task - - -class TaskBase(ABC): - - def __init__(self, CTERAHost, ref, retries=10, seconds=1): - self.CTERAHost = CTERAHost - self.path = self._get_task_id(ref) - self.attempt = 0 - self.retries = retries - self.seconds = seconds - self.running = True - - @abstractmethod - def _get_task_id(self, ref): - raise NotImplementedError("Subclass must implement _get_task_id") - - @abstractmethod - def get_task_status(self): - raise NotImplementedError("Subclass must implement get_task_status") - - def wait(self): - task = None - while self.running: - logger.debug('Obtaining task status. %s', {'path': self.path, 'attempt': (self.attempt + 1)}) - task = self.get_task_status() - logger.debug('Task status. %s', tojsonstr(task, False)) - self.increment() - self.running = task.status == TaskRunningStatus.Running - return TaskBase.resolve(task) - - @staticmethod - def resolve(task): - task_info = {'id': task.id, 'name': task.name, 'status': task.status, 'start_time': task.startTime, 'end_time': task.endTime} - if task.status == TaskRunningStatus.Failed: - logger.error('Task failed. %s', task_info) - raise TaskError(task) - if task.status == TaskRunningStatus.Warnings: - logger.warning('Task completed with warnings. %s', task_info) - if task.status == TaskRunningStatus.Completed: - logger.debug('Task completed successfully. %s', task_info) - return task - - def increment(self): - if self.attempt >= self.retries: - duration = time.strftime("%H:%M:%S", time.gmtime(self.retries * self.seconds)) - logger.error('Could not obtain task status in a timely manner. %s', {'duration': duration}) - raise CTERAException('Could not obtain task status in a timely manner.') - self.attempt = self.attempt + 1 - logger.debug('Sleep. %s', {'seconds': self.seconds}) - time.sleep(self.seconds) diff --git a/cterasdk/lib/tasks.py b/cterasdk/lib/tasks.py new file mode 100644 index 00000000..30c262c9 --- /dev/null +++ b/cterasdk/lib/tasks.py @@ -0,0 +1,183 @@ +import re +import time +import logging +import asyncio +from abc import ABC, abstractmethod + +from ..common.enum import TaskRunningStatus +from ..exceptions.common import TaskWaitTimeoutError, AwaitableTaskException +from ..exceptions.transport import HTTPError + + +logger = logging.getLogger('cterasdk.common') + + +class AwaitableTask(ABC): + + def __init__(self, ctera, ref): + self._ctera = ctera + self._ref = self._task_reference(ref) + + @property + def ref(self): + return self._ref + + def wait(self, timeout=None, poll_interval=None): + """ + Wait until the given task is complete, or until the timeout expires. + + If a positive `timeout` (in seconds) is specified, this function will block + only up to that duration. If `timeout` is None or non-positive, it will + wait indefinitely until the task completes. + + :param float,optional timeout: Raise exception in the event of a timeout + :param float,optional poll_interval: Poll interval, defaults to 1 second + """ + try: + return _synchronous_wait(self, timeout, poll_interval) + except HTTPError as error: + raise AwaitableTaskException(f"An error occurred while retrieving the status of Task '{self.ref}'.", + self._ctera.tasks.awaitable_task(self.ref)) from error + + async def a_wait(self, timeout=None, poll_interval=None): + """ + Wait until the given task is complete, or until the timeout expires. + + If a positive `timeout` (in seconds) is specified, this function will block + only up to that duration. If `timeout` is None or non-positive, it will + wait indefinitely until the task completes. + + :param float,optional timeout: Raise exception in the event of a timeout + :param float,optional poll_interval: Poll interval, defaults to 1 second + """ + try: + return await _asynchronous_wait(self, timeout, poll_interval) + except HTTPError as error: + raise AwaitableTaskException(f"An error occurred while retrieving the status of Task '{self.ref}'.", + self._ctera.tasks.awaitable_task(self.ref)) from error + + @abstractmethod + def _task_reference(self, ref): + raise NotImplementedError("Subclass must implement the '_task_reference' function.") + + @abstractmethod + def status(self): + raise NotImplementedError("Subclass must implement the 'status' function.") + + @abstractmethod + async def a_status(self): + raise NotImplementedError("Subclass must implement the 'a_status' function.") + + def __str__(self): + return self._ref + + def __repr__(self): + return str(self) + + +class AwaitableEdgeTask(AwaitableTask): + """ + Awaitable Edge Filer Task Object + """ + def _task_reference(self, ref): + uid = None + if isinstance(ref, int): + uid = str(ref) + elif isinstance(ref, str): + match = re.search('[1-9][0-9]*', ref) + if match is not None: + start, end = match.span() + uid = ref[start: end] + if uid is not None: + return '/proc/bgtasks/' + uid + logger.error('Failed to parse task identifier from reference: %s', ref) + raise ValueError(f'Failed to parse task identifier from reference: {ref}') + + def status(self): + """ + Synchronous function to retrieve task status. + """ + return self._ctera.api.get(self._ref) + + async def a_status(self): + """ + Asynchronous function to retrieve task status. + """ + return await self._ctera.api.get(self._ref) + + +class AwaitablePortalTask(AwaitableTask): + """ + Awaitable Portal Task Object + """ + def _task_reference(self, ref): + match = re.search('servers/[^/]*/bgTasks/[1-9][0-9]*$', ref) + if not match: + logger.error('Failed to parse task identifier from reference: %s', ref) + raise ValueError(f'Failed to parse task identifier from reference: {ref}') + return match.group(0) + + def status(self): + """ + Synchronous function to retrieve task status. + """ + if self._ctera.session().in_tenant_context(): + return self._ctera.api.execute('', 'getTaskStatus', self._ref) + return self._ctera.api.get(f'{self._ref}') + + async def a_status(self): + """ + Asynchronous function to retrieve task status. + """ + if self._ctera.session().in_tenant_context(): + return await self._ctera.v1.api.execute('', 'getTaskStatus', self._ref) + return await self._ctera.v1.api.get(f'{self._ref}') + + +def _before_wait(timeout, poll_interval): + timeout_at = None + + if timeout is not None: + if not isinstance(timeout, (int, float)): + raise ValueError('Timeout must be a positive int or float.') + timeout_at = time.time() + timeout + + if poll_interval is not None and isinstance(poll_interval, (int, float)): + raise ValueError('Poll interval must be a positive int or float.') + poll_interval = poll_interval if poll_interval is not None else 1 + + return timeout_at, poll_interval + + +def _synchronous_wait(awaitable_task, timeout=None, poll_interval=None): + timeout_at, poll_interval = _before_wait(timeout, poll_interval) + while True: + task = awaitable_task.status() + if task.status in [ + TaskRunningStatus.Completed, + TaskRunningStatus.Disabled, + TaskRunningStatus.Stopped, + TaskRunningStatus.Warnings, + TaskRunningStatus.Failed, + ]: + return task + if timeout_at is not None and time.time() > timeout_at: + raise TaskWaitTimeoutError(timeout, task) + time.sleep(poll_interval) + + +async def _asynchronous_wait(awaitable_task, timeout=None, poll_interval=None): + timeout_at, poll_interval = _before_wait(timeout, poll_interval) + while True: + task = await awaitable_task.a_status() + if task.status in [ + TaskRunningStatus.Completed, + TaskRunningStatus.Disabled, + TaskRunningStatus.Stopped, + TaskRunningStatus.Warnings, + TaskRunningStatus.Failed, + ]: + return task + if timeout_at is not None and time.time() > timeout_at: + raise TaskWaitTimeoutError(timeout, task) + await asyncio.sleep(poll_interval) diff --git a/cterasdk/objects/asynchronous/core.py b/cterasdk/objects/asynchronous/core.py index 94a2f78c..42e1ead9 100644 --- a/cterasdk/objects/asynchronous/core.py +++ b/cterasdk/objects/asynchronous/core.py @@ -4,7 +4,7 @@ from ...clients import clients from .. import authenticators from ...lib.session.core import Session -from ...asynchronous.core import files, login, cloudfs, notifications, portals, settings, users +from ...asynchronous.core import files, login, cloudfs, notifications, portals, settings, tasks, users class Clients: @@ -62,6 +62,7 @@ def __init__(self, host, port=None, https=True): self.files = files.CloudDrive(self) self.notifications = notifications.Notifications(self) self.settings = settings.Settings(self) + self.tasks = tasks.Tasks(self) self.users = users.Users(self) @property diff --git a/cterasdk/objects/synchronous/core.py b/cterasdk/objects/synchronous/core.py index 55dc53a3..7e5cfa7f 100644 --- a/cterasdk/objects/synchronous/core.py +++ b/cterasdk/objects/synchronous/core.py @@ -9,7 +9,7 @@ activation, admins, antivirus, buckets, cli, cloudfs, connection, credentials, devices, directoryservice, domains, files, firmwares, groups, kms, licenses, login, logs, mail, messaging, plans, portals, reports, roles, servers, settings, - setup, ssl, startup, storage_classes, syslog, taskmgr, templates, users, + setup, ssl, startup, storage_classes, syslog, tasks, templates, users, ) @@ -68,7 +68,7 @@ def __init__(self, host, port=None, https=True): self.roles = roles.Roles(self) self.settings = settings.Settings(self) self.storage_classes = storage_classes.StorageClasses(self) - self.tasks = taskmgr.Tasks(self) + self.tasks = tasks.Tasks(self) self.templates = templates.Templates(self) self.users = users.Users(self) diff --git a/cterasdk/objects/synchronous/edge.py b/cterasdk/objects/synchronous/edge.py index 8f9eea80..788d8db7 100644 --- a/cterasdk/objects/synchronous/edge.py +++ b/cterasdk/objects/synchronous/edge.py @@ -9,7 +9,7 @@ afp, aio, antivirus, array, audit, backup, cache, cli, config, connection, ctera_migrate, dedup, directoryservice, drive, files, firmware, ftp, groups, licenses, login, logs, mail, network, nfs, ntp, power, remote, rsync, ransom_protect, services, - shares, shell, smb, snmp, ssh, ssl, support, sync, syslog, taskmgr, telnet, + shares, shell, smb, snmp, ssh, ssl, support, sync, syslog, tasks, telnet, timezone, users, volumes, ) @@ -109,7 +109,7 @@ def __init__(self, host=None, port=None, https=True, Portal=None, *, base=None): self.support = support.Support(self) self.sync = sync.Sync(self) self.syslog = syslog.Syslog(self) - self.tasks = taskmgr.Tasks(self) + self.tasks = tasks.Tasks(self) self.telnet = telnet.Telnet(self) self.timezone = timezone.Timezone(self) self.users = users.Users(self) diff --git a/docs/source/UserGuides/DataServices/DirectIO.rst b/docs/source/UserGuides/DataServices/DirectIO.rst index 8117b9a3..68a11a29 100644 --- a/docs/source/UserGuides/DataServices/DirectIO.rst +++ b/docs/source/UserGuides/DataServices/DirectIO.rst @@ -213,9 +213,10 @@ Exceptions Hierarchy * IOError * DirectIOError * DirectIOAPIError - * NotFoundError - * UnAuthorized - * UnprocessableContent + * ObjectNotFoundError + * AuthorizationError + * UnsupportedStorageError + * InvalidRequest * BlocksNotFoundError * BlockListConnectionError * BlockListTimeout diff --git a/docs/source/UserGuides/Edge/Migration.rst b/docs/source/UserGuides/Edge/Migration.rst index 300182a5..1b8a96d8 100644 --- a/docs/source/UserGuides/Edge/Migration.rst +++ b/docs/source/UserGuides/Edge/Migration.rst @@ -64,8 +64,28 @@ Migration .. code-block:: python - edge.ctera_migrate.results(task) + """Retrieve tasks and all jobs associated with a task""" + tasks = edge.ctera_migrate.list_tasks() + jobs = edge.ctera_migrate.details(tasks[0]) + """Retrieve the results of a job execution""" + edge.ctera_migrate.results(tasks[0], jobs[0]) + edge.ctera_migrate.results(tasks[0], jobs[0], export=True) # export to file + edge.ctera_migrate.results(tasks[0], jobs[0], export=True, destination='c:/users/bwayne/downloads/discovery.csv') + +.. automethod:: cterasdk.edge.ctera_migrate.CTERAMigrate.log + :noindex: + +.. code-block:: python + + """Retrieve tasks and all jobs associated with a task""" + tasks = edge.ctera_migrate.list_tasks() + jobs = edge.ctera_migrate.details(tasks[0]) + + """Retrieve the results of a job execution""" + edge.ctera_migrate.log(tasks[0], jobs[0]) + edge.ctera_migrate.log(tasks[0], jobs[0], share='smb-project-nexus') # export to file + edge.ctera_migrate.log(tasks[0], jobs[0], share='smb-project-nexus', destination='c:/users/bwayne/downloads/discovery.csv') Discovery ========= diff --git a/docs/source/UserGuides/Miscellaneous/Changelog.rst b/docs/source/UserGuides/Miscellaneous/Changelog.rst index d45ca2c1..a99b5ae6 100644 --- a/docs/source/UserGuides/Miscellaneous/Changelog.rst +++ b/docs/source/UserGuides/Miscellaneous/Changelog.rst @@ -1,6 +1,45 @@ Changelog ========= +Improvements +^^^^^^^^^^^^ + +* Refactored the CTERA Direct I/O module to reduce duplicate code and improve exception handling. +* Added support for configuring Cloud Drive folders with Global File Locking. +* Improved authentication error handling by catching HTTP exceptions and raising :py:class:`cterasdk.exceptions.auth.AuthenticationError`. +* Added an attribute to indicate whether deduplication is enabled when retrieving the deduplication status. +* Raise an exception when uploading a file with invalid characters in its name. +* Raise an exception when attempting to upload files to the Cloud Drive root directory. +* Added support for exporting data discovery and migration jobs to CSV format. +* Introduced an asynchronous task management module for operations such as copying, moving, renaming, deleting, or undeleting files. +* Background tasks now return awaitable objects: :py:class:`cterasdk.lib.tasks.AwaitableEdgeTask`, + :py:class:`cterasdk.lib.tasks.AwaitablePortalTask`. + +Bug Fixes +^^^^^^^^^ + +* Fixed an `AttributeError` when a connection error occurs while waiting for an Edge Filer to reboot. + +Related issues and pull requests on GitHub: `#315 `_ + +.. code:: python + + # Background task: 'Apply Provisioning Changes' + result = admin.users.apply_changes(wait=True) # Wait for provisioning changes to complete and return the result + + awaitable_task = admin.users.apply_changes() # Return an awaitable task object without waiting + result = awaitable_task.status() # Get the current status of the task + result = awaitable_task.wait() # Wait for task completion + result = awaitable_task.wait(timeout=5) # Wait up to 5 seconds for the task to complete + + # Moving files and folders + result = user.files.move(('My Files/doc.docx', 'Documents/Guide.docx')) # Move a file and wait for completion + + awaitable_task = user.files.move(('My Files/doc.docx', 'Documents/Guide.docx'), wait=False) # Return an awaitable task object + result = awaitable_task.wait() # Wait for the move operation to complete + +.. + 2.20.18 ------- diff --git a/docs/source/UserGuides/Portal/Administration.rst b/docs/source/UserGuides/Portal/Administration.rst index 17ab0338..801806a7 100644 --- a/docs/source/UserGuides/Portal/Administration.rst +++ b/docs/source/UserGuides/Portal/Administration.rst @@ -1475,6 +1475,13 @@ Cloud Drive Folders owner_sid = 'S-1-12-1-1536910496-1126310805-1188065941-1612002142' admin.cloudfs.drives.setoacl(folders_paths, owner_sid, True) + +Global File Locking +------------------- + +.. automethod:: cterasdk.core.cloudfs.Locks.all + :noindex: + Zones ----- diff --git a/docs/source/api/cterasdk.core.rst b/docs/source/api/cterasdk.core.rst index 6beb8056..ddba03c5 100644 --- a/docs/source/api/cterasdk.core.rst +++ b/docs/source/api/cterasdk.core.rst @@ -45,7 +45,7 @@ Submodules cterasdk.core.ssl cterasdk.core.startup cterasdk.core.syslog - cterasdk.core.taskmgr + cterasdk.core.tasks cterasdk.core.templates cterasdk.core.types cterasdk.core.admins diff --git a/docs/source/api/cterasdk.core.taskmgr.rst b/docs/source/api/cterasdk.core.taskmgr.rst deleted file mode 100644 index e7e3d0b0..00000000 --- a/docs/source/api/cterasdk.core.taskmgr.rst +++ /dev/null @@ -1,7 +0,0 @@ -cterasdk.core.taskmgr module -============================ - -.. automodule:: cterasdk.core.taskmgr - :members: - :undoc-members: - :show-inheritance: diff --git a/docs/source/api/cterasdk.core.tasks.rst b/docs/source/api/cterasdk.core.tasks.rst new file mode 100644 index 00000000..ca6f2837 --- /dev/null +++ b/docs/source/api/cterasdk.core.tasks.rst @@ -0,0 +1,7 @@ +cterasdk.core.tasks module +========================== + +.. automodule:: cterasdk.core.tasks + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/api/cterasdk.edge.rst b/docs/source/api/cterasdk.edge.rst index d1cd3bcc..d6fdb631 100644 --- a/docs/source/api/cterasdk.edge.rst +++ b/docs/source/api/cterasdk.edge.rst @@ -58,7 +58,7 @@ Submodules cterasdk.edge.support cterasdk.edge.sync cterasdk.edge.syslog - cterasdk.edge.taskmgr + cterasdk.edge.tasks cterasdk.edge.telnet cterasdk.edge.timezone cterasdk.edge.types diff --git a/docs/source/api/cterasdk.edge.taskmgr.rst b/docs/source/api/cterasdk.edge.taskmgr.rst deleted file mode 100644 index bebd10bf..00000000 --- a/docs/source/api/cterasdk.edge.taskmgr.rst +++ /dev/null @@ -1,7 +0,0 @@ -cterasdk.edge.taskmgr module -============================ - -.. automodule:: cterasdk.edge.taskmgr - :members: - :undoc-members: - :show-inheritance: diff --git a/docs/source/api/cterasdk.edge.tasks.rst b/docs/source/api/cterasdk.edge.tasks.rst new file mode 100644 index 00000000..81ced02b --- /dev/null +++ b/docs/source/api/cterasdk.edge.tasks.rst @@ -0,0 +1,7 @@ +cterasdk.edge.tasks module +========================== + +.. automodule:: cterasdk.edge.tasks + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/api/cterasdk.exceptions.auth.rst b/docs/source/api/cterasdk.exceptions.auth.rst new file mode 100644 index 00000000..ffd9c60f --- /dev/null +++ b/docs/source/api/cterasdk.exceptions.auth.rst @@ -0,0 +1,7 @@ +cterasdk.exceptions.auth module +=============================== + +.. automodule:: cterasdk.exceptions.auth + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/api/cterasdk.exceptions.backup.rst b/docs/source/api/cterasdk.exceptions.backup.rst new file mode 100644 index 00000000..cadf3a5b --- /dev/null +++ b/docs/source/api/cterasdk.exceptions.backup.rst @@ -0,0 +1,7 @@ +cterasdk.exceptions.backup module +================================= + +.. automodule:: cterasdk.exceptions.backup + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/api/cterasdk.exceptions.common.rst b/docs/source/api/cterasdk.exceptions.common.rst new file mode 100644 index 00000000..295e3236 --- /dev/null +++ b/docs/source/api/cterasdk.exceptions.common.rst @@ -0,0 +1,7 @@ +cterasdk.exceptions.common module +================================= + +.. automodule:: cterasdk.exceptions.common + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/api/cterasdk.exceptions.rst b/docs/source/api/cterasdk.exceptions.rst index af47f0d6..08373595 100644 --- a/docs/source/api/cterasdk.exceptions.rst +++ b/docs/source/api/cterasdk.exceptions.rst @@ -11,6 +11,9 @@ Submodules .. toctree:: + cterasdk.exceptions.auth + cterasdk.exceptions.backup + cterasdk.exceptions.common cterasdk.exceptions.direct cterasdk.exceptions.io cterasdk.exceptions.notifications diff --git a/docs/source/api/cterasdk.lib.rst b/docs/source/api/cterasdk.lib.rst index b8a97433..6bf07915 100644 --- a/docs/source/api/cterasdk.lib.rst +++ b/docs/source/api/cterasdk.lib.rst @@ -17,6 +17,7 @@ Submodules cterasdk.lib.platform cterasdk.lib.registry cterasdk.lib.storage + cterasdk.lib.tasks cterasdk.lib.tempfile cterasdk.lib.tracker cterasdk.lib.version diff --git a/docs/source/api/cterasdk.lib.tasks.rst b/docs/source/api/cterasdk.lib.tasks.rst new file mode 100644 index 00000000..b9055bd2 --- /dev/null +++ b/docs/source/api/cterasdk.lib.tasks.rst @@ -0,0 +1,7 @@ +cterasdk.lib.tasks module +========================= + +.. automodule:: cterasdk.lib.tasks + :members: + :undoc-members: + :show-inheritance: diff --git a/tests/ut/aio/direct/base.py b/tests/ut/aio/direct/base.py index 3ccb9734..1230b17e 100644 --- a/tests/ut/aio/direct/base.py +++ b/tests/ut/aio/direct/base.py @@ -8,6 +8,6 @@ class BaseAsyncDirect(base.BaseAsyncTest): def setUp(self, access=None, secret=None): super().setUp() self._direct = ctera_direct.client.DirectIO('', access, secret) - self._direct._client._api.get = mock.AsyncMock() # pylint: disable=protected-access - self._direct._client._client.get = mock.AsyncMock() # pylint: disable=protected-access + self._direct._api.get = mock.AsyncMock() # pylint: disable=protected-access + self._direct._client.get = mock.AsyncMock() # pylint: disable=protected-access self._authorization_header = {'Authorization': f'Bearer {access}'} diff --git a/tests/ut/aio/direct/test_get_metadata.py b/tests/ut/aio/direct/test_get_metadata.py index 9da62cb7..677cba60 100644 --- a/tests/ut/aio/direct/test_get_metadata.py +++ b/tests/ut/aio/direct/test_get_metadata.py @@ -1,6 +1,5 @@ import asyncio import errno -from http import HTTPStatus from unittest import mock import munch from cterasdk import exceptions @@ -15,18 +14,18 @@ def setUp(self): # pylint: disable=arguments-differ self._file_id = 12345 async def test_retries_on_error(self): - self._direct._client._api.get.return_value = munch.Munch({'chunks': None}) # pylint: disable=protected-access + self._direct._api.get.return_value = munch.Munch({'chunks': None}) # pylint: disable=protected-access with mock.patch('asyncio.sleep'): with self.assertRaises(exceptions.direct.BlocksNotFoundError): await self._direct.metadata(self._file_id) - self._direct._client._api.get.assert_has_calls( # pylint: disable=protected-access + self._direct._api.get.assert_has_calls( # pylint: disable=protected-access self._retries * [ mock.call(f'{self._file_id}', headers=self._authorization_header), ] ) async def test_get_file_metadata_not_found(self): - self._direct._client._api.get.return_value = munch.Munch({'chunks': None}) # pylint: disable=protected-access + self._direct._api.get.return_value = munch.Munch({'chunks': None}) # pylint: disable=protected-access with mock.patch('asyncio.sleep'): with self.assertRaises(exceptions.direct.BlocksNotFoundError) as error: await self._direct.metadata(self._file_id) @@ -35,54 +34,51 @@ async def test_get_file_metadata_not_found(self): self.assertEqual(error.exception.filename, self._file_id) async def test_get_file_metadata_error_400(self): - self._direct._client._api.get.side_effect = exceptions.transport.HTTPError( # pylint: disable=protected-access - HTTPStatus.BAD_REQUEST, + self._direct._api.get.side_effect = exceptions.transport.BadRequest( # pylint: disable=protected-access BaseDirectMetadata._create_error_object() ) with mock.patch('asyncio.sleep'): - with self.assertRaises(exceptions.direct.NotFoundError) as error: + with self.assertRaises(exceptions.direct.ObjectNotFoundError) as error: await self._direct.metadata(self._file_id) - self.assertEqual(error.exception.errno, errno.EBADF) + self.assertEqual(error.exception.errno, errno.ENOENT) self.assertEqual(error.exception.strerror, 'File not found') self.assertEqual(error.exception.filename, self._file_id) async def test_get_file_metadata_error_401(self): - self._direct._client._api.get.side_effect = exceptions.transport.HTTPError( # pylint: disable=protected-access - HTTPStatus.UNAUTHORIZED, + self._direct._api.get.side_effect = exceptions.transport.Unauthorized( # pylint: disable=protected-access BaseDirectMetadata._create_error_object() ) with mock.patch('asyncio.sleep'): - with self.assertRaises(exceptions.direct.UnAuthorized) as error: + with self.assertRaises(exceptions.direct.AuthorizationError) as error: await self._direct.metadata(self._file_id) self.assertEqual(error.exception.errno, errno.EACCES) self.assertEqual(error.exception.strerror, 'Unauthorized: You do not have the necessary permissions to access this resource') self.assertEqual(error.exception.filename, self._file_id) async def test_get_file_metadata_error_422(self): - self._direct._client._api.get.side_effect = exceptions.transport.HTTPError( # pylint: disable=protected-access - HTTPStatus.UNPROCESSABLE_ENTITY, + self._direct._api.get.side_effect = exceptions.transport.Unprocessable( # pylint: disable=protected-access BaseDirectMetadata._create_error_object() ) with mock.patch('asyncio.sleep'): - with self.assertRaises(exceptions.direct.UnprocessableContent) as error: + with self.assertRaises(exceptions.direct.UnsupportedStorageError) as error: await self._direct.metadata(self._file_id) self.assertEqual(error.exception.errno, errno.ENOTSUP) self.assertEqual(error.exception.strerror, 'Not all blocks of the requested file are stored on a storage node set to Direct Mode') self.assertEqual(error.exception.filename, self._file_id) async def test_get_file_metadata_unknown_error(self): - url = '/xyz' - self._direct._client._api.get.side_effect = exceptions.transport.HTTPError( # pylint: disable=protected-access - HTTPStatus.INTERNAL_SERVER_ERROR, + self._direct._api.get.side_effect = exceptions.transport.InternalServerError( # pylint: disable=protected-access BaseDirectMetadata._create_error_object() ) with mock.patch('asyncio.sleep'): - with self.assertRaises(exceptions.transport.HTTPError) as error: + with self.assertRaises(exceptions.direct.InvalidRequest) as error: await self._direct.metadata(self._file_id) - self.assertEqual(error.exception.error.request.url, url) + self.assertEqual(error.exception.errno, errno.EIO) + self.assertEqual(error.exception.strerror, 'Request failed due to internal error: invalid request') + self.assertEqual(error.exception.filename, self._file_id) async def test_get_file_metadata_connection_error(self): - self._direct._client._api.get.side_effect = ConnectionError # pylint: disable=protected-access + self._direct._api.get.side_effect = ConnectionError # pylint: disable=protected-access with mock.patch('asyncio.sleep'): with self.assertRaises(exceptions.direct.BlockListConnectionError) as error: await self._direct.metadata(self._file_id) @@ -92,7 +88,7 @@ async def test_get_file_metadata_connection_error(self): self.assertEqual(error.exception.filename, self._file_id) async def test_get_file_metadata_timeout(self): - self._direct._client._api.get.side_effect = asyncio.TimeoutError # pylint: disable=protected-access + self._direct._api.get.side_effect = asyncio.TimeoutError # pylint: disable=protected-access with mock.patch('asyncio.sleep'): with self.assertRaises(exceptions.direct.BlockListTimeout) as error: await self._direct.metadata(self._file_id) diff --git a/tests/ut/aio/direct/test_get_object.py b/tests/ut/aio/direct/test_get_object.py index 257fbbfd..0792255f 100644 --- a/tests/ut/aio/direct/test_get_object.py +++ b/tests/ut/aio/direct/test_get_object.py @@ -17,10 +17,10 @@ def setUp(self): # pylint: disable=arguments-differ async def test_get_object_connection_error(self): chunk = BaseDirectMetadata._create_chunk() - self._direct._client._client.get.side_effect = ConnectionError # pylint: disable=protected-access + self._direct._client.get.side_effect = ConnectionError # pylint: disable=protected-access with mock.patch('asyncio.sleep'): with self.assertRaises(exceptions.direct.DownloadConnectionError) as error: - await get_object(self._direct._client._client, self._file_id, chunk) # pylint: disable=protected-access + await get_object(self._direct._client, self._file_id, chunk) # pylint: disable=protected-access self.assertEqual(error.exception.errno, errno.ENETRESET) self.assertEqual(error.exception.strerror, 'Failed to download block. Connection error') self.assertEqual(error.exception.filename, self._file_id) @@ -28,10 +28,10 @@ async def test_get_object_connection_error(self): async def test_get_object_timeout(self): chunk = BaseDirectMetadata._create_chunk() - self._direct._client._client.get.side_effect = asyncio.TimeoutError # pylint: disable=protected-access + self._direct._client.get.side_effect = asyncio.TimeoutError # pylint: disable=protected-access with mock.patch('asyncio.sleep'): with self.assertRaises(exceptions.direct.DownloadTimeout) as error: - await get_object(self._direct._client._client, self._file_id, chunk) # pylint: disable=protected-access + await get_object(self._direct._client, self._file_id, chunk) # pylint: disable=protected-access self.assertEqual(error.exception.errno, errno.ETIMEDOUT) self.assertEqual(error.exception.strerror, 'Failed to download block. Timed out') self.assertEqual(error.exception.filename, self._file_id) @@ -44,10 +44,10 @@ async def test_response_read_io_error(self): async def stream_reader(): raise IOError(message) - self._direct._client._client.get.return_value = munch.Munch({'read': stream_reader}) # pylint: disable=protected-access + self._direct._client.get.return_value = munch.Munch({'read': stream_reader}) # pylint: disable=protected-access with mock.patch('asyncio.sleep'): with self.assertRaises(exceptions.direct.DownloadError) as error: - await get_object(self._direct._client._client, self._file_id, chunk) # pylint: disable=protected-access + await get_object(self._direct._client, self._file_id, chunk) # pylint: disable=protected-access self.assertEqual(error.exception.errno, errno.EIO) self.assertEqual(str(error.exception.strerror), message) self.assertEqual(error.exception.filename, self._file_id) @@ -55,13 +55,13 @@ async def stream_reader(): async def test_get_client_error(self): chunk = BaseDirectMetadata._create_chunk() - self._direct._client._client.get.side_effect = exceptions.transport.HTTPError( # pylint: disable=protected-access + self._direct._client.get.side_effect = exceptions.transport.HTTPError( # pylint: disable=protected-access HTTPStatus.INTERNAL_SERVER_ERROR, BaseDirectMetadata._create_error_object(HTTPStatus.INTERNAL_SERVER_ERROR.value) ) with mock.patch('asyncio.sleep'): with self.assertRaises(exceptions.direct.DownloadError) as error: - await get_object(self._direct._client._client, self._file_id, chunk) # pylint: disable=protected-access + await get_object(self._direct._client, self._file_id, chunk) # pylint: disable=protected-access self.assertEqual(error.exception.errno, errno.EIO) self.assertEqual(error.exception.strerror.response.status, 500) self.assertEqual(error.exception.filename, self._file_id) @@ -80,7 +80,7 @@ def _create_error_object(status): def _create_block_info(file_id, chunk): return munch.Munch({ 'file_id': file_id, - 'number': chunk.index, + 'number': chunk.number, 'offset': chunk.offset, 'length': chunk.length }) @@ -89,7 +89,7 @@ def _create_block_info(file_id, chunk): def _create_chunk(): return munch.Munch({ 'url': 'https://s3.amazonaws.com/test', - 'index': 1, + 'number': 1, 'offset': 0, 'length': 4096 }) diff --git a/tests/ut/core/admin/test_cloudfs_cloud_drives.py b/tests/ut/core/admin/test_cloudfs_cloud_drives.py index 802e8792..beeb58ab 100644 --- a/tests/ut/core/admin/test_cloudfs_cloud_drives.py +++ b/tests/ut/core/admin/test_cloudfs_cloud_drives.py @@ -22,6 +22,8 @@ def setUp(self): self._description = 'description' self._user_uid = 1337 self._folder_uid = 7331 + self._cloudfolder_path = f'/Users/John Smith/{self._cloudfolder_name}' + self._add_cloudfolder_response = f'teamPortals/portal/cloudDrives/{self._cloudfolder_path}' self._nt_acl_folders = Object() self._nt_acl_folders._classname = 'SDDLFoldersParam' # pylint: disable=protected-access @@ -61,7 +63,7 @@ def _get_list_folders_param(include=None, include_deleted=False, filter_deleted= return builder.build() def test_add_cloud_drive_with_local_owner_no_winacls_param(self): - self._init_global_admin(get_response='admin', execute_response='Success') + self._init_global_admin(get_response='admin', execute_response=self._add_cloudfolder_response) self._mock_get_user_base_object_ref() self._mock_get_folder_group() @@ -75,10 +77,27 @@ def test_add_cloud_drive_with_local_owner_no_winacls_param(self): actual_param = self._global_admin.api.execute.call_args[0][2] self._assert_equal_objects(actual_param, expected_param) - self.assertEqual(ret, 'Success') + self.assertEqual(ret, self._cloudfolder_path) + + def test_add_cloud_drive_with_locking_default_extensions(self): + self._init_global_admin(get_response='admin', execute_response=self._add_cloudfolder_response) + self._mock_get_user_base_object_ref() + self._mock_get_folder_group() + + ret = cloudfs.CloudDrives(self._global_admin).add(self._name, self._group, self._local_user_account, gfl=True) + + self._global_admin.users.get.assert_called_once_with(self._local_user_account, ['baseObjectRef']) + self._global_admin.cloudfs.groups.get.assert_called_once_with(self._group, ['baseObjectRef']) + self._global_admin.api.execute.assert_called_once_with('', 'addCloudDrive', mock.ANY) + + expected_param = self._get_add_cloud_drive_object(gfl=True) + actual_param = self._global_admin.api.execute.call_args[0][2] + self._assert_equal_objects(actual_param, expected_param) + + self.assertEqual(ret, self._cloudfolder_path) def test_add_cloud_drive_with_local_owner_no_winacls_param_with_description(self): - self._init_global_admin(get_response='admin', execute_response='Success') + self._init_global_admin(get_response='admin', execute_response=self._add_cloudfolder_response) self._mock_get_user_base_object_ref() self._mock_get_folder_group() @@ -92,11 +111,11 @@ def test_add_cloud_drive_with_local_owner_no_winacls_param_with_description(self actual_param = self._global_admin.api.execute.call_args[0][2] self._assert_equal_objects(actual_param, expected_param) - self.assertEqual(ret, 'Success') + self.assertEqual(ret, self._cloudfolder_path) def test_add_cloud_drive_with_local_owner_winacls_true(self): get_response = 'admin' - self._init_global_admin(get_response=get_response, execute_response='Success') + self._init_global_admin(get_response=get_response, execute_response=self._add_cloudfolder_response) self._mock_get_user_base_object_ref() self._mock_get_folder_group() @@ -110,11 +129,11 @@ def test_add_cloud_drive_with_local_owner_winacls_true(self): actual_param = self._global_admin.api.execute.call_args[0][2] self._assert_equal_objects(actual_param, expected_param) - self.assertEqual(ret, 'Success') + self.assertEqual(ret, self._cloudfolder_path) def test_add_cloud_drive_with_local_owner_winacls_false(self): get_response = 'admin' - self._init_global_admin(get_response=get_response, execute_response='Success') + self._init_global_admin(get_response=get_response, execute_response=self._add_cloudfolder_response) self._mock_get_user_base_object_ref() self._mock_get_folder_group() @@ -129,11 +148,11 @@ def test_add_cloud_drive_with_local_owner_winacls_false(self): actual_param = self._global_admin.api.execute.call_args[0][2] self._assert_equal_objects(actual_param, expected_param) - self.assertEqual(ret, 'Success') + self.assertEqual(ret, self._cloudfolder_path) def test_add_cloud_drive_with_local_owner_raise(self): get_response = 'admin' - self._init_global_admin(get_response=get_response, execute_response='Success') + self._init_global_admin(get_response=get_response) self._mock_get_user_base_object_ref() self._mock_get_folder_group() @@ -172,7 +191,8 @@ def test_undelete_with_local_owner(self): self._global_admin.users.get.assert_called_once_with(self._local_user_account, ['displayName']) self._global_admin.files.undelete.assert_called_once_with(f'Users/{self._owner}/{self._name}') - def _get_add_cloud_drive_object(self, winacls=True, description=None, quota=None, compliance_settings=None, xattrs=None): + def _get_add_cloud_drive_object(self, winacls=True, description=None, quota=None, compliance_settings=None, xattrs=None, + gfl=None, lock_extensions=None): add_cloud_drive_param = Object() add_cloud_drive_param.name = self._name add_cloud_drive_param.owner = self._owner @@ -183,6 +203,13 @@ def _get_add_cloud_drive_object(self, winacls=True, description=None, quota=None add_cloud_drive_param.description = description add_cloud_drive_param.wormSettings = compliance_settings if compliance_settings else ComplianceSettingsBuilder.default().build() add_cloud_drive_param.extendedAttributes = xattrs if xattrs else ExtendedAttributesBuilder.default().build() + if gfl: + add_cloud_drive_param.globalFileLockSettings = Object() + add_cloud_drive_param.globalFileLockSettings._classname = 'GlobalFileLockSettings' # pylint: disable=protected-access + add_cloud_drive_param.globalFileLockSettings.enabled = True + add_cloud_drive_param.globalFileLockSettings.globalFileLockExtensions = ( + lock_extensions if lock_extensions else cloudfs.CloudDrives.default_extensions + ) return add_cloud_drive_param def _mock_get_user_base_object_ref(self): diff --git a/tests/ut/core/admin/test_copy.py b/tests/ut/core/admin/test_copy.py index 4e1a7482..0598fccd 100644 --- a/tests/ut/core/admin/test_copy.py +++ b/tests/ut/core/admin/test_copy.py @@ -9,14 +9,15 @@ class TestCoreFilesBrowser(base_admin.BaseCoreTest): _base_path = '/admin/webdav/Users' + _task_reference = 'servers/MainDB/bgTasks/918908' - def test_copy(self): - expected_response = 'success' + def test_copy_no_wait(self): + expected_response = TestCoreFilesBrowser._task_reference src = 'cloud/Users' dst = 'public' self._init_global_admin(execute_response=expected_response) - actual_response = io.copy(self._global_admin, self._get_object_path(src), destination=self._get_object_path(dst)) - self.assertEqual(expected_response, actual_response) + actual_response = io.copy(self._global_admin, self._get_object_path(src), destination=self._get_object_path(dst), wait=False) + self.assertEqual(expected_response, actual_response.ref) self._global_admin.api.execute.assert_called_once_with('', 'copyResources', mock.ANY) expected_copy_param = self._get_expected_copy_params(src, dst) actual_copy_param = self._global_admin.api.execute.call_args[0][2] diff --git a/tests/ut/core/admin/test_portals.py b/tests/ut/core/admin/test_portals.py index c8ab60c3..d77db81d 100644 --- a/tests/ut/core/admin/test_portals.py +++ b/tests/ut/core/admin/test_portals.py @@ -24,6 +24,7 @@ def setUp(self): self._billing_id = 'billing-id' self._company = 'The Acme Corporation' self._tenant_attrs = ['externalPortalId', 'companyName'] + self._task_reference = 'servers/MainDB/bgTasks/918908' mock_session = self.patch_call("cterasdk.objects.services.Management.session") mock_session.return_value = munch.Munch({'update_current_tenant': lambda x: x}) @@ -181,14 +182,14 @@ def test_browse_global_admin(self): self._global_admin.api.put.assert_called_once_with('/currentPortal', '') def test_apply_changes(self): - execute_response = 'Success' + execute_response = self._task_reference self._init_global_admin(execute_response=execute_response) ret = portals.Portals(self._global_admin).apply_changes() self._global_admin.api.execute.assert_called_once_with('', 'updatePortals', mock.ANY) expected_param = TestCorePortals._get_apply_changes_param() actual_param = self._global_admin.api.execute.call_args[0][2] self._assert_equal_objects(actual_param, expected_param) - self.assertEqual(ret, execute_response) + self.assertEqual(ret.ref, execute_response) @staticmethod def _get_apply_changes_param(): diff --git a/tests/ut/core/admin/test_templates.py b/tests/ut/core/admin/test_templates.py index 01b1e62e..14065b7a 100644 --- a/tests/ut/core/admin/test_templates.py +++ b/tests/ut/core/admin/test_templates.py @@ -21,6 +21,7 @@ def setUp(self): self._name = 'Template' self._classname = 'DeviceTemplate' self._description = 'description' + self._task_reference = 'servers/MainDB/bgTasks/918908' def test_get_template_default_attrs(self): get_multi_response = self._get_template_object(name=self._name) @@ -64,7 +65,7 @@ def test_delete_template(self): self.assertEqual(ret, delete_response) def test_set_default_template_no_wait(self): - execute_response = 'Success' + execute_response = self._task_reference self._init_global_admin(execute_response=execute_response) ret = templates.Templates(self._global_admin).set_default(self._name) self._global_admin.api.execute.assert_has_calls([ @@ -74,7 +75,7 @@ def test_set_default_template_no_wait(self): self.assertEqual(ret, execute_response) def test_remove_default_template_no_wait(self): - execute_response = 'Success' + execute_response = self._task_reference get_multi_response = self._get_template_object(name=self._name, isDefault=True) self._init_global_admin(get_multi_response=get_multi_response, execute_response=execute_response) ret = templates.Templates(self._global_admin).remove_default(self._name) diff --git a/tests/ut/core/admin/test_users.py b/tests/ut/core/admin/test_users.py index 7f7a98e0..1caace86 100644 --- a/tests/ut/core/admin/test_users.py +++ b/tests/ut/core/admin/test_users.py @@ -24,6 +24,7 @@ def setUp(self): self._role = 'EndUser' self._domain = 'ctera.local' self._domain_user_account = UserAccount(self._username, self._domain) + self._task_reference = 'servers/MainDB/bgTasks/918908' def test_get_user_default_attrs(self): get_multi_response = self._get_user_object(name=self._local_user_account.name) @@ -165,14 +166,14 @@ def test_delete_domain_user(self): self.assertEqual(ret, execute_response) def test_apply_changes(self): - execute_response = 'Success' + execute_response = self._task_reference self._init_global_admin(execute_response=execute_response) ret = users.Users(self._global_admin).apply_changes() self._global_admin.api.execute.assert_called_once_with('', 'updateAccounts', mock.ANY) expected_param = TestCoreUsers._get_apply_changes_param() actual_param = self._global_admin.api.execute.call_args[0][2] self._assert_equal_objects(actual_param, expected_param) - self.assertEqual(ret, execute_response) + self.assertEqual(ret.ref, execute_response) @staticmethod def _get_apply_changes_param(): diff --git a/tests/ut/core/user/base_user.py b/tests/ut/core/user/base_user.py index 8c84f25a..953b95e1 100644 --- a/tests/ut/core/user/base_user.py +++ b/tests/ut/core/user/base_user.py @@ -12,6 +12,7 @@ def setUp(self): super().setUp() self._services = ServicesPortal("") self._base = '/ServicesPortal/webdav' + self._task_reference = 'servers/MainDB/bgTasks/918908' @staticmethod def encode_path(path): diff --git a/tests/ut/core/user/test_browser.py b/tests/ut/core/user/test_browser.py index a140b8db..4c0cb6e8 100644 --- a/tests/ut/core/user/test_browser.py +++ b/tests/ut/core/user/test_browser.py @@ -57,7 +57,7 @@ def test_rename(self): new_name = 'Names' rename_mock = self.patch_call('cterasdk.core.files.io.rename') self.files.rename(path, new_name) - rename_mock.assert_called_once_with(self._global_admin, mock.ANY, new_name) + rename_mock.assert_called_once_with(self._global_admin, mock.ANY, new_name, wait=True) actual_ctera_path = rename_mock.call_args[0][1] self.assertEqual(actual_ctera_path.absolute, TestCoreFilesBrowser._create_expected_path(TestCoreFilesBrowser._base_path, path)) @@ -65,7 +65,7 @@ def test_delete(self): path = 'cloud/Users' rm_mock = self.patch_call('cterasdk.core.files.io.remove') self.files.delete(path) - rm_mock.assert_called_once_with(self._global_admin, mock.ANY) + rm_mock.assert_called_once_with(self._global_admin, mock.ANY, wait=True) actual_ctera_path = rm_mock.call_args[0][1] self.assertEqual(actual_ctera_path.absolute, TestCoreFilesBrowser._create_expected_path(TestCoreFilesBrowser._base_path, path)) @@ -73,7 +73,7 @@ def test_undelete(self): path = 'cloud/Users' recover_mock = self.patch_call('cterasdk.core.files.io.recover') self.files.undelete(path) - recover_mock.assert_called_once_with(self._global_admin, mock.ANY) + recover_mock.assert_called_once_with(self._global_admin, mock.ANY, wait=True) actual_ctera_path = recover_mock.call_args[0][1] self.assertEqual(actual_ctera_path.absolute, TestCoreFilesBrowser._create_expected_path(TestCoreFilesBrowser._base_path, path)) @@ -82,7 +82,7 @@ def test_move(self): dst = 'public' mv_mock = self.patch_call('cterasdk.core.files.io.move') self.files.move(src, destination=dst) - mv_mock.assert_called_once_with(self._global_admin, mock.ANY, destination=mock.ANY) + mv_mock.assert_called_once_with(self._global_admin, mock.ANY, destination=mock.ANY, wait=True) actual_ctera_paths = mv_mock.call_args[0][1:] self.assertListEqual( [actual_ctera_path.absolute for actual_ctera_path in actual_ctera_paths], @@ -94,7 +94,7 @@ def test_copy(self): dst = 'public' cp_mock = self.patch_call('cterasdk.core.files.io.copy') self.files.copy(src, destination=dst) - cp_mock.assert_called_once_with(self._global_admin, mock.ANY, destination=mock.ANY) + cp_mock.assert_called_once_with(self._global_admin, mock.ANY, destination=mock.ANY, wait=True) actual_ctera_paths = cp_mock.call_args[0][1:] self.assertListEqual( [actual_ctera_path.absolute for actual_ctera_path in actual_ctera_paths], diff --git a/tests/ut/core/user/test_copy.py b/tests/ut/core/user/test_copy.py index 6e58c8d8..d3d48528 100644 --- a/tests/ut/core/user/test_copy.py +++ b/tests/ut/core/user/test_copy.py @@ -11,15 +11,15 @@ def setUp(self): self._source = 'My Files/Documents/' + self._filename self._dest = 'My Files/Reports' - def test_copy(self): - execute_response = 'Success' + def test_copy_no_wait(self): + execute_response = self._task_reference self._init_services(execute_response=execute_response) - ret = self._services.files.copy(self._source, destination=self._dest) + ret = self._services.files.copy(self._source, destination=self._dest, wait=False) self._services.api.execute.assert_called_once_with('', 'copyResources', mock.ANY) expected_param = self._create_copy_resource_param() actual_param = self._services.api.execute.call_args[0][2] self._assert_equal_objects(actual_param, expected_param) - self.assertEqual(ret, execute_response) + self.assertEqual(ret.ref, execute_response) def _create_copy_resource_param(self): destinations = [base_user.BaseCoreServicesTest.encode_path(self._dest + '/' + self._filename)] diff --git a/tests/ut/core/user/test_delete.py b/tests/ut/core/user/test_delete.py index 1fcdaf9b..7dc6307d 100644 --- a/tests/ut/core/user/test_delete.py +++ b/tests/ut/core/user/test_delete.py @@ -9,15 +9,15 @@ def setUp(self): super().setUp() self._path = 'My Files/Documents' - def test_delete(self): - execute_response = 'Success' + def test_delete_no_wait(self): + execute_response = self._task_reference self._init_services(execute_response=execute_response) - ret = self._services.files.delete(self._path) + ret = self._services.files.delete(self._path, wait=False) self._services.api.execute.assert_called_once_with('', 'deleteResources', mock.ANY) expected_param = self._create_delete_resource_param() actual_param = self._services.api.execute.call_args[0][2] self._assert_equal_objects(actual_param, expected_param) - self.assertEqual(ret, execute_response) + self.assertEqual(ret.ref, execute_response) def _create_delete_resource_param(self): return self._create_action_resource_param([base_user.BaseCoreServicesTest.encode_path(self._path)]) diff --git a/tests/ut/core/user/test_listdir.py b/tests/ut/core/user/test_listdir.py index c1ad9c11..0ced034a 100644 --- a/tests/ut/core/user/test_listdir.py +++ b/tests/ut/core/user/test_listdir.py @@ -55,6 +55,7 @@ def _fetch_resources_side_effect(path, name, param): def _fetch_resources_response(response, files): for file in files: resource_info = BaseCoreServicesFilesList._create_resource_info(file) + response.errorType = None response.items.append(resource_info) return response diff --git a/tests/ut/core/user/test_move.py b/tests/ut/core/user/test_move.py index 2a1167ee..2ae91115 100644 --- a/tests/ut/core/user/test_move.py +++ b/tests/ut/core/user/test_move.py @@ -11,15 +11,15 @@ def setUp(self): self._source = 'My Files/Documents/' + self._filename self._dest = 'My Files/Reports' - def test_move(self): - execute_response = 'Success' + def test_move_no_wait(self): + execute_response = self._task_reference self._init_services(execute_response=execute_response) - ret = self._services.files.move(self._source, destination=self._dest) + ret = self._services.files.move(self._source, destination=self._dest, wait=False) self._services.api.execute.assert_called_once_with('', 'moveResources', mock.ANY) expected_param = self._create_move_resource_param() actual_param = self._services.api.execute.call_args[0][2] self._assert_equal_objects(actual_param, expected_param) - self.assertEqual(ret, execute_response) + self.assertEqual(ret.ref, execute_response) def _create_move_resource_param(self): destinations = [base_user.BaseCoreServicesTest.encode_path(self._dest + '/' + self._filename)] diff --git a/tests/ut/core/user/test_rename.py b/tests/ut/core/user/test_rename.py index 54e19523..99f79390 100644 --- a/tests/ut/core/user/test_rename.py +++ b/tests/ut/core/user/test_rename.py @@ -12,14 +12,14 @@ def setUp(self): self._parent_directory = 'My Files/Documents' def test_rename(self): - execute_response = 'Success' + execute_response = self._task_reference self._init_services(execute_response=execute_response) - ret = self._services.files.rename(self._parent_directory + '/' + self._current_filename, self._new_filename) + ret = self._services.files.rename(f'{self._parent_directory}/{self._current_filename}', self._new_filename, wait=False) self._services.api.execute.assert_called_once_with('', 'moveResources', mock.ANY) expected_param = self._create_rename_resource_param() actual_param = self._services.api.execute.call_args[0][2] self._assert_equal_objects(actual_param, expected_param) - self.assertEqual(ret, execute_response) + self.assertEqual(ret.ref, execute_response) def _create_rename_resource_param(self): sources = [base_user.BaseCoreServicesTest.encode_path(self._parent_directory + '/' + self._current_filename)] diff --git a/tests/ut/core/user/test_undelete.py b/tests/ut/core/user/test_undelete.py index 3399712d..2ee83651 100644 --- a/tests/ut/core/user/test_undelete.py +++ b/tests/ut/core/user/test_undelete.py @@ -10,14 +10,14 @@ def setUp(self): self._path = 'My Files/Documents' def test_undelete(self): - execute_response = 'Success' + execute_response = self._task_reference self._init_services(execute_response=execute_response) - ret = self._services.files.undelete(self._path) + ret = self._services.files.undelete(self._path, wait=False) self._services.api.execute.assert_called_once_with('', 'restoreResources', mock.ANY) expected_param = self._create_undelete_resource_param() actual_param = self._services.api.execute.call_args[0][2] self._assert_equal_objects(actual_param, expected_param) - self.assertEqual(ret, execute_response) + self.assertEqual(ret.ref, execute_response) def _create_undelete_resource_param(self): return self._create_action_resource_param([base_user.BaseCoreServicesTest.encode_path(self._path)]) diff --git a/tests/ut/edge/test_dedup.py b/tests/ut/edge/test_dedup.py index 500eb5dd..b100682a 100644 --- a/tests/ut/edge/test_dedup.py +++ b/tests/ut/edge/test_dedup.py @@ -40,6 +40,7 @@ def test_status(self): self._init_filer() self._filer.api.execute = mock.MagicMock(side_effect=self._execute_side_effect) ret = dedup.Dedup(self._filer).status() + self._filer.api.get.assert_called_once_with('/config/dedup/useLocalMapFileDedup') self._filer.api.execute.assert_has_calls([ mock.call('/config/cloudsync/cloudExtender', 'allFilesTotalUsedBytes'), mock.call('/config/cloudsync/cloudExtender', 'storageUsedBytes') diff --git a/tests/ut/edge/test_directory_service.py b/tests/ut/edge/test_directory_service.py index 1a145e3b..b5d87a4e 100644 --- a/tests/ut/edge/test_directory_service.py +++ b/tests/ut/edge/test_directory_service.py @@ -4,7 +4,7 @@ from cterasdk.edge import directoryservice from cterasdk.edge.types import TCPService, TCPConnectResult from cterasdk.common import Object -from cterasdk.lib import task_manager_base +from cterasdk.exceptions.common import TaskException from tests.ut.edge import base_edge @@ -99,7 +99,7 @@ def test_connect_join_failure(self): get_response_side_effect = TestEdgeDirectoryService._get_response_side_effect(self._get_workgroup_param(), None) self._filer.api.get = mock.MagicMock(side_effect=get_response_side_effect) self._filer.network.tcp_connect = mock.MagicMock(return_value=TCPConnectResult(self._domain, self._ldap_port, True)) - self._filer.api.execute = mock.MagicMock(side_effect=task_manager_base.TaskError(self._task_id)) + self._filer.api.execute = mock.MagicMock(side_effect=TaskException('Task failed', self._task_id)) with self.assertRaises(exceptions.CTERAException): directoryservice.DirectoryService(self._filer).connect(self._domain, self._username, self._password, check_connection=True) diff --git a/tests/ut/edge/test_migration_tool.py b/tests/ut/edge/test_migration_tool.py index d34e863c..2ced14b3 100644 --- a/tests/ut/edge/test_migration_tool.py +++ b/tests/ut/edge/test_migration_tool.py @@ -87,13 +87,6 @@ def test_stop(self): self._assert_equal_objects(actual_param, munch.Munch(task_id=self._task_id)) self.assertEqual(ret, 'Success') - def test_details(self): - self._init_ctera_migrate(get_response=munch.Munch(dict(history=self._jobs))) - jobs = ctera_migrate.CTERAMigrate(self._filer).details(munch.Munch(id=self._task_id)) - self._filer.migrate.get.assert_called_once_with('/tasks/history', params={'id': self._task_id}) # pylint: disable=protected-access - self.assertEqual(jobs.all, self._jobs) - self.assertEqual(jobs.latest, self._jobs[0]) - def test_details_not_found(self): self._init_ctera_migrate(get_response=munch.Munch(dict(history=None))) ctera_migrate.CTERAMigrate(self._filer).details(munch.Munch(id=self._task_id)) @@ -101,16 +94,14 @@ def test_details_not_found(self): def test_results(self): self._init_ctera_migrate(get_response=munch.Munch(dict(discovery='discovery', migration='migration'))) - for i in ['discovery', 'migration', 'other']: - ret = ctera_migrate.CTERAMigrate(self._filer).results(munch.Munch(id=i, type=i, name='task')) + for i, j in [('discovery', 1), ('migration', 2)]: + ret = ctera_migrate.CTERAMigrate(self._filer).results(munch.Munch(type=i), munch.Munch(job_id=j)) if i == 'discovery': - self._filer.migrate.get.assert_called_with('/discovery/results', params={'id': i}) # pylint: disable=protected-access + self._filer.migrate.get.assert_called_with('/discovery/results', params={'id': j}) # pylint: disable=protected-access self.assertEqual(ret, 'discovery') elif i == 'migration': - self._filer.migrate.get.assert_called_with('/migration/results', params={'id': i}) # pylint: disable=protected-access + self._filer.migrate.get.assert_called_with('/migration/results', params={'id': j}) # pylint: disable=protected-access self.assertEqual(ret, 'migration') - else: - self.assertEqual(ret, None) def test_list_discovery_tasks(self): tasks = munch.Munch(dict(discovery=TestMigrationTool._create_discovery_task_object(), diff --git a/tests/ut/edge/test_network.py b/tests/ut/edge/test_network.py index b5e8a37d..b29b9236 100644 --- a/tests/ut/edge/test_network.py +++ b/tests/ut/edge/test_network.py @@ -3,9 +3,9 @@ from cterasdk.edge import network from cterasdk.edge.types import TCPService, TCPConnectResult -from cterasdk.lib import task_manager_base from cterasdk.edge.enum import Mode, IPProtocol, Traffic from cterasdk.common import Object +from cterasdk.exceptions.common import TaskException from cterasdk import exceptions from tests.ut.edge import base_edge @@ -199,7 +199,7 @@ def _get_iperf_param(self, port=5201, threads=1, protocol=IPProtocol.TCP, direct def test_tcp_connect_task_error(self): execute_response = self._task_id self._init_filer(execute_response=execute_response) - self._filer.tasks.wait = mock.MagicMock(side_effect=task_manager_base.TaskError(self._task_id)) + self._filer.tasks.wait = mock.MagicMock(side_effect=TaskException('Task failed', self._task_id)) ret = network.Network(self._filer).tcp_connect(TCPService(self._tcp_connect_address, self._tcp_connect_port)) diff --git a/tests/ut/edge/test_services.py b/tests/ut/edge/test_services.py index 938427f8..2dfd3212 100644 --- a/tests/ut/edge/test_services.py +++ b/tests/ut/edge/test_services.py @@ -1,11 +1,11 @@ from unittest import mock from cterasdk import exceptions -from cterasdk.lib import task_manager_base from cterasdk.edge import services from cterasdk.edge.enum import ServicesConnectionState from cterasdk.edge.types import TCPService, TCPConnectResult from cterasdk.common import Object +from cterasdk.exceptions.common import TaskException from tests.ut.edge import base_edge @@ -198,7 +198,7 @@ def _get_services_status_response(self): @staticmethod def _get_task_error(): - error = task_manager_base.TaskError(TestEdgeServices._background_task_id) + error = TaskException('Task failed', TestEdgeServices._background_task_id) error.task = Object() error.task.description = 'Reason for Failure' return error diff --git a/tests/ut/edge/test_shell.py b/tests/ut/edge/test_shell.py index aabd76ae..f43a82ec 100644 --- a/tests/ut/edge/test_shell.py +++ b/tests/ut/edge/test_shell.py @@ -2,8 +2,8 @@ from cterasdk import exceptions from cterasdk.edge import shell -from cterasdk.lib import task_manager_base from cterasdk.common import Object +from cterasdk.exceptions.common import TaskException from tests.ut.edge import base_edge @@ -26,11 +26,11 @@ def test_run_shell_command(self): def test_run_shell_command_task_error(self): execute_response = self._task_id self._init_filer(execute_response=execute_response) - self._filer.tasks.wait = mock.MagicMock(side_effect=task_manager_base.TaskError(self._task_id)) + self._filer.tasks.wait = mock.MagicMock(side_effect=TaskException('Task failed', self._task_id)) with self.assertRaises(exceptions.CTERAException) as error: shell.Shell(self._filer).run_command(self._shell_command) self._filer.tasks.wait.assert_called_once_with(self._task_id) - self.assertEqual('An error occurred while executing task', str(error.exception)) + self.assertEqual('An error occurred while executing Shell command.', str(error.exception)) def _get_task_manager_result_object(self): task_param = Object() diff --git a/tests/ut/edge/test_sync.py b/tests/ut/edge/test_sync.py index 388c4c9d..1efd0c11 100644 --- a/tests/ut/edge/test_sync.py +++ b/tests/ut/edge/test_sync.py @@ -95,19 +95,19 @@ def test_evict_wait(self): self._filer.api.execute.assert_called_once_with('/config/cloudsync', 'evictFolder', mock.ANY) self._filer.api.get.assert_called_once_with(execute_response) actual_param = self._filer.api.execute.call_args[0][2] - expected_param = munch.Munch(dict(path=self._path)) + expected_param = munch.Munch(dict(_classname='evictFolderParam', path=self._path)) self._assert_equal_objects(actual_param, expected_param) - self.assertEqual(ret, execute_response) + self.assertEqual(ret, get_response) def test_evict_no_wait(self): - execute_response = 'Success' + execute_response = '/proc/bgtasks/6192' self._init_filer(execute_response=execute_response) ret = sync.Sync(self._filer).evict(self._path) self._filer.api.execute.assert_called_once_with('/config/cloudsync', 'evictFolder', mock.ANY) actual_param = self._filer.api.execute.call_args[0][2] - expected_param = munch.Munch(dict(path=self._path)) + expected_param = munch.Munch(dict(_classname='evictFolderParam', path=self._path)) self._assert_equal_objects(actual_param, expected_param) - self.assertEqual(ret, execute_response) + self.assertEqual(ret.ref, execute_response) def test_get_linux_avoid_using_fanotify(self): for avoid in [True, False]: