Skip to content

Commit 8c88d02

Browse files
committed
Update
1 parent 94f5023 commit 8c88d02

3 files changed

Lines changed: 171 additions & 99 deletions

File tree

lithops/standalone/backends/gcp_compute_engine/gcp_compute_engine.py

Lines changed: 114 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -398,21 +398,15 @@ def _instance_exists(self, instance_name):
398398
)
399399

400400
def _create_master_instance(self):
401+
"""
402+
Creates the master VM instance
403+
"""
401404
name = self.config.get('instance_name') or f'lithops-master-{self.network_key}'
402-
self.master = GCPComputeEngineInstance(self.config, self.compute_client, public=True)
403-
self.master.name = name
405+
self.master = GCPComputeEngineInstance(
406+
name, self.config, self.compute_client, public=True
407+
)
404408
self.master.instance_type = self.config['master_instance_type']
405409
self.master.delete_on_dismantle = False
406-
407-
if self._instance_exists(name):
408-
logger.debug(f'Using existing master VM {name}')
409-
self.master.get_instance_data()
410-
if self.master.get_status() == 'TERMINATED':
411-
logger.debug(f'Master VM {name} is stopped, starting')
412-
self.master.start()
413-
elif self.mode != StandaloneMode.CONSUME.value:
414-
logger.debug(f'Creating new VM instance {name}')
415-
self.master.create(public=True)
416410
self.master.get_instance_data()
417411

418412
def _create_compute_client(self):
@@ -439,12 +433,18 @@ def init(self):
439433
self._load_gce_data()
440434

441435
if self.mode == StandaloneMode.CONSUME.value:
436+
instance_name = self.config['instance_name']
437+
if not self.gce_data or instance_name != self.gce_data.get('master_name'):
438+
self.gce_data = {
439+
'mode': self.mode,
440+
'vpc_data_type': 'provided',
441+
'ssh_data_type': 'provided',
442+
'master_name': instance_name,
443+
'master_id': instance_name,
444+
}
445+
446+
self.config['instance_name'] = self.gce_data['master_name']
442447
self._create_master_instance()
443-
self.gce_data = {
444-
'mode': self.mode,
445-
'master_name': self.master.name,
446-
'master_id': self.master.get_instance_id()
447-
}
448448
self._dump_gce_data()
449449
return
450450

@@ -460,7 +460,7 @@ def init(self):
460460
'vpc_data_type': self.vpc_data_type,
461461
'ssh_data_type': self.ssh_data_type,
462462
'master_name': self.master.name,
463-
'master_id': self.master.get_instance_id(),
463+
'master_id': self.network_key,
464464
'network_name': self.config['network_name'],
465465
'network_key': self.network_key,
466466
'subnet_name': self.config['subnet_name'],
@@ -499,8 +499,7 @@ def _get_project_image(self, image_name):
499499

500500
def _request_source_image(self):
501501
"""
502-
Use a pre-built Lithops custom image when available; otherwise keep the
503-
configured Ubuntu base image.
502+
Requests the default image if not provided
504503
"""
505504
if not self._is_default_ubuntu_source_image(self.config.get('source_image')):
506505
return
@@ -516,6 +515,9 @@ def _request_source_image(self):
516515
self.config['source_image'] = image_ref
517516
return
518517

518+
if 'source_image' not in self.config:
519+
self.config['source_image'] = DEFAULT_UBUNTU_SOURCE_IMAGE
520+
519521
def _get_boot_disk_source(self, instance_data):
520522
for disk in instance_data.get('disks', []):
521523
if disk.get('boot'):
@@ -544,7 +546,7 @@ def _wait_image_ready(self, image_name, timeout=600):
544546

545547
def build_image(self, image_name, script_file, overwrite, include, extra_args=[]):
546548
"""
547-
Builds a custom GCE image with Lithops dependencies pre-installed.
549+
Builds a new VM Image
548550
"""
549551
image_name = image_name or DEFAULT_LITHOPS_IMAGE_NAME
550552

@@ -554,8 +556,8 @@ def build_image(self, image_name, script_file, overwrite, include, extra_args=[]
554556
else:
555557
image_ref = self._project_image_ref(image_name)
556558
raise Exception(
557-
f"The image with name '{image_name}' already exists "
558-
f"({image_ref}). Use '--overwrite' or '-o' to replace it"
559+
f"The image with name '{image_name}' already exists with ID: "
560+
f"'{image_ref}'. Use '--overwrite' or '-o' if you want to overwrite it"
559561
)
560562

561563
is_initialized = self.is_initialized()
@@ -570,52 +572,42 @@ def build_image(self, image_name, script_file, overwrite, include, extra_args=[]
570572
except Exception:
571573
pass
572574

573-
self.config['source_image'] = DEFAULT_UBUNTU_SOURCE_IMAGE
574-
575-
build_name = re.sub(
576-
r'[^a-z0-9-]',
577-
'-',
578-
f'building-image-{image_name}'.lower()
579-
)[:63].rstrip('-')
575+
self._request_source_image()
580576

581-
build_vm = GCPComputeEngineInstance(self.config, self.compute_client, public=True)
582-
build_vm.name = build_name
577+
build_vm = GCPComputeEngineInstance(
578+
'building-image-' + image_name, self.config, self.compute_client, public=True
579+
)
583580
build_vm.instance_type = self.config['master_instance_type']
584581
build_vm.delete_on_dismantle = False
585582
build_vm.create(public=True)
586583
build_vm.wait_ready()
587584

588-
logger.debug(f'Uploading installation script to {build_vm}')
589-
remote_script = '/tmp/install_lithops.sh'
585+
logger.debug(f"Uploading installation script to {build_vm}")
586+
remote_script = "/tmp/install_lithops.sh"
590587
script = get_host_setup_script(lithops_pip_spec='lithops[gcp,redis]')
591588
build_vm.get_ssh_client().upload_data_to_file(script, remote_script)
592-
logger.debug(
593-
'Executing Lithops installation script. Be patient, this can take up to 3 minutes'
594-
)
589+
logger.debug("Executing Lithops installation script. Be patient, this process can take up to 3 minutes")
595590
build_vm.get_ssh_client().run_remote_command(
596-
f'chmod 777 {remote_script}; sudo {remote_script}; rm {remote_script};'
591+
f"chmod 777 {remote_script}; sudo {remote_script}; rm {remote_script};"
597592
)
598-
logger.debug('Lithops installation script finished')
593+
logger.debug("Lithops installation script finsihed")
599594

600595
for src_dst_file in include:
601596
src_file, dst_file = src_dst_file.split(':')
602597
if os.path.isfile(src_file):
603-
logger.debug(
604-
f"Uploading local file '{src_file}' to VM image at '{dst_file}'"
605-
)
598+
logger.debug(f"Uploading local file '{src_file}' to VM image in '{dst_file}'")
606599
build_vm.get_ssh_client().upload_local_file(src_file, dst_file)
607600

608601
if script_file:
609-
script_path = os.path.expanduser(script_file)
602+
script = os.path.expanduser(script_file)
610603
logger.debug(f"Uploading user script '{script_file}' to {build_vm}")
611-
remote_user_script = '/tmp/install_user_lithops.sh'
612-
build_vm.get_ssh_client().upload_local_file(script_path, remote_user_script)
604+
remote_script = "/tmp/install_user_lithops.sh"
605+
build_vm.get_ssh_client().upload_local_file(script, remote_script)
613606
logger.debug(f"Executing user script '{script_file}'")
614607
build_vm.get_ssh_client().run_remote_command(
615-
f'chmod 777 {remote_user_script}; sudo {remote_user_script}; '
616-
f'rm {remote_user_script};'
608+
f"chmod 777 {remote_script}; sudo {remote_script}; rm {remote_script};"
617609
)
618-
logger.debug(f"User script '{script_file}' finished")
610+
logger.debug(f"User script '{script_file}' finsihed")
619611

620612
build_vm.stop()
621613
build_vm.wait_stopped()
@@ -624,28 +616,29 @@ def build_image(self, image_name, script_file, overwrite, include, extra_args=[]
624616
disk_name = self._get_boot_disk_source(instance_data)
625617
source_disk = f'zones/{self.zone}/disks/{disk_name}'
626618

627-
logger.debug('Starting VM image creation')
628-
logger.debug('Be patient, VM imaging can take up to 10 minutes')
629-
630619
op = self.compute_client.images().insert(
631620
project=self.project_name,
632621
body={
633622
'name': image_name,
634-
'description': 'Lithops custom VM image',
623+
'description': 'Lithops Image',
635624
'sourceDisk': source_disk,
636625
'labels': {'type': 'lithops-runtime'},
637626
},
638627
).execute()
639628
self._wait_operation(op['name'], scope='global')
629+
630+
logger.debug("Starting VM image creation")
631+
logger.debug("Be patient, VM imaging can take up to 10 minutes")
640632
self._wait_image_ready(image_name)
641633

642634
if not is_initialized:
643-
self.clean(all=True)
635+
while not self.clean(all=True):
636+
time.sleep(5)
644637
else:
645638
build_vm.delete()
646639

647640
image_ref = self._project_image_ref(image_name)
648-
logger.info(f'VM Image created. source_image: {image_ref}')
641+
logger.info(f"VM Image created. Image ID: {image_ref}")
649642

650643
def delete_image(self, image_name):
651644
"""
@@ -717,42 +710,73 @@ def list_images(self, **kwargs):
717710
return sorted(result, key=lambda x: x[2], reverse=True)
718711

719712
def clean(self, **kwargs):
713+
"""
714+
Clean all the backend resources.
715+
Returns True when cleanup completed, False if resources are still in use.
716+
"""
720717
all_clean = kwargs.get('all', False)
718+
logger.info('Cleaning GCP Compute Engine resources')
719+
721720
if not self.gce_data:
722721
self._load_gce_data()
723722

724723
if self.mode == StandaloneMode.CONSUME.value:
725724
self._delete_vpc_data()
726-
return
727-
728-
self._delete_vm_instances(all=all_clean)
725+
return True
729726

730-
master_name = self.gce_data.get('master_name') or (self.master.name if self.master else None)
731-
if master_name:
732-
master_pk = os.path.join(self.cache_dir, f'{master_name}-id_rsa.pub')
733-
if all_clean and os.path.isfile(master_pk):
734-
os.remove(master_pk)
727+
try:
728+
self._delete_vm_instances(all=all_clean)
735729

736-
if all_clean:
737-
self._delete_network_resources()
738-
self._delete_ssh_key()
739-
self._delete_vpc_data()
730+
master_name = self.gce_data.get('master_name') or (
731+
self.master.name if self.master else None
732+
)
733+
if master_name:
734+
master_pk = os.path.join(self.cache_dir, f'{master_name}-id_rsa.pub')
735+
if all_clean and os.path.isfile(master_pk):
736+
os.remove(master_pk)
737+
738+
if all_clean:
739+
self._delete_network_resources()
740+
self._delete_ssh_key()
741+
self._delete_vpc_data()
742+
return True
743+
except HttpError:
744+
return False
740745

741746
def _delete_vm_instances(self, all=False):
742-
prefixes = ('lithops-worker-', 'lithops-master-') if all else ('lithops-worker-',)
743-
instances = self.compute_client.instances().list(
744-
project=self.project_name, zone=self.zone
745-
).execute().get('items', [])
747+
"""
748+
Deletes all worker VM instances
749+
"""
750+
msg = (
751+
f'Deleting all Lithops worker VMs from {self.network_name}'
752+
if self.network_name else 'Deleting all Lithops worker VMs'
753+
)
754+
logger.info(msg)
746755

747-
for ins in instances:
748-
name = ins.get('name', '')
749-
if not name.startswith(prefixes):
750-
continue
751-
logger.debug(f"Deleting VM instance {name}")
752-
op = self.compute_client.instances().delete(
753-
project=self.project_name, zone=self.zone, instance=name
754-
).execute()
755-
self._wait_operation(op['name'], scope='zone')
756+
prefixes = (
757+
('lithops-worker-', 'lithops-master-', 'building-image-')
758+
if all else ('lithops-worker-',)
759+
)
760+
761+
def get_instance_names():
762+
instances = self.compute_client.instances().list(
763+
project=self.project_name, zone=self.zone
764+
).execute().get('items', []) or []
765+
return [
766+
ins['name'] for ins in instances
767+
if ins.get('name', '').startswith(prefixes)
768+
]
769+
770+
while True:
771+
names = get_instance_names()
772+
if not names:
773+
break
774+
for name in names:
775+
logger.debug(f"Deleting VM instance {name}")
776+
op = self.compute_client.instances().delete(
777+
project=self.project_name, zone=self.zone, instance=name
778+
).execute()
779+
self._wait_operation(op['name'], scope='zone')
756780

757781
def _delete_network_resources(self):
758782
"""
@@ -853,8 +877,7 @@ def dismantle(self, include_master=True):
853877
self.master.stop()
854878

855879
def get_instance(self, name, **kwargs):
856-
instance = GCPComputeEngineInstance(self.config, self.compute_client)
857-
instance.name = name
880+
instance = GCPComputeEngineInstance(name, self.config, self.compute_client)
858881
for key in kwargs:
859882
if hasattr(instance, key) and kwargs[key] is not None:
860883
setattr(instance, key, kwargs[key])
@@ -868,13 +891,14 @@ def get_worker_cpu_count(self):
868891

869892
def create_worker(self, name):
870893
"""
871-
Creates a new worker VM instance (same flow as AWS EC2 / IBM VPC).
894+
Creates a new worker VM instance
872895
"""
873896
if self.mode == StandaloneMode.CONSUME.value:
874897
raise NotImplementedError(f'{self.name}.create_worker() not available in consume mode')
875898

876-
worker = GCPComputeEngineInstance(self.config, self.compute_client, public=False)
877-
worker.name = name
899+
worker = GCPComputeEngineInstance(
900+
name, self.config, self.compute_client, public=False
901+
)
878902
worker.instance_type = self.config['worker_instance_type']
879903

880904
user = worker.ssh_credentials['username']
@@ -906,11 +930,15 @@ def get_runtime_key(self, runtime_name, version=__version__):
906930

907931
class GCPComputeEngineInstance:
908932

909-
def __init__(self, config, compute_client, public=False):
933+
def __init__(self, name, config, compute_client, public=False):
934+
"""
935+
Initialize a GCPComputeEngineInstance.
936+
VMs with public=True get an external IP (e.g. master or image build VM).
937+
"""
938+
self.name = name.lower()
910939
self.config = config
911940
self.compute_client = compute_client
912941
self.public = public
913-
self.name = self.config['instance_name']
914942
self.project_name = self.config['project_name']
915943
self.zone = self.config['zone']
916944

lithops/standalone/standalone.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,8 @@ def clean(self, **kwargs):
343343
"""
344344
Clan all the backend resources
345345
"""
346-
if self.is_initialized():
346+
all_clean = kwargs.get('all', False)
347+
if self.is_initialized() and not all_clean:
347348
try:
348349
self.init()
349350
self._make_request('POST', 'clean')

0 commit comments

Comments
 (0)