diff --git a/codalab/lib/bundle_cli.py b/codalab/lib/bundle_cli.py index 3a5a93f6a..ac205bc05 100644 --- a/codalab/lib/bundle_cli.py +++ b/codalab/lib/bundle_cli.py @@ -1506,6 +1506,8 @@ def do_download_command(self, args): default_client, default_worksheet_uuid, args.target_spec ) + print(default_worksheet_uuid) + # Figure out where to download. info = client.fetch('bundles', target.bundle_uuid) if args.output_path: @@ -1516,6 +1518,8 @@ def do_download_command(self, args): if target.subpath == '' else os.path.basename(target.subpath) ) + + print(info) final_path = os.path.join(os.getcwd(), local_path) if os.path.exists(final_path): if args.force: diff --git a/codalab/migration.py b/codalab/migration.py index 86b30ea66..86a2261c1 100644 --- a/codalab/migration.py +++ b/codalab/migration.py @@ -38,6 +38,7 @@ from codalab.common import ( StorageType, StorageURLScheme, + parse_linked_bundle_url ) from codalab.lib.print_util import FileTransferProgress from codalab.lib import ( @@ -70,6 +71,7 @@ class MigrationStatus(str, Enum): """ NOT_STARTED = "NOT_STARTED" UPLOADED_TO_AZURE = "UPLOADED_TO_AZURE" + UPLOADED_NOT_INDEXED = "UPLOADED_NOT_INDEXED" CHANGED_DB = "CHANGED_DB" FINISHED = "FINISHED" # Meaning it is uploaded to Azure, DB updated, and deleted from disk. 
@@ -163,7 +165,7 @@ def __init__(self, target_store_name, change_db, delete, proc_id, sanity_check_n self.bundle_migration_statuses = list() if os.path.exists(self.get_bundle_statuses_path()): - self.existing_bundle_migration_statuses = pd.read_csv(self.get_bundle_statuses_path()) + self.existing_bundle_migration_statuses = pd.read_csv(self.get_bundle_statuses_path(), dtype={'uuid': object, 'status': object, 'error_message': object}) else: self.existing_bundle_migration_statuses = None @@ -340,6 +342,9 @@ def sanity_check(self, bundle_uuid, bundle_location, bundle_info, is_dir, new_lo success, reason = None, None + # print(bundle_location, new_location) + # return True, None + if is_dir: # For dirs, check the folder contains same files with OpenFile(new_location, gzipped=True) as f: @@ -477,15 +482,19 @@ def migrate_bundle(self, bundle_uuid): self.logger.info("Getting Bundle info") bundle = self.get_bundle(bundle_uuid) bundle_location = self.get_bundle_location(bundle_uuid) + print(self.bundle_manager._model.get_bundle_storage_info(bundle_uuid)) + print(bundle_location) # This is for handling cases where rm -d was run on the bundle is_bundle_rm = False bundle_info = None + try: bundle_info = self.get_bundle_info(bundle_uuid, bundle_location) except Exception as e: + # print(e) if "Path ''" in str(e): - for i in range(0, 10): + for i in range(0, 11): try: bundle_info = self.get_bundle_info(bundle_uuid, f'/home/azureuser/codalab-worksheets/var/codalab/home/partitions/codalab{i}/bundles/{bundle_uuid}') bundle_location = f'/home/azureuser/codalab-worksheets/var/codalab/home/partitions/codalab{i}/bundles/{bundle_uuid}' @@ -498,10 +507,26 @@ def migrate_bundle(self, bundle_uuid): is_bundle_rm = True is_dir = False bundle_migration_status.status = MigrationStatus.NOT_STARTED - bundle_migration_status.error_message = None + bundle_migration_status.error_message = '' else: - raise e - + pass + + # if not bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value): 
+ # self.bundle_manager._model.update_bundle( + # bundle, + # { + # 'storage_type': StorageType.DISK_STORAGE.value, + # 'is_dir': bundle_info['type'] == 'directory', + # } + # ) + # self.bundle_manager._model.remove_bundle_location(bundle_uuid, self.target_store_uuid) + # bundle_migration_status.status = MigrationStatus.UPLOADED_NOT_INDEXED + # bundle_migration_status.error_message = '' + # print(self.bundle_manager._model.get_bundle_locations(bundle_uuid)) + + # print(bundle_info, bundle_location, is_bundle_rm) + # print(bundle_migration_status) + # Normal Migration if not is_bundle_rm: is_dir = bundle_info['type'] == 'directory' @@ -521,18 +546,21 @@ def migrate_bundle(self, bundle_uuid): # if db already changed # TODO: Check if bundle_location is azure (see other places in code base.) if bundle_migration_status.status == MigrationStatus.FINISHED and bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value): + bundle_migration_status.error_message = '' return - elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value): - bundle_migration_status.status = MigrationStatus.CHANGED_DB - elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check( - bundle_uuid, bundle_location, bundle_info, is_dir, target_location - )[0]): - bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE - bundle_migration_status.error_message = None + # elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value): + # bundle_migration_status.status = MigrationStatus.CHANGED_DB + # bundle_migration_status.error_message = '' + # elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check( + # bundle_uuid, bundle_location, bundle_info, is_dir, target_location + # )[0]): + # bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE + # 
bundle_migration_status.error_message = '' # Upload to Azure. # print(bundle_migration_status.uploaded_to_azure(), os.path.lexists(disk_location)) if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location): + print("UPLOADING") self.logger.info("Uploading to Azure") start_time = time.time() self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir) @@ -547,21 +575,27 @@ def migrate_bundle(self, bundle_uuid): # Change bundle metadata in database to point to the Azure Blob location (not disk) if (self.change_db or is_bundle_rm) and not bundle_migration_status.changed_db() and not bundle_migration_status.error_message: + print("Changing DB") self.logger.info("Changing DB") start_time = time.time() self.modify_bundle_data(bundle, bundle_uuid, is_dir) self.times["modify_bundle_data"].append(time.time() - start_time) bundle_migration_status.status = MigrationStatus.CHANGED_DB + bundle_migration_status.error_message = None # Delete the bundle from disk. if self.delete and not is_bundle_rm: self.logger.info("Deleting from disk") start_time = time.time() - if os.path.lexists(disk_location): - # Delete it. - path_util.remove(disk_bundle_location) + delete_status = self.delete_original_bundle(bundle_uuid) + if not delete_status: + self.logger.info(f"Bundle location not on disk or bundle already deleted for bundle {bundle_uuid}") + # if os.path.lexists(disk_location): + # # Delete it. 
+ # path_util.remove(disk_bundle_location) self.times["delete_original_bundle"].append(time.time() - start_time) bundle_migration_status.status = MigrationStatus.FINISHED + bundle_migration_status.error_message = None self.times["migrate_bundle"].append(time.time() - total_start_time) @@ -590,6 +624,7 @@ def log_times(self): def write_bundle_statuses(self): new_records = pd.DataFrame.from_records([b_m_s.to_dict() for b_m_s in self.bundle_migration_statuses]) + new_records = new_records.astype({'uuid': object, 'status': object, 'error_message': object}) if self.existing_bundle_migration_statuses is None: self.existing_bundle_migration_statuses = new_records else: @@ -601,6 +636,7 @@ def write_bundle_statuses(self): def migrate_bundles(self, bundle_uuids, log_interval=100): total = len(bundle_uuids) for i, uuid in enumerate(bundle_uuids): + print(f"Migrating bundle {i} out of {total}") self.logger.info(f"migrating {uuid}") self.migrate_bundle(uuid) self.logger.info("status: %d / %d", i, total) @@ -630,6 +666,9 @@ def job(target_store_name, change_db, delete, worksheet, bundle_uuids, max_resul bundle_uuids_df.to_csv(migration.get_bundle_ids_path(), index=False, mode='w') print(f"[migration] Recorded all bundle ids to be migrated") + # data = pd.read_csv(bundle_uuids) + # bundle_uuids = data["0"].tolist()[100001:] + # Sort according to what process you are. chunk_size = len(bundle_uuids) // num_processes start_idx = chunk_size * proc_id @@ -657,6 +696,9 @@ def job(target_store_name, change_db, delete, worksheet, bundle_uuids, max_resul parser.add_argument( '-u', '--bundle-uuids', type=str, nargs='*', default=None, help='List of bundle UUIDs to migrate.' ) + # parser.add_argument( + # '-u', '--bundle-uuids', type=str, help='List of bundle UUIDs to migrate.' 
+ # ) parser.add_argument( '-t', '--target_store_name', diff --git a/codalab/model/bundle_model.py b/codalab/model/bundle_model.py index a8265614a..5dc539be7 100644 --- a/codalab/model/bundle_model.py +++ b/codalab/model/bundle_model.py @@ -3175,6 +3175,17 @@ def add_bundle_location(self, bundle_uuid: str, bundle_store_uuid: str) -> None: } connection.execute(cl_bundle_location.insert().values(bundle_location_value)) + def remove_bundle_location(self, bundle_uuid: str, bundle_store_uuid: str) -> None: + """ + Removes the bundle location associated with the specified bundle and bundle store. + + Args: + bundle_uuid (str): The uuid for the bundle whose BundleLocation we want to remove. + bundle_store_uuid (str): The uuid for the bundle store associated with the BundleLocation to remove. + """ + with self.engine.begin() as connection: + connection.execute(cl_bundle_location.delete().where((cl_bundle_location.c.bundle_uuid == bundle_uuid) & (cl_bundle_location.c.bundle_store_uuid == bundle_store_uuid))) + def get_bundle_location(self, bundle_uuid: str, bundle_store_uuid: str) -> dict: """ Returns data about the location associated with the specified bundle and bundle store.