Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions codalab/lib/bundle_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,8 @@ def do_download_command(self, args):
default_client, default_worksheet_uuid, args.target_spec
)

print(default_worksheet_uuid)

# Figure out where to download.
info = client.fetch('bundles', target.bundle_uuid)
if args.output_path:
Expand All @@ -1516,6 +1518,8 @@ def do_download_command(self, args):
if target.subpath == ''
else os.path.basename(target.subpath)
)

print(info)
final_path = os.path.join(os.getcwd(), local_path)
if os.path.exists(final_path):
if args.force:
Expand Down
72 changes: 57 additions & 15 deletions codalab/migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from codalab.common import (
StorageType,
StorageURLScheme,
parse_linked_bundle_url
)
from codalab.lib.print_util import FileTransferProgress
from codalab.lib import (
Expand Down Expand Up @@ -70,6 +71,7 @@ class MigrationStatus(str, Enum):
"""
NOT_STARTED = "NOT_STARTED"
UPLOADED_TO_AZURE = "UPLOADED_TO_AZURE"
UPLOADED_NOT_INDEXED = "UPLOADED_NOT_INDEXED"
CHANGED_DB = "CHANGED_DB"
FINISHED = "FINISHED" # Meaning it is uploaded to Azure, DB updated, and deleted from disk.

Expand Down Expand Up @@ -163,7 +165,7 @@ def __init__(self, target_store_name, change_db, delete, proc_id, sanity_check_n
self.bundle_migration_statuses = list()

if os.path.exists(self.get_bundle_statuses_path()):
self.existing_bundle_migration_statuses = pd.read_csv(self.get_bundle_statuses_path())
self.existing_bundle_migration_statuses = pd.read_csv(self.get_bundle_statuses_path(), dtype={'uuid': object, 'status': object, 'error_message': object})
else:
self.existing_bundle_migration_statuses = None

Expand Down Expand Up @@ -340,6 +342,9 @@ def sanity_check(self, bundle_uuid, bundle_location, bundle_info, is_dir, new_lo

success, reason = None, None

# print(bundle_location, new_location)
# return True, None

if is_dir:
# For dirs, check the folder contains same files
with OpenFile(new_location, gzipped=True) as f:
Expand Down Expand Up @@ -477,15 +482,19 @@ def migrate_bundle(self, bundle_uuid):
self.logger.info("Getting Bundle info")
bundle = self.get_bundle(bundle_uuid)
bundle_location = self.get_bundle_location(bundle_uuid)
print(self.bundle_manager._model.get_bundle_storage_info(bundle_uuid))
print(bundle_location)

# This is for handling cases where rm -d was run on the bundle
is_bundle_rm = False
bundle_info = None

try:
bundle_info = self.get_bundle_info(bundle_uuid, bundle_location)
except Exception as e:
# print(e)
if "Path ''" in str(e):
for i in range(0, 10):
for i in range(0, 11):
try:
bundle_info = self.get_bundle_info(bundle_uuid, f'/home/azureuser/codalab-worksheets/var/codalab/home/partitions/codalab{i}/bundles/{bundle_uuid}')
bundle_location = f'/home/azureuser/codalab-worksheets/var/codalab/home/partitions/codalab{i}/bundles/{bundle_uuid}'
Expand All @@ -498,10 +507,26 @@ def migrate_bundle(self, bundle_uuid):
is_bundle_rm = True
is_dir = False
bundle_migration_status.status = MigrationStatus.NOT_STARTED
bundle_migration_status.error_message = None
bundle_migration_status.error_message = ''
else:
raise e

pass

# if not bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
# self.bundle_manager._model.update_bundle(
# bundle,
# {
# 'storage_type': StorageType.DISK_STORAGE.value,
# 'is_dir': bundle_info['type'] == 'directory',
# }
# )
# self.bundle_manager._model.remove_bundle_location(bundle_uuid, self.target_store_uuid)
# bundle_migration_status.status = MigrationStatus.UPLOADED_NOT_INDEXED
# bundle_migration_status.error_message = ''
# print(self.bundle_manager._model.get_bundle_locations(bundle_uuid))

# print(bundle_info, bundle_location, is_bundle_rm)
# print(bundle_migration_status)

# Normal Migration
if not is_bundle_rm:
is_dir = bundle_info['type'] == 'directory'
Expand All @@ -521,18 +546,21 @@ def migrate_bundle(self, bundle_uuid):
# if db already changed
# TODO: Check if bundle_location is azure (see other places in code base.)
if bundle_migration_status.status == MigrationStatus.FINISHED and bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
bundle_migration_status.error_message = ''
return
elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
bundle_migration_status.status = MigrationStatus.CHANGED_DB
elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check(
bundle_uuid, bundle_location, bundle_info, is_dir, target_location
)[0]):
bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
bundle_migration_status.error_message = None
# elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
# bundle_migration_status.status = MigrationStatus.CHANGED_DB
# bundle_migration_status.error_message = ''
# elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check(
# bundle_uuid, bundle_location, bundle_info, is_dir, target_location
# )[0]):
# bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
# bundle_migration_status.error_message = ''

# Upload to Azure.
# print(bundle_migration_status.uploaded_to_azure(), os.path.lexists(disk_location))
if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location):
print("UPLOADING")
self.logger.info("Uploading to Azure")
start_time = time.time()
self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
Expand All @@ -547,21 +575,27 @@ def migrate_bundle(self, bundle_uuid):

# Change bundle metadata in database to point to the Azure Blob location (not disk)
if (self.change_db or is_bundle_rm) and not bundle_migration_status.changed_db() and not bundle_migration_status.error_message:
print("Changing DB")
self.logger.info("Changing DB")
start_time = time.time()
self.modify_bundle_data(bundle, bundle_uuid, is_dir)
self.times["modify_bundle_data"].append(time.time() - start_time)
bundle_migration_status.status = MigrationStatus.CHANGED_DB
bundle_migration_status.error_message = None

# Delete the bundle from disk.
if self.delete and not is_bundle_rm:
self.logger.info("Deleting from disk")
start_time = time.time()
if os.path.lexists(disk_location):
# Delete it.
path_util.remove(disk_bundle_location)
delete_status = self.delete_original_bundle(bundle_uuid)
if not delete_status:
self.logger.info(f"Bundle location not on disk or bundle already deleted for bundle {bundle_uuid}")
# if os.path.lexists(disk_location):
# # Delete it.
# path_util.remove(disk_bundle_location)
self.times["delete_original_bundle"].append(time.time() - start_time)
bundle_migration_status.status = MigrationStatus.FINISHED
bundle_migration_status.error_message = None

self.times["migrate_bundle"].append(time.time() - total_start_time)

Expand Down Expand Up @@ -590,6 +624,7 @@ def log_times(self):

def write_bundle_statuses(self):
new_records = pd.DataFrame.from_records([b_m_s.to_dict() for b_m_s in self.bundle_migration_statuses])
new_records = new_records.astype({'uuid': object, 'status': object, 'error_message': object})
if self.existing_bundle_migration_statuses is None:
self.existing_bundle_migration_statuses = new_records
else:
Expand All @@ -601,6 +636,7 @@ def write_bundle_statuses(self):
def migrate_bundles(self, bundle_uuids, log_interval=100):
total = len(bundle_uuids)
for i, uuid in enumerate(bundle_uuids):
print(f"Migrating bundle {i} out of {total}")
self.logger.info(f"migrating {uuid}")
self.migrate_bundle(uuid)
self.logger.info("status: %d / %d", i, total)
Expand Down Expand Up @@ -630,6 +666,9 @@ def job(target_store_name, change_db, delete, worksheet, bundle_uuids, max_resul
bundle_uuids_df.to_csv(migration.get_bundle_ids_path(), index=False, mode='w')
print(f"[migration] Recorded all bundle ids to be migrated")

# data = pd.read_csv(bundle_uuids)
# bundle_uuids = data["0"].tolist()[100001:]

# Sort according to what process you are.
chunk_size = len(bundle_uuids) // num_processes
start_idx = chunk_size * proc_id
Expand Down Expand Up @@ -657,6 +696,9 @@ def job(target_store_name, change_db, delete, worksheet, bundle_uuids, max_resul
parser.add_argument(
'-u', '--bundle-uuids', type=str, nargs='*', default=None, help='List of bundle UUIDs to migrate.'
)
# parser.add_argument(
# '-u', '--bundle-uuids', type=str, help='List of bundle UUIDs to migrate.'
# )
parser.add_argument(
'-t',
'--target_store_name',
Expand Down
11 changes: 11 additions & 0 deletions codalab/model/bundle_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3175,6 +3175,17 @@ def add_bundle_location(self, bundle_uuid: str, bundle_store_uuid: str) -> None:
}
connection.execute(cl_bundle_location.insert().values(bundle_location_value))

def remove_bundle_location(self, bundle_uuid: str, bundle_store_uuid: str) -> None:
    """
    Removes the BundleLocation that associates the specified bundle with the
    specified bundle store (the inverse of ``add_bundle_location``).

    Deleting a non-existent location is a no-op: the DELETE simply matches
    zero rows.

    Args:
        bundle_uuid (str): The uuid of the bundle whose BundleLocation we
            want to remove.
        bundle_store_uuid (str): The uuid of the bundle store identifying
            which of the bundle's BundleLocations to remove.
    """
    # Run inside a transaction so the delete is committed atomically.
    with self.engine.begin() as connection:
        connection.execute(
            cl_bundle_location.delete().where(
                (cl_bundle_location.c.bundle_uuid == bundle_uuid)
                & (cl_bundle_location.c.bundle_store_uuid == bundle_store_uuid)
            )
        )

def get_bundle_location(self, bundle_uuid: str, bundle_store_uuid: str) -> dict:
"""
Returns data about the location associated with the specified bundle and bundle store.
Expand Down