diff --git a/CHANGELOG.md b/CHANGELOG.md index de82b693a..76f3eb913 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,11 +12,18 @@ - [K8s] Added configuration for pod and container `securityContext`. - [Docs] Corrected MinIO/Ceph config template keys and removed obsolete Kubernetes image references. - [GCP Functions] Updated `gcp_functions` backend to Google Cloud Run functions (Cloud Functions v2 API). +- [Python] Updated all backend Python versions (deprecated Python 3.9, added Python 3.14) +- [Azure Functions] Updated default functions plan to Flex Consumption +- [AWS EC2] Updated default Ubuntu Image to Ubuntu 24.04 +- [Azure VMS] Updated default Ubuntu Image to Ubuntu 24.04 ### Fixed - [K8s] Fixed default runtime builds impacted by Debian Buster end-of-life. - [GCP Cloud Run] Added Artifact Registry (`pkg.dev`) runtime deployment support - [K8s] Run default runtime image as non-root user (uid 1000) (#1469) +- [AWS] Fixed AWS backends execution +- [GCP] Fixed GCP backends execution +- [Azure] Fixed Azure backends execution ## [v3.6.4] diff --git a/config/config_template.yaml b/config/config_template.yaml index c960891af..49623b506 100644 --- a/config/config_template.yaml +++ b/config/config_template.yaml @@ -112,7 +112,7 @@ #ssh_password: #ssh_key_filename: ~/.ssh/id_rsa #request_spot_instances: True - #target_ami: # Default: Ubuntu Server 22.04 + #target_ami: # Default: Ubuntu Server 24.04 #master_instance_type: t3.micro #worker_instance_type: t3.medium #delete_on_dismantle: True @@ -220,6 +220,7 @@ #resource_group: # Falls back to azure.resource_group #region: # Falls back to azure.region #environment: lithops # Container Apps environment name + #environment_id: # Optional. 
Full resource ID of the Container Apps environment #docker_server: index.docker.io #docker_user: #docker_password: @@ -236,7 +237,7 @@ #azure_vms: #region: # Falls back to azure.region #instance_name: # Mandatory in consume mode - #image_id: Canonical:0001-com-ubuntu-server-jammy:22_04-lts-gen2:latest + #image_id: Canonical:ubuntu-24_04-lts:server:latest #master_instance_type: Standard_B1s #worker_instance_type: Standard_B2s #ssh_username: ubuntu diff --git a/docs/source/compute_config/aws_ec2.md b/docs/source/compute_config/aws_ec2.md index 3d29f4368..a107fd9c2 100644 --- a/docs/source/compute_config/aws_ec2.md +++ b/docs/source/compute_config/aws_ec2.md @@ -9,7 +9,7 @@ This guide assumes that you are already familiar with AWS, and that you have aut ### Choose an operating system image for the VM Any Virtual Machine (VM) needs to define the instance's operating system and version. Lithops supports both standard operating system choices provided by AWS and pre-defined custom images that already contain all dependencies required by Lithops. -- Option 1: By default, Lithops uses an Ubuntu 22.04 image. In this case, no further action is required and you can continue to the next step. Lithops will install all required dependencies in the VM by itself. Note that this can take about 3 minutes to complete all installations. +- Option 1: By default, Lithops uses an Ubuntu 24.04 image. In this case, no further action is required and you can continue to the next step. Lithops will install all required dependencies in the VM by itself. Note that this can take about 3 minutes to complete all installations. - Option 2: Alternatively, you can use a pre-built custom image that will greatly improve VM creation time for Lithops jobs. To benefit from this approach, navigate to [runtime/aws_ec2](https://github.com/lithops-cloud/lithops/tree/master/runtime/aws_ec2), and follow the instructions. 
@@ -119,7 +119,7 @@ In summary, you can use one of the following settings: |aws_ec2 | ssh_password | |no | Password for accessing the worker VMs. If not provided, it is created randomly| |aws_ec2 | ssh_key_filename | ~/.ssh/id_rsa | no | Path to the ssh key file provided to access the VPC. If not provided, Lithops will use the default path and create a new ssh key for the VPC | |aws_ec2 | request_spot_instances | True | no | Request spot instance for worker VMs| -|aws_ec2 | target_ami | | no | Virtual machine image id. Default is Ubuntu Server 22.04 | +|aws_ec2 | target_ami | | no | Virtual machine image id. Default is Ubuntu Server 24.04 | |aws_ec2 | master_instance_type | t2.micro | no | Profile name for the master VM | |aws_ec2 | worker_instance_type | t2.medium | no | Profile name for the worker VMs | |aws_ec2 | delete_on_dismantle | True | no | Delete the worker VMs when they are stopped. Master VM is never deleted when stopped | diff --git a/docs/source/compute_config/azure_containers.md b/docs/source/compute_config/azure_containers.md index 637042d35..cb1eeae80 100644 --- a/docs/source/compute_config/azure_containers.md +++ b/docs/source/compute_config/azure_containers.md @@ -95,14 +95,15 @@ az containerapp env create --name lithops --resource-group LithopsResourceGroup |---|---|---|---|---| |azure_containers| resource_group | |no | Name of a resource group, for example: `LithopsResourceGroup`. Lithops will use the `resource_group` set under the `azure` section if it is not set here | |azure_containers| region | |no | The location where you created the `lithops` Container APP environment. For example: `westeurope`, `westus2`, etc. Lithops will use the `region` set under the `azure` section if it is not set here| -|azure_containers| environment | lithops |no | The environment name you created in the step 5 of the installation | +|azure_containers| environment | lithops |no | Container Apps environment name. 
Used to resolve the environment resource ID at deploy time when `environment_id` is not set | +|azure_containers| environment_id | |no | Full Azure resource ID of the Container Apps environment (for example: `/subscriptions//resourceGroups//providers/Microsoft.App/managedEnvironments/lithops`). If set, Lithops uses it directly and skips the `az containerapp env show` lookup | |azure_containers | docker_server | index.docker.io |no | Container registry URL | |azure_containers | docker_user | |no | Container registry user name | |azure_containers | docker_password | |no | Container registry password/token. In case of Docker hub, login to your docker hub account and generate a new access token [here](https://hub.docker.com/settings/security)| |azure_containers | max_workers | 1000 | no | Max number of parallel workers. Although Azure limits the number of parallel workers to 30, it is convenient to keep this value high| |azure_containers | worker_processes | 1 | no | Number of Lithops processes within a given worker. This can be used to parallelize function activations within a worker | |azure_containers| runtime | |no | Docker image name| -|azure_containers | runtime_memory | 512 |no | Memory limit in MB. Default 512Mi | +|azure_containers | runtime_memory | 512 |no | Memory limit in MB. Allowed values (MB): 512, 1024, 1536, 2048, 2560, 3072, 3584, 4096, 4608, 5120, 5632, 6144, 6656, 7168, 7680, 8192. Each maps to a fixed CPU/memory pair on the Consumption plan. Consumption-only environments are limited to 4096 MB (2 vCPU / 4 Gi) | |azure_containers | runtime_timeout | 600 |no | Runtime timeout in seconds. 
Default 10 minutes | |azure_containers| trigger | pub/sub | no | Currently it supports pub/sub invocation| |azure_containers | invoke_pool_threads | 32 |no | Number of concurrent threads used for invocation | diff --git a/docs/source/compute_config/azure_functions.md b/docs/source/compute_config/azure_functions.md index 4d7b59e84..7e6d673bd 100644 --- a/docs/source/compute_config/azure_functions.md +++ b/docs/source/compute_config/azure_functions.md @@ -12,15 +12,13 @@ python3 -m pip install lithops[azure] 2. Install [Azure CLI](https://docs.microsoft.com/en-us/cli/azure/install-azure-cli?view=azure-cli-latest) -3. Install the [Azure Functions core tools](https://github.com/Azure/azure-functions-core-tools) - -4. Sign in with the Azure CLI: +3. Sign in with the Azure CLI: ```bash az login ``` -5. Create a Resource Group and a Storage Account: +4. Create a Resource Group and a Storage Account: Option 1: @@ -80,7 +78,7 @@ az login |Group|Key|Default|Mandatory|Additional info| |---|---|---|---|---| -|azure_storage| storage_account_name | |yes | Storage account name. The name generated in step 5 of the installation if you followed these instructions | +|azure_storage| storage_account_name | |yes | Storage account name. The name generated in step 4 of the installation if you followed these instructions | |azure_storage| storage_account_key | | yes | An Account Key, found in *Storage Accounts* > `account_name` > *Security + networking* > *Access Keys*| ### Azure Functions @@ -88,9 +86,10 @@ az login |Group|Key|Default|Mandatory|Additional info| |---|---|---|---|---| |azure_functions| resource_group | |no | Name of a resource group, for example: `LithopsResourceGroup`. Lithops will use the `resource_group` set under the `azure` section if it is not set here | -|azure_functions| region | |no | The location of the consumption plan for the runtime. Use `az functionapp list-consumption-locations` to view the available locations. For example: `westeurope`, `westus2`, etc. 
Lithops will use the `region` set under the `azure` section if it is not set here| +|azure_functions| region | |no | The Flex Consumption plan location for the runtime. Use `az functionapp list-flexconsumption-locations` to view the available locations. For example: `westeurope`, `westus2`, etc. Lithops will use the `region` set under the `azure` section if it is not set here| +|azure_functions | runtime_memory | 2048 | no | Flex Consumption instance memory in MB. Supported values: `512`, `2048`, `4096`. Other values are mapped to the nearest supported size | |azure_functions | max_workers | 1000 | no | Max number of parallel workers. Although Azure limits the number of workers to 200, it is convenient to keep this value high| -|azure_functions | worker_processes | 1 | no | Number of Lithops processes within a given worker. This can be used to parallelize function activations within a worker | +|azure_functions | worker_processes | 1 | no | Lithops-side parallelism setting. Not applied as an Azure Functions app setting on Flex Consumption | |azure_functions| runtime | |no | Runtime name already deployed in the service| |azure_functions | runtime_timeout | 300 |no | Runtime timeout in seconds. Default 5 minutes | |azure_functions| trigger | pub/sub | no | One of 'https' or 'pub/sub'| diff --git a/docs/source/compute_config/azure_vms.md b/docs/source/compute_config/azure_vms.md index c74b06a21..2bfa7a5ed 100644 --- a/docs/source/compute_config/azure_vms.md +++ b/docs/source/compute_config/azure_vms.md @@ -83,7 +83,7 @@ Edit your lithops config and add the relevant keys: |Group|Key|Default|Mandatory|Additional info| |---|---|---|---|---| |azure_vms| region | |no | Azure location for deploying the VMs. For example: `westeurope`, `westus2`, etc. Lithops will use the `region` set under the `azure` section if it is not set here| -|azure_vms | image_id | Canonical:0001-com-ubuntu-server-jammy:22_04-lts-gen2:latest |no | Image ID. 
ARM resource identifier | +|azure_vms | image_id | Canonical:ubuntu-24_04-lts:server:latest |no | Image ID. ARM resource identifier | |azure_vms | ssh_username | ubuntu |no | Username to access the VM | |azure_vms | ssh_password | |no | Password for accessing the worker VMs. If not provided, it is created randomly| |azure_vms | ssh_key_filename | ~/.ssh/id_rsa | no | Path to the ssh key file provided to access the VPC. It will use the default path if not provided | diff --git a/docs/source/compute_config/gcp_compute_engine.md b/docs/source/compute_config/gcp_compute_engine.md index 04c7fc09e..2f5ae07cc 100644 --- a/docs/source/compute_config/gcp_compute_engine.md +++ b/docs/source/compute_config/gcp_compute_engine.md @@ -97,7 +97,7 @@ Lithops attaches the service account from `credentials_path` to master and worke |gcp_compute_engine | worker_instance_type | e2-standard-2 |no | Worker VM machine type | |gcp_compute_engine | ssh_username | ubuntu |no | Username to access the VM | |gcp_compute_engine | ssh_password | |no | Password for worker VMs. If not provided, it is created randomly | -|gcp_compute_engine | ssh_key_filename | ~/.ssh/id_rsa |no | SSH private key for the master VM. If not provided, Lithops creates one | +|gcp_compute_engine | ssh_key_filename | |no | SSH private key for the master VM. If not provided, Lithops creates `~/.ssh/lithops-key-.gcp_ce.id_rsa` | |gcp_compute_engine | request_spot_instances | False |no | Use Spot VMs for workers | |gcp_compute_engine | delete_on_dismantle | True |no | Delete worker VMs when stopped. 
Master VM is never deleted when stopped | |gcp_compute_engine | max_workers | 100 |no | Max number of workers per `FunctionExecutor()` | @@ -138,7 +138,7 @@ gcp_compute_engine: |gcp_compute_engine | project_name | |yes | GCP project ID | |gcp_compute_engine | zone | |yes | Compute Engine zone | |gcp_compute_engine | ssh_username | ubuntu |no | Username to access the VM | -|gcp_compute_engine | ssh_key_filename | ~/.ssh/id_rsa |no | Path to the SSH private key | +|gcp_compute_engine | ssh_key_filename | |no | Path to the SSH private key. If not provided, Lithops creates `~/.ssh/lithops-key-.gcp_ce.id_rsa` | |gcp_compute_engine | worker_processes | AUTO |no | Parallel Lithops processes per worker | ## Test Lithops diff --git a/lithops/_grpc_env.py b/lithops/_grpc_env.py new file mode 100644 index 000000000..2b80a0a69 --- /dev/null +++ b/lithops/_grpc_env.py @@ -0,0 +1,21 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os + +# Google Cloud clients (Pub/Sub, etc.) use gRPC. Forking while gRPC is active +# (tqdm/multiprocessing, cleaner subprocess) logs noisy C-core messages on macOS. +# Must be set before gRPC initializes. 
+os.environ.setdefault('GRPC_ENABLE_FORK_SUPPORT', '0') +os.environ.setdefault('GRPC_VERBOSITY', 'NONE') diff --git a/lithops/executors.py b/lithops/executors.py index e8343b241..0382d263f 100644 --- a/lithops/executors.py +++ b/lithops/executors.py @@ -628,10 +628,6 @@ def save_data_to_clean(data): if (jobs_to_clean or cs) and spawn_cleaner: cmd = [sys.executable, '-m', 'lithops.scripts.cleaner'] env = os.environ.copy() - # Cleaner is forked while gRPC clients may already be active in the parent process. - # Reduce noisy gRPC fork logs in the detached cleaner subprocess. - env.setdefault('GRPC_ENABLE_FORK_SUPPORT', '0') - env.setdefault('GRPC_VERBOSITY', 'ERROR') CLEANER_PROCESS = sp.Popen( cmd, start_new_session=True, diff --git a/lithops/libs/imp/imp.py b/lithops/libs/imp/imp.py index 9c6d42b6b..22a38a43f 100644 --- a/lithops/libs/imp/imp.py +++ b/lithops/libs/imp/imp.py @@ -1,11 +1,11 @@ from _imp import is_builtin, is_frozen -from importlib._bootstrap import _ERR_MSG from importlib import machinery import os import sys import tokenize +_ERR_MSG = 'No module named {!r}' SEARCH_ERROR = 0 PY_SOURCE = 1 diff --git a/lithops/localhost/v1/runner.py b/lithops/localhost/v1/runner.py index 31920977c..7ac39b774 100644 --- a/lithops/localhost/v1/runner.py +++ b/lithops/localhost/v1/runner.py @@ -40,9 +40,12 @@ logger = logging.getLogger('lithops.localhost.runner') -# Change spawn method for MacOS -if platform.system() == 'Darwin': - mp.set_start_method("fork") +# Python 3.14 defaults to forkserver on Linux; Lithops requires fork. 
+if platform.system() != 'Windows': + try: + mp.set_start_method('fork') + except RuntimeError: + pass def run_job(): diff --git a/lithops/localhost/v2/runner.py b/lithops/localhost/v2/runner.py index d4915dbd5..a43bdde41 100644 --- a/lithops/localhost/v2/runner.py +++ b/lithops/localhost/v2/runner.py @@ -42,9 +42,12 @@ logger = logging.getLogger('lithops.localhost.runner') -# Change spawn method for MacOS -if platform.system() == 'Darwin': - mp.set_start_method("fork") +# Python 3.14 defaults to forkserver on Linux; Lithops requires fork. +if platform.system() != 'Windows': + try: + mp.set_start_method('fork') + except RuntimeError: + pass def run_job(): diff --git a/lithops/serverless/backends/aws_batch/aws_batch.py b/lithops/serverless/backends/aws_batch/aws_batch.py index d566dbd23..ccc80edc2 100644 --- a/lithops/serverless/backends/aws_batch/aws_batch.py +++ b/lithops/serverless/backends/aws_batch/aws_batch.py @@ -19,7 +19,6 @@ import os import re import logging -import subprocess import sys import botocore import time @@ -400,8 +399,7 @@ def build_runtime(self, runtime_name, runtime_file, extra_args=[]): finally: os.remove(batch_config.RUNTIME_ZIP) - cmd = f'{docker_path} login --username AWS --password-stdin {registry}' - subprocess.check_output(cmd.split(), input=ecr_token) + utils.docker_login('AWS', ecr_token.decode('utf-8'), registry) try: self.ecr_client.create_repository(repositoryName=repo_name, diff --git a/lithops/serverless/backends/aws_lambda/aws_lambda.py b/lithops/serverless/backends/aws_lambda/aws_lambda.py index e01ec3787..05742384f 100644 --- a/lithops/serverless/backends/aws_lambda/aws_lambda.py +++ b/lithops/serverless/backends/aws_lambda/aws_lambda.py @@ -21,7 +21,6 @@ import time import json import zipfile -import subprocess import botocore.exceptions import base64 @@ -357,8 +356,7 @@ def build_runtime(self, runtime_name, runtime_file, extra_args=[]): auth_data = res['authorizationData'].pop() ecr_token = 
base64.b64decode(auth_data['authorizationToken']).split(b':')[1] - cmd = f'{docker_path} login --username AWS --password-stdin {registry}' - subprocess.check_output(cmd.split(), input=ecr_token) + utils.docker_login('AWS', ecr_token.decode('utf-8'), registry) repo_name = self._format_repo_name(runtime_name) diff --git a/lithops/serverless/backends/azure_containers/azure_containers.py b/lithops/serverless/backends/azure_containers/azure_containers.py index d29a1b674..ce3bb7dc9 100644 --- a/lithops/serverless/backends/azure_containers/azure_containers.py +++ b/lithops/serverless/backends/azure_containers/azure_containers.py @@ -17,8 +17,11 @@ import os import json import time +import copy import logging import hashlib +import shutil +import subprocess as sp from azure.storage.queue import QueueServiceClient from lithops import utils @@ -58,6 +61,46 @@ def __init__(self, ac_config, internal_storage): msg = COMPUTE_CLI_MSG.format('Azure Container Apps') logger.info(f"{msg} - Region: {self.location}") + def _check_az_cli(self): + if not shutil.which('az'): + raise Exception( + 'Azure CLI (az) command not found. ' + 'Install it from https://docs.microsoft.com/en-us/cli/azure/install-azure-cli' + ) + + def _run_az_command(self, cmd, return_json=False, return_result=False): + """ + Run an Azure CLI command using shell=True. 
+ """ + self._check_az_cli() + quiet = logger.getEffectiveLevel() != logging.DEBUG + kwargs = {'shell': True, 'encoding': 'UTF-8', 'stderr': sp.PIPE} + if quiet and not (return_json or return_result): + kwargs['stdout'] = sp.DEVNULL + try: + if return_json or return_result: + result = sp.check_output(cmd, **kwargs) + else: + sp.check_call(cmd, **kwargs) + return None + except sp.CalledProcessError as e: + err_msg = f'Azure CLI command failed: {cmd}' + if e.stderr: + err_msg += f'\n{e.stderr.strip()}' + raise Exception(err_msg) from e + + result = result.strip() + if return_json: + try: + return json.loads(result) + except json.JSONDecodeError as e: + raise Exception( + f'Failed to parse Azure CLI output as JSON: {result}' + ) from e + if return_result: + return result.replace('"', '') + return result + def _format_containerapp_name(self, runtime_name, runtime_memory, version=__version__): """ Formates the conatiner app name @@ -68,6 +111,29 @@ def _format_containerapp_name(self, runtime_name, runtime_memory, version=__vers return f'lithops-worker-{version.replace(".", "")}-{name_hash}'[:31] + def _get_managed_environment_id(self): + environment_id = self.ac_config.get('environment_id') + if environment_id: + return environment_id + + cmd = (f'az containerapp env show -g {self.resource_group} -n {self.environment} ' + f'--query id --only-show-errors') + return self._run_az_command(cmd, return_result=True) + + def _containerapp_exists(self, containerapp_name): + cmd = (f'az containerapp show --name {containerapp_name} ' + f'--resource-group {self.resource_group} --only-show-errors') + kwargs = {'shell': True} + if logger.getEffectiveLevel() != logging.DEBUG: + kwargs['stderr'] = sp.DEVNULL + kwargs['stdout'] = sp.DEVNULL + try: + sp.check_call(cmd, **kwargs) + return True + except sp.CalledProcessError: + logger.debug(f'Container app {containerapp_name} not found, will create it') + return False + def _get_default_runtime_image_name(self): """ Generates the default 
runtime image name @@ -78,8 +144,8 @@ def _get_default_runtime_image_name(self): def deploy_runtime(self, runtime_name, memory, timeout): """ - Deploys a new runtime into Azure Function Apps - from the provided Linux image for consumption plan + Deploys a new runtime into Azure Container Apps + from the provided container image """ if runtime_name == self._get_default_runtime_image_name(): self._build_default_runtime(runtime_name) @@ -127,16 +193,13 @@ def build_runtime(self, runtime_name, dockerfile, extra_args=[]): finally: os.remove(config.FH_ZIP_LOCATION) - docker_user = self.ac_config.get("docker_user") - docker_password = self.ac_config.get("docker_password") - docker_server = self.ac_config.get("docker_server") - logger.debug(f'Pushing runtime {runtime_name} to container registry') - + docker_user = self.ac_config.get('docker_user') + docker_password = self.ac_config.get('docker_password') + docker_server = self.ac_config.get('docker_server') if docker_user and docker_password: logger.debug('Container registry credentials found in config. 
Logging in into the registry') - cmd = f'{docker_path} login -u {docker_user} --password-stdin {docker_server}' - utils.run_command(cmd, input=docker_password) + utils.docker_login(docker_user, docker_password, docker_server) if utils.is_podman(docker_path): cmd = f'{docker_path} push {runtime_name} --format docker --remove-signatures' @@ -161,66 +224,80 @@ def _create_app(self, runtime_name, memory, timeout): in_queue = self.queue_service.get_queue_client(containerapp_name) in_queue.clear_messages() - ca_temaplate = config.CONTAINERAPP_JSON - ca_temaplate['name'] = containerapp_name - ca_temaplate['location'] = self.location - ca_temaplate['tags']['type'] = 'lithops-runtime' - ca_temaplate['tags']['lithops_version'] = str(__version__) - ca_temaplate['tags']['runtime_name'] = runtime_name - ca_temaplate['tags']['runtime_memory'] = str(memory) + ca_template = copy.deepcopy(config.CONTAINERAPP_JSON) + ca_template['name'] = containerapp_name + ca_template['location'] = self.location + ca_template['tags']['type'] = 'lithops-runtime' + ca_template['tags']['lithops_version'] = str(__version__) + ca_template['tags']['runtime_name'] = runtime_name + ca_template['tags']['runtime_memory'] = str(memory) try: runtime_memory, runtime_cpu = config.ALLOWED_MEM[memory] - ca_temaplate['properties']['template']['containers'][0]['resources']['cpu'] = runtime_cpu - ca_temaplate['properties']['template']['containers'][0]['resources']['memory'] = runtime_memory + ca_template['properties']['template']['containers'][0]['resources']['cpu'] = runtime_cpu + ca_template['properties']['template']['containers'][0]['resources']['memory'] = runtime_memory except Exception: raise Exception(f'The memory {memory} is not allowed, you must choose ' f'one of thses memory values: {config.ALLOWED_MEM.keys()}') - ca_temaplate['properties']['template']['containers'][0]['image'] = runtime_name - ca_temaplate['properties']['template']['containers'][0]['env'][0]['value'] = containerapp_name - 
ca_temaplate['properties']['template']['scale']['rules'][0]['azureQueue']['queueName'] = containerapp_name - ca_temaplate['properties']['template']['scale']['maxReplicas'] = min(self.ac_config['max_workers'], 30) + ca_template['properties']['template']['containers'][0]['image'] = runtime_name + ca_template['properties']['template']['containers'][0]['env'][0]['value'] = containerapp_name + ca_template['properties']['template']['scale']['rules'][0]['azureQueue']['queueName'] = containerapp_name + ca_template['properties']['template']['scale']['maxReplicas'] = min(self.ac_config['max_workers'], 30) + + grace_period = min(int(timeout), 600) + if int(timeout) > 600: + logger.debug( + f'Container Apps terminationGracePeriodSeconds is capped at 600 seconds ' + f'(requested {timeout} seconds)' + ) + ca_template['properties']['template']['terminationGracePeriodSeconds'] = grace_period - cmd = f"az containerapp env show -g {self.resource_group} -n {self.environment} --query id --only-show-errors" - envorinemnt_id = utils.run_command(cmd, return_result=True) - ca_temaplate['properties']['managedEnvironmentId'] = envorinemnt_id + ca_template['properties']['environmentId'] = self._get_managed_environment_id() cmd = f"az storage account show-connection-string -g {self.resource_group} --name {self.storage_account_name} --query connectionString --out json" - queueconnection = utils.run_command(cmd, return_result=True) - ca_temaplate['properties']['configuration']['secrets'][0]['value'] = queueconnection + queueconnection = self._run_az_command(cmd, return_result=True) + ca_template['properties']['configuration']['secrets'][0]['value'] = queueconnection if self.ac_config.get('docker_password'): - ca_temaplate['properties']['configuration']['secrets'][1]['value'] = self.ac_config['docker_password'] - ca_temaplate['properties']['configuration']['registries'][0]['server'] = self.ac_config['docker_server'] - ca_temaplate['properties']['configuration']['registries'][0]['username'] = 
self.ac_config['docker_user'] + ca_template['properties']['configuration']['secrets'][1]['value'] = self.ac_config['docker_password'] + ca_template['properties']['configuration']['registries'][0]['server'] = self.ac_config['docker_server'] + ca_template['properties']['configuration']['registries'][0]['username'] = self.ac_config['docker_user'] else: - del ca_temaplate['properties']['configuration']['secrets'][1] - del ca_temaplate['properties']['configuration']['registries'] + del ca_template['properties']['configuration']['secrets'][1] + del ca_template['properties']['configuration']['registries'] with open(config.CA_JSON_LOCATION, 'w') as f: - f.write(json.dumps(ca_temaplate)) + f.write(json.dumps(ca_template)) - cmd = (f'az containerapp create --name {containerapp_name} ' - f'--resource-group {self.resource_group} ' - f'--yaml {config.CA_JSON_LOCATION} --only-show-errors') - - logger.debug('Deploying Azure Container App') + logger.info('Deploying Azure Container App (this may take several minutes)') deployed = False retries = 0 + last_error = None while retries < 10: try: - utils.run_command(cmd) + if self._containerapp_exists(containerapp_name): + logger.debug(f'Container app {containerapp_name} already exists, updating') + cmd = (f'az containerapp update --name {containerapp_name} ' + f'--resource-group {self.resource_group} ' + f'--yaml {config.CA_JSON_LOCATION} --only-show-errors') + else: + cmd = (f'az containerapp create --name {containerapp_name} ' + f'--resource-group {self.resource_group} ' + f'--yaml {config.CA_JSON_LOCATION} --only-show-errors') + self._run_az_command(cmd) os.remove(config.CA_JSON_LOCATION) deployed = True break - except Exception: + except Exception as e: + last_error = e + logger.warning(f'Container app deploy attempt {retries + 1} failed: {e}') time.sleep(10) retries += 1 if not deployed: - raise Exception(f"The Azure Container App cannot be deployed: {cmd}") + raise Exception(f"The Azure Container App cannot be deployed: {cmd}") 
from last_error def delete_runtime(self, runtime_name, memory, version=__version__): """ @@ -229,7 +306,7 @@ def delete_runtime(self, runtime_name, memory, version=__version__): logger.info(f'Deleting runtime: {runtime_name} - {memory}MB') containerapp_name = self._format_containerapp_name(runtime_name, memory, version) cmd = f'az containerapp delete --name {containerapp_name} --resource-group {self.resource_group} -y --only-show-errors' - utils.run_command(cmd) + self._run_az_command(cmd) try: self.queue_service.delete_queue(containerapp_name) @@ -299,7 +376,7 @@ def _generate_runtime_meta(self, runtime_name, memory): logger.debug("Metadata extracted succesfully") return runtime_meta except StorageNoSuchKeyError: - logger.debug(f'Get runtime metadata retry {i+1}') + logger.debug(f'Get runtime metadata retry {i + 1}') raise Exception('Could not get metadata. Review container logs in the Azure Portal') @@ -310,8 +387,8 @@ def list_runtimes(self, runtime_name='all'): logger.debug('Listing all deployed runtimes') runtimes = [] - response = os.popen('az containerapp list --query "[].{Name:name, Tags:tags}\" --only-show-errors').read() - response = json.loads(response) + cmd = 'az containerapp list --query "[].{Name:name, Tags:tags}" --only-show-errors' + response = self._run_az_command(cmd, return_json=True) for containerapp in response: if containerapp['Tags'] and 'type' in containerapp['Tags'] \ diff --git a/lithops/serverless/backends/azure_containers/config.py b/lithops/serverless/backends/azure_containers/config.py index 57bda0862..e9c3062cb 100644 --- a/lithops/serverless/backends/azure_containers/config.py +++ b/lithops/serverless/backends/azure_containers/config.py @@ -42,6 +42,14 @@ 3072: ('3Gi', 1.5), 3584: ('3.5Gi', 1.75), 4096: ('4Gi', 2), + 4608: ('4.5Gi', 2.25), + 5120: ('5Gi', 2.5), + 5632: ('5.5Gi', 2.75), + 6144: ('6Gi', 3), + 6656: ('6.5Gi', 3.25), + 7168: ('7Gi', 3.5), + 7680: ('7.5Gi', 3.75), + 8192: ('8Gi', 4), } REQUIRED_AZURE_STORAGE_PARAMS = 
('storage_account_name', 'storage_account_key') @@ -50,7 +58,7 @@ CONTAINERAPP_JSON = { "type": "Microsoft.App/containerApps", "name": "", - "apiVersion": "2022-03-01", + "apiVersion": "2026-01-01", "kind": "containerapp", "location": "", "tags": { @@ -60,7 +68,7 @@ "runtime_memory": "", }, "properties": { - "managedEnvironmentId": "", + "environmentId": "", "configuration": { "activeRevisionsMode": "single", "secrets": [{ @@ -167,7 +175,7 @@ def load_config(config_data): config_data['azure_containers']['region'] = config_data['azure_containers'].pop('location') for key in DEFAULT_CONFIG_KEYS: - if key not in config_data['azure_containers']: + if key not in config_data['azure_containers'] or not config_data['azure_containers'][key]: config_data['azure_containers'][key] = DEFAULT_CONFIG_KEYS[key] for key in REQUIRED_AZURE_STORAGE_PARAMS: diff --git a/lithops/serverless/backends/azure_functions/azure_functions.py b/lithops/serverless/backends/azure_functions/azure_functions.py index ca639d6df..8b23a8867 100644 --- a/lithops/serverless/backends/azure_functions/azure_functions.py +++ b/lithops/serverless/backends/azure_functions/azure_functions.py @@ -23,8 +23,10 @@ import logging import shutil import zipfile +import subprocess as sp import http.client from urllib.parse import urlparse +from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError from azure.storage.queue import QueueServiceClient from lithops import utils @@ -62,22 +64,141 @@ def __init__(self, af_config, internal_storage): msg = COMPUTE_CLI_MSG.format('Azure Functions') logger.info(f"{msg} - Region: {self.location}") - def _format_function_name(self, runtime_name, version=__version__): + def _check_az_cli(self): + if not shutil.which('az'): + raise Exception( + 'Azure CLI (az) command not found. 
' + 'Install it from https://docs.microsoft.com/en-us/cli/azure/install-azure-cli' + ) + + def _run_az_command(self, cmd, return_json=False, return_result=False): + """ + Run an Azure CLI command using shell=True. + """ + self._check_az_cli() + quiet = logger.getEffectiveLevel() != logging.DEBUG + kwargs = {'shell': True, 'encoding': 'UTF-8', 'stderr': sp.PIPE} + if quiet and not (return_json or return_result): + kwargs['stdout'] = sp.DEVNULL + try: + if return_json or return_result: + result = sp.check_output(cmd, **kwargs) + else: + sp.check_call(cmd, **kwargs) + return None + except sp.CalledProcessError as e: + err_msg = f'Azure CLI command failed: {cmd}' + if e.stderr: + err_msg += f'\n{e.stderr.strip()}' + raise Exception(err_msg) from e + + result = result.strip() + if return_json: + try: + return json.loads(result) + except json.JSONDecodeError as e: + raise Exception( + f'Failed to parse Azure CLI output as JSON: {result}' + ) from e + if return_result: + return result.replace('"', '') + return result + + def _function_app_exists(self, function_name): + cmd = (f'az functionapp show --name {function_name} ' + f'--resource-group {self.resource_group}') + kwargs = {} + if logger.getEffectiveLevel() != logging.DEBUG: + kwargs['stderr'] = sp.DEVNULL + kwargs['stdout'] = sp.DEVNULL + try: + sp.check_call(cmd, shell=True, **kwargs) + return True + except sp.CalledProcessError: + logger.debug(f'Function app {function_name} not found, will create it') + return False + + def _create_deploy_zip(self, build_dir): + """ + Create a zip of the build directory contents for OneDeploy. 
+ """ + zip_path = os.path.join(TEMP_DIR, f'{os.path.basename(build_dir)}-deploy.zip') + if os.path.exists(zip_path): + os.remove(zip_path) + + with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf: + for root, _, files in os.walk(build_dir): + for file in files: + file_path = os.path.join(root, file) + arcname = os.path.relpath(file_path, build_dir) + zf.write(file_path, arcname) + + return zip_path + + def _publish_function(self, function_name, build_dir): """ - Formates the function name + Deploy the function app package using Azure CLI OneDeploy (Flex Consumption). """ - ac_name = self.storage_account_name - name = f'{ac_name}-{runtime_name}-{version}-{self.trigger}' - name_hash = hashlib.sha1(name.encode("utf-8")).hexdigest()[:10] + zip_path = self._create_deploy_zip(build_dir) + try: + logger.info(f'Publishing function: {function_name}') + if utils.is_linux_system(): + cmd = (f'az functionapp deployment source config-zip ' + f'-g {self.resource_group} -n {function_name} ' + f'--src {zip_path} --build-remote false') + else: + cmd = (f'az functionapp deployment source config-zip ' + f'-g {self.resource_group} -n {function_name} ' + f'--src {zip_path}') + + max_retries = 10 + last_error = None + for attempt in range(1, max_retries + 1): + try: + time.sleep(10) + self._run_az_command(cmd) + logger.info(f'Function {function_name} published successfully') + break + except Exception as e: + last_error = e + logger.warning(f'Publish attempt {attempt}/{max_retries} failed: {e}') + else: + raise Exception( + f'Failed to publish function {function_name} after {max_retries} attempts' + ) from last_error + finally: + if os.path.exists(zip_path): + os.remove(zip_path) + + time.sleep(10) + def _get_function_identity(self, runtime_name, version=__version__): + """ + Returns a stable hash for a Lithops runtime on Azure Functions. 
+ """ + name = f'{self.storage_account_name}-{runtime_name}-{version}-{self.trigger}' + return hashlib.sha1(name.encode("utf-8")).hexdigest()[:12] + + def _format_function_name(self, runtime_name, version=__version__): + """ + Formats the function app name. + Host ID collisions are avoided via AzureFunctionsWebHost__hostid. + """ + name_hash = self._get_function_identity(runtime_name, version) return f'lithops-worker-{runtime_name}-{version.replace(".", "")}-{name_hash}' + def _format_host_id(self, runtime_name, version=__version__): + """ + Unique Functions host ID (must start with a letter, alphanumeric only). + """ + return f'l{self._get_function_identity(runtime_name, version)}' + def _format_queue_name(self, function_name, q_type): """ Generates the queue name """ - hash = function_name.rsplit("-", 1)[-1] - return f'lithops-worker-{hash}-{q_type}' + name_hash = function_name.rsplit("-", 1)[-1] + return f'lithops-worker-{name_hash}-{q_type}' def _get_default_runtime_name(self): """ @@ -89,7 +210,7 @@ def _get_default_runtime_name(self): def deploy_runtime(self, runtime_name, memory, timeout): """ Deploys a new runtime into Azure Function Apps - from the provided Linux image for consumption plan + on the Flex Consumption plan """ if runtime_name == self._get_default_runtime_name(): self._build_default_runtime(runtime_name) @@ -186,63 +307,53 @@ def _create_function(self, runtime_name, memory, timeout): logger.info(f'Creating Azure Function from runtime {runtime_name}') function_name = self._format_function_name(runtime_name) - if self.trigger == 'pub/sub': - try: - in_q_name = self._format_queue_name(function_name, config.IN_QUEUE) - logger.debug(f'Creating queue {in_q_name}') - self.queue_service.create_queue(in_q_name) - except Exception: - in_queue = self.queue_service.get_queue_client(in_q_name) - in_queue.clear_messages() - try: - out_q_name = self._format_queue_name(function_name, config.OUT_QUEUE) - logger.debug(f'Creating queue {out_q_name}') - 
self.queue_service.create_queue(out_q_name) - except Exception: - out_queue = self.queue_service.get_queue_client(out_q_name) - out_queue.clear_messages() + instance_memory = config.get_flex_instance_memory(memory) + if instance_memory != memory: + logger.debug( + f'Using Flex Consumption instance memory {instance_memory}MB ' + f'(requested {memory}MB)' + ) cmd = (f'az functionapp create --name {function_name} ' f'--storage-account {self.storage_account_name} ' f'--resource-group {self.resource_group} ' - '--os-type Linux --runtime python ' + f'--flexconsumption-location {self.location} ' + '--runtime python ' f'--runtime-version {utils.CURRENT_PY_VERSION} ' f'--functions-version {self.functions_version} ' - f'--consumption-plan-location {self.location} ' + f'--instance-memory {instance_memory} ' f'--tags type=lithops-runtime lithops_version={__version__} runtime_name={runtime_name}') - utils.run_command(cmd) + if not self._function_app_exists(function_name): + self._run_az_command(cmd) + else: + logger.debug(f'Function app {function_name} already exists, skipping create') + host_id = self._format_host_id(runtime_name) cmd = (f'az functionapp config appsettings set --name {function_name} ' f'--resource-group {self.resource_group} ' - f'--settings FUNCTIONS_WORKER_PROCESS_COUNT=1 ' - f'PYTHON_THREADPOOL_THREAD_COUNT=None') - # utils.run_command(cmd) + f'--settings AzureFunctionsWebHost__hostid={host_id}') + self._run_az_command(cmd) build_dir = os.path.join(config.BUILD_DIR, function_name) - os.chdir(build_dir) - if utils.is_linux_system(): - cmd = f'func azure functionapp publish {function_name} --python --no-build' - else: - cmd = f'func azure functionapp publish {function_name} --python' - logger.info(f'Publishing function: {function_name}') - while True: - try: - time.sleep(10) - utils.run_command(cmd) - break - except Exception: - pass + self._publish_function(function_name, build_dir) - time.sleep(10) + if self.trigger == 'pub/sub': + in_q_name = 
self._format_queue_name(function_name, config.IN_QUEUE) + logger.debug(f'Creating queue {in_q_name}') + self._ensure_queue(in_q_name) + out_q_name = self._format_queue_name(function_name, config.OUT_QUEUE) + logger.debug(f'Creating queue {out_q_name}') + self._ensure_queue(out_q_name) - def delete_runtime(self, runtime_name, memory, version=__version__): + def delete_runtime(self, runtime_name, memory, version=__version__, function_name=None): """ Deletes a runtime """ logger.info(f'Deleting runtime: {runtime_name} - {memory}MB') - function_name = self._format_function_name(runtime_name, version) + if function_name is None: + function_name = self._format_function_name(runtime_name, version) cmd = f'az functionapp delete --name {function_name} --resource-group {self.resource_group}' - utils.run_command(cmd) + self._run_az_command(cmd) try: in_q_name = self._format_queue_name(function_name, config.IN_QUEUE) @@ -320,6 +431,33 @@ def get_runtime_key(self, runtime_name, runtime_memory, version=__version__): return runtime_key + def _ensure_queue(self, queue_name): + """ + Create a queue if it does not exist, or clear it if it already exists. + """ + queue_client = self.queue_service.get_queue_client(queue_name) + try: + queue_client.create_queue() + except ResourceExistsError: + try: + queue_client.clear_messages() + except ResourceNotFoundError: + pass + + def _clean_queues(self): + """ + Delete all Lithops worker queues, including orphaned ones from failed deploys. 
+ """ + if self.trigger != 'pub/sub': + return + + try: + for queue in self.queue_service.list_queues(name_starts_with='lithops-worker-'): + logger.debug(f'Deleting queue {queue.name}') + self.queue_service.delete_queue(queue.name) + except Exception as e: + logger.debug(f'Error cleaning queues: {e}') + def clean(self, **kwargs): """ Deletes all Lithops Azure Function Apps runtimes @@ -329,7 +467,9 @@ def clean(self, **kwargs): runtimes = self.list_runtimes() for runtime_name, runtime_memory, version, wk_name in runtimes: - self.delete_runtime(runtime_name, runtime_memory, version) + self.delete_runtime(runtime_name, runtime_memory, version, function_name=wk_name) + + self._clean_queues() def _generate_runtime_meta(self, runtime_name, memory): """ @@ -357,8 +497,8 @@ def list_runtimes(self, runtime_name='all'): logger.debug('Listing all deployed runtimes') runtimes = [] - response = os.popen('az functionapp list --query "[].{Name:name, Tags:tags}\"').read() - response = json.loads(response) + cmd = 'az functionapp list --query "[].{Name:name, Tags:tags}"' + response = self._run_az_command(cmd, return_json=True) for functionapp in response: if functionapp['Tags'] and 'type' in functionapp['Tags'] \ diff --git a/lithops/serverless/backends/azure_functions/config.py b/lithops/serverless/backends/azure_functions/config.py index ea4eb97ed..eb9bf1c70 100644 --- a/lithops/serverless/backends/azure_functions/config.py +++ b/lithops/serverless/backends/azure_functions/config.py @@ -27,7 +27,7 @@ DEFAULT_CONFIG_KEYS = { 'runtime_timeout': 300, # Default: 300 seconds => 5 minutes - 'runtime_memory': 1536, # Default memory: 1536 MB + 'runtime_memory': 2048, # Default Flex Consumption instance memory: 2048 MB 'max_workers': 1000, 'worker_processes': 1, 'invoke_pool_threads': 10, @@ -37,6 +37,22 @@ AVAILABLE_PY_RUNTIMES = ['3.10', '3.11', '3.12', '3.13', '3.14'] +FLEX_INSTANCE_MEMORY = {512, 2048, 4096} + + +def get_flex_instance_memory(memory): + """ + Map runtime_memory to a 
valid Flex Consumption instance size (MB). + """ + if memory in FLEX_INSTANCE_MEMORY: + return memory + if memory <= 512: + return 512 + if memory <= 2048: + return 2048 + return 4096 + + REQUIRED_AZURE_STORAGE_PARAMS = ('storage_account_name', 'storage_account_key') REQUIRED_AZURE_FUNCTIONS_PARAMS = ('resource_group', 'region') @@ -118,7 +134,7 @@ }, "extensionBundle": { "id": "Microsoft.Azure.Functions.ExtensionBundle", - "version": "[2.*, 3.0.0)" + "version": "[4.*, 5.0.0)" } } """ diff --git a/lithops/serverless/backends/code_engine/code_engine.py b/lithops/serverless/backends/code_engine/code_engine.py index d48fcfcb3..56f4251c6 100644 --- a/lithops/serverless/backends/code_engine/code_engine.py +++ b/lithops/serverless/backends/code_engine/code_engine.py @@ -266,8 +266,7 @@ def build_runtime(self, docker_image_name, dockerfile, extra_args=[]): if docker_user and docker_password: logger.debug('Container registry credentials found in config. Logging in into the registry') - cmd = f'{docker_path} login -u {docker_user} --password-stdin {docker_server}' - utils.run_command(cmd, input=docker_password) + utils.docker_login(docker_user, docker_password, docker_server) if utils.is_podman(docker_path): cmd = f'{docker_path} push {docker_image_name} --format docker --remove-signatures' @@ -496,7 +495,7 @@ def invoke(self, docker_image_name, runtime_memory, job_payload): config_map = self._create_config_map(activation_id, job_payload) container['env'][1]['valueFrom']['configMapKeyRef']['name'] = config_map - container['resources']['requests']['memory'] = f'{runtime_memory/1024}G' + container['resources']['requests']['memory'] = f'{runtime_memory / 1024}G' container['resources']['requests']['cpu'] = str(self.config['runtime_cpu']) logger.debug('ExecutorID {} | JobID {} - Going to run {} activations ' @@ -585,7 +584,7 @@ def _create_job_definition(self, docker_image_name, runtime_memory, timeout=None container['image'] = docker_image_name container['name'] = jobdef_name 
container['env'][0]['value'] = 'run' - container['resources']['requests']['memory'] = f'{runtime_memory/1024}G' + container['resources']['requests']['memory'] = f'{runtime_memory / 1024}G' container['resources']['requests']['cpu'] = str(self.config['runtime_cpu']) if not all(key in self.config for key in ["docker_user", "docker_password"]): diff --git a/lithops/serverless/backends/gcp_cloudrun/__init__.py b/lithops/serverless/backends/gcp_cloudrun/__init__.py index ae7707626..a685d3a12 100644 --- a/lithops/serverless/backends/gcp_cloudrun/__init__.py +++ b/lithops/serverless/backends/gcp_cloudrun/__init__.py @@ -1,3 +1,5 @@ +from lithops import _grpc_env # noqa: F401 + from .cloudrun import GCPCloudRunBackend as ServerlessBackend __all__ = ['ServerlessBackend'] diff --git a/lithops/serverless/backends/gcp_functions/__init__.py b/lithops/serverless/backends/gcp_functions/__init__.py index 8f3d647bc..7aed4e043 100644 --- a/lithops/serverless/backends/gcp_functions/__init__.py +++ b/lithops/serverless/backends/gcp_functions/__init__.py @@ -1,3 +1,5 @@ +from lithops import _grpc_env # noqa: F401 + from .gcp_functions import GCPFunctionsBackend as ServerlessBackend __all__ = ['ServerlessBackend'] diff --git a/lithops/serverless/backends/gcp_functions/gcp_functions.py b/lithops/serverless/backends/gcp_functions/gcp_functions.py index 382f5460a..bf531ca2b 100644 --- a/lithops/serverless/backends/gcp_functions/gcp_functions.py +++ b/lithops/serverless/backends/gcp_functions/gcp_functions.py @@ -32,6 +32,7 @@ from google.oauth2 import service_account from google_auth_httplib2 import AuthorizedHttp from googleapiclient.discovery import build +from googleapiclient.errors import HttpError from google.auth import jwt from lithops import utils @@ -354,18 +355,26 @@ def deploy_runtime(self, runtime_name, memory, timeout): return runtime_meta - def delete_runtime(self, runtime_name, runtime_memory, version=__version__): - function_name = self._format_function_name(runtime_name, 
runtime_memory, version) + def delete_runtime(self, runtime_name, runtime_memory, version=__version__, + function_name=None): + if function_name is None: + function_name = self._format_function_name( + runtime_name, runtime_memory, version) function_location = self._get_function_location(function_name) logger.info(f'Deleting runtime: {runtime_name} - {runtime_memory}MB') # Delete function - self._api_resource.projects().locations().functions().delete( - name=function_location, - ).execute(num_retries=self.num_retries) - logger.debug('Request Ok - Waiting until the function is completely deleted') - - self._wait_function_deleted(function_location) + try: + self._api_resource.projects().locations().functions().delete( + name=function_location, + ).execute(num_retries=self.num_retries) + logger.debug('Request Ok - Waiting until the function is completely deleted') + self._wait_function_deleted(function_location) + except HttpError as e: + if e.resp.status == 404: + logger.debug(f'Function {function_name} not found, skipping delete') + else: + raise if self.trigger == 'pub/sub': # Delete Pub/Sub topic attached as trigger for the cloud function @@ -392,7 +401,8 @@ def clean(self, **kwargs): logger.debug('Going to delete all deployed runtimes') runtimes = self.list_runtimes() for runtime_name, runtime_memory, version, wk_name in runtimes: - self.delete_runtime(runtime_name, runtime_memory, version) + self.delete_runtime( + runtime_name, runtime_memory, version, function_name=wk_name) def list_runtimes(self, runtime_name='all'): logger.debug('Listing deployed runtimes') diff --git a/lithops/serverless/backends/ibm_cf/ibm_cf.py b/lithops/serverless/backends/ibm_cf/ibm_cf.py index 89e488ad0..a9f1443c0 100644 --- a/lithops/serverless/backends/ibm_cf/ibm_cf.py +++ b/lithops/serverless/backends/ibm_cf/ibm_cf.py @@ -189,8 +189,7 @@ def build_runtime(self, docker_image_name, dockerfile, extra_args=[]): if docker_user and docker_password: logger.debug('Container registry 
credentials found in config. Logging in into the registry') - cmd = f'{docker_path} login -u {docker_user} --password-stdin {docker_server}' - utils.run_command(cmd, input=docker_password) + utils.docker_login(docker_user, docker_password, docker_server) if utils.is_podman(docker_path): cmd = f'{docker_path} push {docker_image_name} --format docker --remove-signatures' diff --git a/lithops/serverless/backends/k8s/k8s.py b/lithops/serverless/backends/k8s/k8s.py index 7aeb38cc2..b81dd3555 100644 --- a/lithops/serverless/backends/k8s/k8s.py +++ b/lithops/serverless/backends/k8s/k8s.py @@ -202,8 +202,7 @@ def build_runtime(self, docker_image_name, dockerfile, extra_args=[]): if docker_user and docker_password: logger.debug('Container registry credentials found in config. Logging in into the registry') - cmd = f'{docker_path} login -u {docker_user} --password-stdin {docker_server}' - utils.run_command(cmd, input=docker_password) + utils.docker_login(docker_user, docker_password, docker_server) logger.debug(f'Pushing runtime {docker_image_name} to container registry') if utils.is_podman(docker_path): diff --git a/lithops/serverless/backends/knative/knative.py b/lithops/serverless/backends/knative/knative.py index e05297110..1e662817f 100644 --- a/lithops/serverless/backends/knative/knative.py +++ b/lithops/serverless/backends/knative/knative.py @@ -578,8 +578,7 @@ def build_runtime(self, runtime_name, dockerfile, extra_args=[]): if docker_user and docker_password: logger.debug('Container registry credentials found in config. 
Logging in into the registry') - cmd = f'{docker_path} login -u {docker_user} --password-stdin {docker_server}' - utils.run_command(cmd, input=docker_password) + utils.docker_login(docker_user, docker_password, docker_server) logger.debug(f'Pushing runtime {runtime_name} to container registry') if utils.is_podman(docker_path): diff --git a/lithops/serverless/backends/openwhisk/openwhisk.py b/lithops/serverless/backends/openwhisk/openwhisk.py index c23693e55..6f01bdbca 100644 --- a/lithops/serverless/backends/openwhisk/openwhisk.py +++ b/lithops/serverless/backends/openwhisk/openwhisk.py @@ -129,8 +129,7 @@ def build_runtime(self, docker_image_name, dockerfile, extra_args=[]): logger.debug(f'Pushing runtime {docker_image_name} to container registry') if docker_user and docker_password: - cmd = f'{docker_path} login -u {docker_user} --password-stdin {docker_server}' - utils.run_command(cmd, input=docker_password) + utils.docker_login(docker_user, docker_password, docker_server) if utils.is_podman(docker_path): cmd = f'{docker_path} push {docker_image_name} --format docker --remove-signatures' diff --git a/lithops/standalone/backends/aws_ec2/aws_ec2.py b/lithops/standalone/backends/aws_ec2/aws_ec2.py index 96379c5f4..e5fb6d381 100644 --- a/lithops/standalone/backends/aws_ec2/aws_ec2.py +++ b/lithops/standalone/backends/aws_ec2/aws_ec2.py @@ -30,7 +30,14 @@ from lithops.util.ssh_client import SSHClient, ssh_boot_status_message from lithops.constants import COMPUTE_CLI_MSG, CACHE_DIR from lithops.config import load_yaml_config, dump_yaml_config -from lithops.standalone.utils import CLOUD_CONFIG_WORKER, CLOUD_CONFIG_WORKER_PK, StandaloneMode, get_host_setup_script +from lithops.standalone.utils import ( + CLOUD_CONFIG_WORKER, + CLOUD_CONFIG_WORKER_PK, + StandaloneMode, + get_host_setup_script, + prepare_standalone_clean, + standalone_clean_stop_early, +) from lithops.standalone import LithopsValidationError @@ -38,11 +45,11 @@ INSTANCE_STX_TIMEOUT = 180 
-DEFAULT_UBUNTU_IMAGE = 'ubuntu/images/hvm-ssd/ubuntu-jammy-22.04-amd64-server-*' -DEFAULT_UBUNTU_IMAGE_VERSION = DEFAULT_UBUNTU_IMAGE.replace('*', '202306*') +DEFAULT_UBUNTU_IMAGE = 'ubuntu/images/hvm-ssd-gp3/ubuntu-noble-24.04-amd64-server-*' +DEFAULT_UBUNTU_IMAGE_VERSION = DEFAULT_UBUNTU_IMAGE.replace('*', '202*') DEFAULT_UBUNTU_ACCOUNT_ID = '099720109477' -DEFAULT_LITHOPS_IMAGE_NAME = 'lithops-ubuntu-jammy-22.04-amd64-server' +DEFAULT_LITHOPS_IMAGE_NAME = 'lithops-ubuntu-noble-24.04-amd64-server' def b64s(string): @@ -96,6 +103,7 @@ def __init__(self, ec2_config, mode): self.master = None self.workers = [] + self._init_created = None msg = COMPUTE_CLI_MSG.format('AWS EC2') logger.info(f"{msg} - Region: {self.region_name}") @@ -104,6 +112,8 @@ def is_initialized(self): """ Checks if the backend is initialized """ + if self.mode == StandaloneMode.CONSUME.value: + return True return os.path.isfile(self.cache_file) def _load_ec2_data(self): @@ -115,7 +125,7 @@ def _load_ec2_data(self): if self.ec2_data: logger.debug(f'EC2 data loaded from {self.cache_file}') - if 'vpc_id' in self.ec2_data: + if self.ec2_data and 'vpc_id' in self.ec2_data: self.vpc_key = self.ec2_data['vpc_id'][-6:] self.vpc_name = self.ec2_data['vpc_name'] @@ -132,6 +142,93 @@ def _delete_vpc_data(self): if os.path.exists(self.cache_file): os.remove(self.cache_file) + def _reset_init_created(self): + self._init_created = { + 'vpc': False, + 'subnet': False, + 'internet_gateway': False, + 'security_group': False, + 'ssh_key': False, + } + + def _safe_rollback_delete(self, delete_fn, resource_desc): + try: + delete_fn() + except ClientError as err: + code = err.response.get('Error', {}).get('Code', '') + if code in ('InvalidGroup.NotFound', 'InvalidSubnetID.NotFound', + 'InvalidVpcID.NotFound', 'InvalidInternetGatewayID.NotFound', + 'InvalidKeyPair.NotFound'): + pass + else: + logger.warning(f'Rollback: could not delete {resource_desc}: {err}') + except Exception as err: + logger.warning(f'Rollback: 
could not delete {resource_desc}: {err}') + + def _rollback_init_resources(self): + """ + Deletes AWS resources created during a failed init(). + """ + if not self._init_created: + return + + logger.info('Rolling back AWS EC2 resources provisioned during failed init') + created = self._init_created + + if created.get('security_group') and self.config.get('security_group_id'): + sg_id = self.config['security_group_id'] + self._safe_rollback_delete( + lambda: self.ec2_client.delete_security_group(GroupId=sg_id), + f'security group {sg_id}', + ) + + if created.get('internet_gateway') and self.config.get('internet_gateway_id'): + igw_id = self.config['internet_gateway_id'] + vpc_id = self.config.get('vpc_id') + if vpc_id: + self._safe_rollback_delete( + lambda: self.ec2_client.detach_internet_gateway( + InternetGatewayId=igw_id, VpcId=vpc_id + ), + f'detach internet gateway {igw_id}', + ) + self._safe_rollback_delete( + lambda: self.ec2_client.delete_internet_gateway(InternetGatewayId=igw_id), + f'internet gateway {igw_id}', + ) + + if created.get('subnet') and self.config.get('public_subnet_id'): + subnet_id = self.config['public_subnet_id'] + self._safe_rollback_delete( + lambda: self.ec2_client.delete_subnet(SubnetId=subnet_id), + f'subnet {subnet_id}', + ) + + if created.get('vpc') and self.config.get('vpc_id'): + vpc_id = self.config['vpc_id'] + self._safe_rollback_delete( + lambda: self.ec2_client.delete_vpc(VpcId=vpc_id), + f'VPC {vpc_id}', + ) + + if created.get('ssh_key'): + key_filename = self.config.get('ssh_key_filename') + if key_filename and 'lithops-key-' in key_filename: + for path in (key_filename, f'{key_filename}.pub'): + if os.path.isfile(path): + os.remove(path) + if self.config.get('ssh_key_name'): + key_name = self.config['ssh_key_name'] + self._safe_rollback_delete( + lambda: self.ec2_client.delete_key_pair(KeyName=key_name), + f'SSH key pair {key_name}', + ) + + if self.vpc_data_type == 'created': + self._delete_vpc_data() + + 
self._init_created = None + def _create_vpc(self): """ Creates a new VPC @@ -164,6 +261,8 @@ def _create_vpc(self): self.ec2_client.create_tags(Resources=[response['Vpc']['VpcId']], Tags=tags) self.config['vpc_id'] = response['Vpc']['VpcId'] + if self._init_created is not None: + self._init_created['vpc'] = True def _create_subnets(self): """ @@ -187,6 +286,8 @@ def _create_subnets(self): ) public_subnet_id = response['Subnet']['SubnetId'] self.config['public_subnet_id'] = public_subnet_id + if self._init_created is not None: + self._init_created['subnet'] = True # if 'private_subnet_id' in self.ec2_data: # sg_info = self.ec2_client.describe_subnets( @@ -232,6 +333,8 @@ def _create_internet_gateway(self): VpcId=self.config['vpc_id'], InternetGatewayId=internet_gateway_id ) self.config['internet_gateway_id'] = internet_gateway_id + if self._init_created is not None: + self._init_created['internet_gateway'] = True def _create_nat_gateway(self): """ @@ -390,6 +493,8 @@ def _create_security_group(self): ) self.config['security_group_id'] = response['GroupId'] + if self._init_created is not None: + self._init_created['security_group'] = True def _create_ssh_key(self): """ @@ -409,10 +514,12 @@ def _create_ssh_key(self): filename = os.path.join("~", ".ssh", f"{keyname}.{self.name}.id_rsa") key_filename = os.path.expanduser(filename) + ssh_key_created = False if not os.path.isfile(key_filename): logger.debug("Generating new ssh key pair") os.system(f'ssh-keygen -b 2048 -t rsa -f {key_filename} -q -N ""') logger.debug(f"SHH key pair generated: {key_filename}") + ssh_key_created = True try: self.ec2_client.delete_key_pair(KeyName=keyname) except ClientError: @@ -427,8 +534,11 @@ def _create_ssh_key(self): ssh_key_data = file.read() self.ec2_client.import_key_pair(KeyName=keyname, PublicKeyMaterial=ssh_key_data) self.config['ssh_key_name'] = keyname + ssh_key_created = True self.config['ssh_key_filename'] = key_filename + if self._init_created is not None and 
ssh_key_created: + self._init_created['ssh_key'] = True def _request_image_id(self): """ @@ -460,7 +570,13 @@ def _request_image_id(self): 'Values': [DEFAULT_UBUNTU_IMAGE_VERSION] }], Owners=[DEFAULT_UBUNTU_ACCOUNT_ID]) - self.config['target_ami'] = response['Images'][0]['ImageId'] + images = response['Images'] + if not images: + raise Exception( + f'No Ubuntu image found matching {DEFAULT_UBUNTU_IMAGE_VERSION}' + ) + images = sorted(images, key=lambda i: i['CreationDate'], reverse=True) + self.config['target_ami'] = images[0]['ImageId'] def _create_master_instance(self): """ @@ -554,57 +670,63 @@ def init(self): elif self.mode in [StandaloneMode.CREATE.value, StandaloneMode.REUSE.value]: - # Create the VPC if not exists - self._create_vpc() - - # Set the suffix used for the VPC resources - self.vpc_key = self.config['vpc_id'][-6:] - - # Create the Subnet if not exists - self._create_subnets() - # Create the internet gateway if not exists - self. _create_internet_gateway() - # Create the NAT gateway - # self._create_nat_gateway() - # Create routing tables - self._create_routing_tables() - # Create the security group if not exists - self._create_security_group() - # Create the ssh key pair if not exists - self._create_ssh_key() - # Requests the Ubuntu image ID - self._request_image_id() - # Request SPOT price - self._request_spot_price() - # Request instance types - self._get_all_instance_types() - - # Create the master VM instance - self._create_master_instance() - - self.ec2_data = { - 'mode': self.mode, - 'vpc_data_type': self.vpc_data_type, - 'ssh_data_type': self.ssh_data_type, - 'master_name': self.master.name, - 'master_id': self.vpc_key, - 'vpc_name': self.vpc_name, - 'vpc_id': self.config['vpc_id'], - 'instance_role': self.config['instance_role'], - 'target_ami': self.config['target_ami'], - 'ssh_key_name': self.config['ssh_key_name'], - 'ssh_key_filename': self.config['ssh_key_filename'], - 'public_subnet_id': self.config['public_subnet_id'], - # 
'private_subnet_id': self.config['private_subnet_id'], - 'security_group_id': self.config['security_group_id'], - 'internet_gateway_id': self.config['internet_gateway_id'], - # 'nat_gateway_id': self.config['nat_gateway_id'], - # 'private_rtb_id': self.config['private_rtb_id'], - 'public_rtb_id': self.config['public_rtb_id'], - 'instance_types': self.instance_types - } + self._reset_init_created() + try: + # Create the VPC if not exists + self._create_vpc() + + # Set the suffix used for the VPC resources + self.vpc_key = self.config['vpc_id'][-6:] + + # Create the Subnet if not exists + self._create_subnets() + # Create the internet gateway if not exists + self._create_internet_gateway() + # Create the NAT gateway + # self._create_nat_gateway() + # Create routing tables + self._create_routing_tables() + # Create the security group if not exists + self._create_security_group() + # Create the ssh key pair if not exists + self._create_ssh_key() + # Requests the Ubuntu image ID + self._request_image_id() + # Request SPOT price + self._request_spot_price() + # Request instance types + self._get_all_instance_types() + + # Create the master VM instance + self._create_master_instance() - self._dump_ec2_data() + self.ec2_data = { + 'mode': self.mode, + 'vpc_data_type': self.vpc_data_type, + 'ssh_data_type': self.ssh_data_type, + 'master_name': self.master.name, + 'master_id': self.vpc_key, + 'vpc_name': self.vpc_name, + 'vpc_id': self.config['vpc_id'], + 'instance_role': self.config['instance_role'], + 'target_ami': self.config['target_ami'], + 'ssh_key_name': self.config['ssh_key_name'], + 'ssh_key_filename': self.config['ssh_key_filename'], + 'public_subnet_id': self.config['public_subnet_id'], + # 'private_subnet_id': self.config['private_subnet_id'], + 'security_group_id': self.config['security_group_id'], + 'internet_gateway_id': self.config['internet_gateway_id'], + # 'nat_gateway_id': self.config['nat_gateway_id'], + # 'private_rtb_id': self.config['private_rtb_id'], 
+ 'public_rtb_id': self.config['public_rtb_id'], + 'instance_types': self.instance_types + } + self._dump_ec2_data() + except Exception: + self._rollback_init_resources() + raise + finally: + self._init_created = None def build_image(self, image_name, script_file, overwrite, include, extra_args=[]): """ @@ -719,29 +841,67 @@ def list_images(): time.sleep(2) logger.debug(f"VM Image '{image_name}' successfully deleted") + @staticmethod + def _ubuntu_lts_major_version(name): + match = re.search(r'(\d{2})\.04', name) + if match: + return int(match.group(1)) + return None + + @staticmethod + def _include_ubuntu_lts_image(image, min_major=22): + if image.get('State') != 'available': + return False + name = image.get('Name', '') + if 'minimal' in name or 'pro-server' in name: + return False + if '/ubuntu-' not in name or '-amd64-server-' not in name: + return False + major = AWSEC2Backend._ubuntu_lts_major_version(name) + return major is not None and major >= min_major + + def _iter_canonical_ubuntu_images(self): + paginator = self.ec2_client.get_paginator('describe_images') + for page in paginator.paginate( + Owners=[DEFAULT_UBUNTU_ACCOUNT_ID], + Filters=[ + {'Name': 'name', 'Values': [ + 'ubuntu/images/hvm-ssd/ubuntu-*-amd64-server-*', + 'ubuntu/images/hvm-ssd-gp3/ubuntu-*-amd64-server-*', + ]}, + {'Name': 'state', 'Values': ['available']}, + {'Name': 'architecture', 'Values': ['x86_64']}, + ]): + yield from page['Images'] + def list_images(self): """ - List VM Images + List Ubuntu LTS images (22.04+) from Canonical and Lithops images in the + account. Returns tuples of (name, image_id, creation_date). 
""" - images_def = self.ec2_client.describe_images(Filters=[ - { - 'Name': 'name', - 'Values': [DEFAULT_UBUNTU_IMAGE] - }], Owners=[DEFAULT_UBUNTU_ACCOUNT_ID])['Images'] - images_user = self.ec2_client.describe_images(Filters=[ - { - 'Name': 'name', - 'Values': ['*lithops*'] - }])['Images'] - images_def.extend(images_user) - result = set() - for image in images_def: - created_at = datetime.strptime(image['CreationDate'], "%Y-%m-%dT%H:%M:%S.%fZ") - created_at = created_at.strftime("%Y-%m-%d %H:%M:%S") + for image in self._iter_canonical_ubuntu_images(): + if not self._include_ubuntu_lts_image(image): + continue + created_at = datetime.strptime( + image['CreationDate'], "%Y-%m-%dT%H:%M:%S.%fZ" + ).strftime("%Y-%m-%d %H:%M:%S") result.add((image['Name'], image['ImageId'], created_at)) + paginator = self.ec2_client.get_paginator('describe_images') + for page in paginator.paginate(Filters=[ + {'Name': 'name', 'Values': ['*lithops*']}, + {'Name': 'state', 'Values': ['available']}, + ]): + for image in page['Images']: + if 'lithops' not in image.get('Name', '').lower(): + continue + created_at = datetime.strptime( + image['CreationDate'], "%Y-%m-%dT%H:%M:%S.%fZ" + ).strftime("%Y-%m-%d %H:%M:%S") + result.add((image['Name'], image['ImageId'], created_at)) + return sorted(result, key=lambda x: x[2], reverse=True) def _delete_vm_instances(self, all=False): @@ -918,21 +1078,17 @@ def clean(self, all=False): """ logger.info('Cleaning AWS EC2 resources') - if not self.ec2_data: + prepare_standalone_clean(self, self._load_ec2_data) + if standalone_clean_stop_early(self, self.ec2_data, self._delete_vpc_data, all): return True - if self.mode == StandaloneMode.CONSUME.value: - self._delete_vpc_data() - return True - else: - self._delete_vm_instances(all=all) - if all: - if self._delete_vpc(): - self._delete_ssh_key() - self._delete_vpc_data() - return True - else: - return False + self._delete_vm_instances(all=all) + if all: + if self._delete_vpc(): + self._delete_ssh_key() + 
self._delete_vpc_data() + return True + return False def clear(self, job_keys=None): """ @@ -1007,7 +1163,11 @@ def get_runtime_key(self, runtime_name, version=__version__): Creates the runtime key """ name = runtime_name.replace('/', '-').replace(':', '-') - runtime_key = os.path.join(self.name, version, self.ec2_data['master_id'], name) + if self.mode == StandaloneMode.CONSUME.value: + master_id = self.config['instance_id'] + else: + master_id = self.ec2_data['master_id'] + runtime_key = os.path.join(self.name, version, master_id, name) return runtime_key @@ -1118,9 +1278,18 @@ def wait_ready(self, timeout=INSTANCE_STX_TIMEOUT): start = time.time() - self.get_public_ip() if self.public else self.get_private_ip() - while (time.time() - start < timeout): + self.get_instance_data() + if self.instance_data: + if self.public: + ip = self.instance_data.get('PublicIpAddress') + if ip: + self.public_ip = ip + else: + ip = self.instance_data.get('PrivateIpAddress') + if ip: + self.private_ip = ip + if self.is_ready(): start_time = round(time.time() - start, 2) logger.debug(f'{self} ready in {start_time} seconds') @@ -1153,10 +1322,36 @@ def wait_stopped(self, timeout=INSTANCE_STX_TIMEOUT): raise TimeoutError(f'Stop probe expired on {self}') + def _rollback_instance_resources(self, instance_created=False, spot_request_id=None): + """ + Deletes EC2 instance resources provisioned during a failed create(). 
+ """ + logger.info(f'Rolling back resources provisioned for {self.name}') + + if spot_request_id: + try: + self.ec2_client.cancel_spot_instance_requests( + SpotInstanceRequestIds=[spot_request_id] + ) + except ClientError as err: + logger.warning(f'Rollback: could not cancel spot request: {err}') + + if instance_created and self.instance_id: + try: + self.ec2_client.terminate_instances(InstanceIds=[self.instance_id]) + except ClientError as err: + logger.warning(f'Rollback: could not terminate {self.name}: {err}') + + self.instance_data = None + self.instance_id = None + def _create_instance(self, user_data=None): """ Creates a new VM instance """ + instance_created = False + spot_request_id = None + ebs_volumes = self.config.get('ebs_volumes', []) BlockDeviceMappings = [] @@ -1196,81 +1391,142 @@ def _create_instance(self, user_data=None): 'Groups': [self.config['security_group_id']] }] - if BlockDeviceMappings is not None: + if BlockDeviceMappings: LaunchSpecification['BlockDeviceMappings'] = BlockDeviceMappings - if self.spot_instance and not self.public: + try: + if self.spot_instance and not self.public: - logger.debug(f"Creating new VM instance {self.name} (Spot)") + logger.debug(f"Creating new VM instance {self.name} (Spot)") - if user_data: - # Allow master VM to access workers trough ssh key or password - LaunchSpecification['UserData'] = b64s(user_data) + if user_data: + # Allow master VM to access workers through ssh key or password + LaunchSpecification['UserData'] = b64s(user_data) - spot_request = self.ec2_client.request_spot_instances( - SpotPrice=str(self.config['spot_price']), - InstanceCount=1, - LaunchSpecification=LaunchSpecification)['SpotInstanceRequests'][0] + spot_request = self.ec2_client.request_spot_instances( + SpotPrice=str(self.config['spot_price']), + InstanceCount=1, + LaunchSpecification=LaunchSpecification)['SpotInstanceRequests'][0] - request_id = spot_request['SpotInstanceRequestId'] - failures = ['price-too-low', 
'capacity-not-available'] + spot_request_id = spot_request['SpotInstanceRequestId'] + failures = ['price-too-low', 'capacity-not-available'] - while spot_request['State'] == 'open': - time.sleep(5) - spot_request = self.ec2_client.describe_spot_instance_requests( - SpotInstanceRequestIds=[request_id])['SpotInstanceRequests'][0] - - if spot_request['State'] == 'failed' or spot_request['Status']['Code'] in failures: - msg = "The spot request failed for the following reason: " + spot_request['Status']['Message'] - logger.debug(msg) - self.ec2_client.cancel_spot_instance_requests(SpotInstanceRequestIds=[request_id]) - raise Exception(msg) - else: - logger.debug(spot_request['Status']['Message']) + while spot_request['State'] == 'open': + time.sleep(5) + spot_request = self.ec2_client.describe_spot_instance_requests( + SpotInstanceRequestIds=[spot_request_id])['SpotInstanceRequests'][0] - self.ec2_client.create_tags( - Resources=[spot_request['InstanceId']], - Tags=[{'Key': 'Name', 'Value': self.name}] - ) + if spot_request['State'] == 'failed' or spot_request['Status']['Code'] in failures: + msg = "The spot request failed for the following reason: " + spot_request['Status']['Message'] + logger.debug(msg) + self.ec2_client.cancel_spot_instance_requests( + SpotInstanceRequestIds=[spot_request_id] + ) + raise Exception(msg) + else: + logger.debug(spot_request['Status']['Message']) - filters = [{'Name': 'instance-id', 'Values': [spot_request['InstanceId']]}] - resp = self.ec2_client.describe_instances(Filters=filters)['Reservations'][0] + self.ec2_client.create_tags( + Resources=[spot_request['InstanceId']], + Tags=[{'Key': 'Name', 'Value': self.name}] + ) - else: - logger.debug(f"Creating new VM instance {self.name}") + filters = [{'Name': 'instance-id', 'Values': [spot_request['InstanceId']]}] + resp = self.ec2_client.describe_instances(Filters=filters)['Reservations'][0] - LaunchSpecification['MinCount'] = 1 - LaunchSpecification['MaxCount'] = 1 - 
LaunchSpecification["TagSpecifications"] = [{"ResourceType": "instance", "Tags": [{'Key': 'Name', 'Value': self.name}]}] - LaunchSpecification["InstanceInitiatedShutdownBehavior"] = 'terminate' if self.delete_on_dismantle else 'stop' + else: + logger.debug(f"Creating new VM instance {self.name}") - if user_data: - LaunchSpecification['UserData'] = user_data + LaunchSpecification['MinCount'] = 1 + LaunchSpecification['MaxCount'] = 1 + LaunchSpecification["TagSpecifications"] = [{"ResourceType": "instance", "Tags": [{'Key': 'Name', 'Value': self.name}]}] + LaunchSpecification["InstanceInitiatedShutdownBehavior"] = 'terminate' if self.delete_on_dismantle else 'stop' - resp = self.ec2_client.run_instances(**LaunchSpecification) + if user_data: + LaunchSpecification['UserData'] = user_data - logger.debug(f"VM instance {self.name} created successfully ") + resp = self.ec2_client.run_instances(**LaunchSpecification) - self.instance_data = resp['Instances'][0] - self.instance_id = self.instance_data['InstanceId'] + logger.debug(f"VM instance {self.name} created successfully ") - return self.instance_data + self.instance_data = resp['Instances'][0] + self.instance_id = self.instance_data['InstanceId'] + instance_created = True + + return self.instance_data + except Exception: + self._rollback_instance_resources( + instance_created=instance_created, + spot_request_id=spot_request_id, + ) + raise + + def _describe_instance_by_id(self): + """ + Describe this instance by ID, retrying on EC2 eventual consistency. + """ + last_error = None + for attempt in range(12): + try: + res = self.ec2_client.describe_instances( + InstanceIds=[self.instance_id] + ) + return res['Reservations'] + except ClientError as err: + code = err.response.get('Error', {}).get('Code', '') + if code != 'InvalidInstanceID.NotFound': + raise + last_error = err + if attempt < 11: + logger.debug( + f'Instance {self.instance_id} not visible yet ' + f'(attempt {attempt + 1}/12), retrying...' 
+ ) + time.sleep(2) + continue + logger.debug( + f'Instance {self.instance_id} not found by ID, ' + f'looking up {self.name} by tag' + ) + self.instance_id = None + self.instance_data = None + return [] + if last_error: + raise last_error + return [] + + def _describe_instances_by_name(self): + filters = [ + {'Name': 'tag:Name', 'Values': [self.name]}, + {'Name': 'instance-state-name', + 'Values': ['pending', 'running', 'stopping', 'stopped']}, + ] + res = self.ec2_client.describe_instances(Filters=filters) + instances = [ + inst for reservation in res['Reservations'] + for inst in reservation.get('Instances', []) + if inst.get('State', {}).get('Name') != 'terminated' + ] + if not instances: + return [] + instances.sort(key=lambda inst: inst.get('LaunchTime', ''), reverse=True) + return [{'Instances': [instances[0]]}] def get_instance_data(self): """ Returns the instance information """ if self.instance_id: - res = self.ec2_client.describe_instances(InstanceIds=[self.instance_id]) - reserv = res['Reservations'] + reserv = self._describe_instance_by_id() else: - filters = [{'Name': 'tag:Name', 'Values': [self.name]}] - res = self.ec2_client.describe_instances(Filters=filters) - reserv = res['Reservations'] + reserv = [] + + if not reserv: + reserv = self._describe_instances_by_name() - instance_data = reserv[0]['Instances'][0] if len(reserv) > 0 else None + instance_data = reserv[0]['Instances'][0] if reserv else None - if instance_data and instance_data['State']['Name'] != 'terminated': + if instance_data: self.instance_data = instance_data self.instance_id = instance_data['InstanceId'] self.private_ip = self.instance_data.get('PrivateIpAddress') diff --git a/lithops/standalone/backends/azure_vms/azure_vms.py b/lithops/standalone/backends/azure_vms/azure_vms.py index e67402650..a04b45214 100644 --- a/lithops/standalone/backends/azure_vms/azure_vms.py +++ b/lithops/standalone/backends/azure_vms/azure_vms.py @@ -23,21 +23,39 @@ from concurrent.futures import 
ThreadPoolExecutor from azure.identity import DefaultAzureCredential from azure.mgmt.compute import ComputeManagementClient +from azure.mgmt.compute.models import ( + HardwareProfile, + ImageReference, + LinuxConfiguration, + ManagedDiskParameters, + NetworkInterfaceReference, + NetworkProfile, + OSDisk, + OSProfile, + SshConfiguration, + SshPublicKey, + StorageProfile, + VirtualMachine, +) from azure.mgmt.network import NetworkManagementClient -from azure.core.exceptions import ResourceNotFoundError +from azure.core.exceptions import HttpResponseError, ResourceNotFoundError from lithops.version import __version__ from lithops.util.ssh_client import SSHClient, ssh_boot_status_message from lithops.constants import COMPUTE_CLI_MSG, CACHE_DIR, SA_CONFIG_FILE from lithops.config import load_yaml_config, dump_yaml_config -from lithops.standalone.utils import StandaloneMode +from lithops.standalone.utils import ( + StandaloneMode, + prepare_standalone_clean, + standalone_clean_stop_early, +) from lithops.standalone import LithopsValidationError logger = logging.getLogger(__name__) INSTANCE_START_TIMEOUT = 180 -DEFAULT_UBUNTU_IMAGE = 'Canonical:0001-com-ubuntu-server-jammy:22_04-lts-gen2:latest' +DEFAULT_UBUNTU_IMAGE = 'Canonical:ubuntu-24_04-lts:server:latest' def b64s(string): @@ -75,6 +93,7 @@ def __init__(self, config, mode): self.master = None self.workers = [] self.instance_types = {} + self._init_created = None msg = COMPUTE_CLI_MSG.format('Azure Virtual Machines') logger.info(f"{msg} - Region: {self.location}") @@ -83,6 +102,8 @@ def is_initialized(self): """ Checks if the backend is initialized """ + if self.mode == StandaloneMode.CONSUME.value: + return True return os.path.isfile(self.cache_file) def _load_azure_vms_data(self): @@ -94,7 +115,7 @@ def _load_azure_vms_data(self): if self.azure_data: logger.debug(f'Azure VMs data loaded from {self.cache_file}') - if 'vnet_name' in self.azure_data: + if self.azure_data and 'vnet_name' in self.azure_data: 
self.vnet_key = self.azure_data['vnet_id'][-6:] self.vnet_name = self.azure_data['vnet_name'] @@ -111,19 +132,92 @@ def _delete_vpc_data(self): if os.path.exists(self.cache_file): os.remove(self.cache_file) + def _reset_init_created(self): + self._init_created = { + 'vnet': False, + 'subnet': False, + 'nsg': False, + 'floating_ip': False, + 'ssh_key': False, + } + + def _safe_rollback_delete(self, delete_fn, resource_desc): + try: + delete_fn() + except ResourceNotFoundError: + pass + except Exception as err: + logger.warning(f'Rollback: could not delete {resource_desc}: {err}') + + def _rollback_init_resources(self): + """ + Deletes Azure resources created during a failed init(). + """ + if not self._init_created: + return + + logger.info('Rolling back Azure VMs resources provisioned during failed init') + rg = self.config['resource_group'] + created = self._init_created + + if created.get('floating_ip') and self.config.get('floating_ip_name'): + fip_name = self.config['floating_ip_name'] + self._safe_rollback_delete( + lambda: self.network_client.public_ip_addresses.begin_delete(rg, fip_name).result(), + f'public IP {fip_name}', + ) + + if created.get('nsg') and self.config.get('security_group_name'): + sg_name = self.config['security_group_name'] + self._safe_rollback_delete( + lambda: self.network_client.network_security_groups.begin_delete(rg, sg_name).result(), + f'network security group {sg_name}', + ) + + if created.get('subnet') and self.config.get('vnet_name') and self.config.get('subnet_name'): + vnet_name = self.config['vnet_name'] + subnet_name = self.config['subnet_name'] + self._safe_rollback_delete( + lambda: self.network_client.subnets.begin_delete( + rg, vnet_name, subnet_name + ).result(), + f'subnet {subnet_name}', + ) + + if created.get('vnet') and self.config.get('vnet_name'): + vnet_name = self.config['vnet_name'] + self._safe_rollback_delete( + lambda: self.network_client.virtual_networks.begin_delete(rg, vnet_name).result(), + f'virtual 
network {vnet_name}', + ) + + if created.get('ssh_key'): + key_filename = self.config.get('ssh_key_filename') + if key_filename and 'lithops-key-' in key_filename: + for path in (key_filename, f'{key_filename}.pub'): + if os.path.isfile(path): + os.remove(path) + + if self.vnet_data_type == 'created': + self._delete_vpc_data() + + self._init_created = None + def _create_vnet(self): """ Creates a new Virtual Network """ if 'vnet_name' in self.config: + logger.debug(f'Using user-provided virtual network {self.config["vnet_name"]}') return - if 'vnet_name' in self.azure_data: + if self.azure_data and 'vnet_name' in self.azure_data: vnets_info = list(self.network_client.virtual_networks.list(self.config['resource_group'])) for vnet in vnets_info: if vnet.name == self.vnet_name: self.config['vnet_id'] = vnet.id self.config['vnet_name'] = vnet.name + logger.debug(f'Using existing virtual network {vnet.name}') return self.vnet_name = self.config.get('vnet_name', f'lithops-vnet-{str(uuid.uuid4())[-6:]}') @@ -137,6 +231,7 @@ def _create_vnet(self): if vnet.name == self.vnet_name: self.config['vnet_id'] = vnet.id self.config['vnet_name'] = vnet.name + logger.debug(f'Using existing virtual network {vnet.name}') break if 'vnet_name' not in self.config: @@ -152,36 +247,40 @@ def _create_vnet(self): }, ) vnet_result = poller.result() - logger.debug( - f"Provisioned virtual network {vnet_result.name} with address prefixes {vnet_result.address_space.address_prefixes}" - ) self.config['vnet_id'] = vnet_result.id self.config['vnet_name'] = vnet_result.name + if self._init_created is not None: + self._init_created['vnet'] = True def _create_subnet(self): """ Creates a new subnet """ if 'subnet_name' in self.config: + logger.debug(f'Using user-provided virtual subnet {self.config["subnet_name"]}') return - if 'subnet_name' in self.azure_data: + if self.azure_data and 'subnet_name' in self.azure_data: subnets_info = list(self.network_client.subnets.list(self.config['resource_group'], 
self.vnet_name)) for subnet in subnets_info: if subnet.name == self.azure_data['subnet_name']: self.config['subnet_id'] = subnet.id self.config['subnet_name'] = subnet.name + logger.debug(f'Using existing virtual subnet {subnet.name}') return self.subnet_name = self.vnet_name + '-subnet' subnets_info = list(self.network_client.subnets.list(self.config['resource_group'], self.vnet_name)) for subnet in subnets_info: - if subnet.name == self.azure_data['subnet_name']: + if subnet.name == self.subnet_name: self.config['subnet_id'] = subnet.id self.config['subnet_name'] = subnet.name + logger.debug(f'Using existing virtual subnet {subnet.name}') + break if 'subnet_name' not in self.config: + logger.debug(f'Creating virtual subnet {self.subnet_name}') poller = self.network_client.subnets.begin_create_or_update( self.config['resource_group'], self.vnet_name, @@ -189,28 +288,47 @@ def _create_subnet(self): {"address_prefix": "10.0.0.0/24"}, ) subnet_result = poller.result() + self.config['subnet_id'] = subnet_result.id + self.config['subnet_name'] = subnet_result.name + if self._init_created is not None: + self._init_created['subnet'] = True + def _use_security_group(self, sg_info): + """ + Reuse an existing security group when it is in the configured region. 
+ """ + if sg_info.location.lower() != self.location.lower(): logger.debug( - f"Provisioned virtual subnet {subnet_result.name} with address prefix {subnet_result.address_prefix}" + f'Skipping security group {sg_info.name} in {sg_info.location}; ' + f'expected region {self.location}' ) - self.config['subnet_id'] = subnet_result.id - self.config['subnet_name'] = subnet_result.name + return False + + self.config['security_group_id'] = sg_info.id + self.config['security_group_name'] = sg_info.name + logger.debug( + f'Using existing network security group {sg_info.name} in {sg_info.location}' + ) + return True def _create_security_group(self): """ Creates a new Security group """ if 'security_group_id' in self.config: + logger.debug( + f'Using user-provided network security group ' + f'{self.config.get("security_group_name", self.config["security_group_id"])}' + ) return - if 'security_group_id' in self.azure_data: + if self.azure_data and 'security_group_id' in self.azure_data: try: sg_info = self.network_client.network_security_groups.get( self.config['resource_group'], self.azure_data['security_group_name'] ) - self.config['security_group_id'] = sg_info.id - self.config['security_group_name'] = sg_info.name - return + if self._use_security_group(sg_info): + return except ResourceNotFoundError: pass @@ -220,12 +338,12 @@ def _create_security_group(self): sg_info = self.network_client.network_security_groups.get( self.config['resource_group'], security_group_name ) - self.config['security_group_id'] = sg_info.id - self.config['security_group_name'] = sg_info.name + self._use_security_group(sg_info) except ResourceNotFoundError: pass if 'security_group_id' not in self.config: + logger.debug(f'Creating network security group {security_group_name}') nsg_rules = [ { "name": "allow-ssh", @@ -289,6 +407,8 @@ def _create_security_group(self): self.config['security_group_name'] = sg_result.name self.config['security_group_id'] = sg_result.id + if self._init_created is not 
None: + self._init_created['nsg'] = True def _create_master_floating_ip(self): """ @@ -303,10 +423,13 @@ def get_floating_ip(fip_name): self.config['floating_ip'] = fip_info.ip_address self.config['floating_ip_name'] = fip_info.name self.config['floating_ip_id'] = fip_info.id + logger.debug( + f'Using existing public IP address {fip_info.ip_address} ({fip_info.name})' + ) except ResourceNotFoundError: pass - if 'floating_ip_id' in self.azure_data: + if self.azure_data and 'floating_ip_id' in self.azure_data: get_floating_ip(self.azure_data['floating_ip_name']) floating_ip_name = self.vnet_name + '-ip' @@ -315,6 +438,7 @@ def get_floating_ip(fip_name): get_floating_ip(floating_ip_name) if 'floating_ip_id' not in self.config: + logger.debug(f'Creating public IP address {floating_ip_name}') poller = self.network_client.public_ip_addresses.begin_create_or_update( self.config['resource_group'], floating_ip_name, @@ -326,21 +450,24 @@ def get_floating_ip(fip_name): }, ) ip_address_result = poller.result() - logger.debug(f"Provisioned public IP address {ip_address_result.ip_address}") self.config['floating_ip'] = ip_address_result.ip_address self.config['floating_ip_name'] = ip_address_result.name self.config['floating_ip_id'] = ip_address_result.id + if self._init_created is not None: + self._init_created['floating_ip'] = True def _create_ssh_key(self): """ Creates a new ssh key pair """ if 'ssh_key_filename' in self.config: + logger.debug(f'Using user-provided SSH key pair {self.config["ssh_key_filename"]}') return - if 'ssh_key_filename' in self.azure_data: + if self.azure_data and 'ssh_key_filename' in self.azure_data: if os.path.isfile(self.azure_data['ssh_key_filename']): self.config['ssh_key_filename'] = self.azure_data['ssh_key_filename'] + logger.debug(f'Using existing SSH key pair {self.config["ssh_key_filename"]}') return keyname = f'lithops-key-{str(uuid.uuid4())[-8:]}' @@ -351,6 +478,8 @@ def _create_ssh_key(self): logger.debug("Generating new ssh key pair") 
os.system(f'ssh-keygen -b 2048 -t rsa -f {key_filename} -q -N ""') logger.debug(f"SHH key pair generated: {key_filename}") + if self._init_created is not None: + self._init_created['ssh_key'] = True self.config['ssh_key_filename'] = key_filename @@ -358,7 +487,7 @@ def _get_all_instance_types(self): """ Get all virtual machine sizes in the specified location """ - if 'instance_types' in self.azure_data: + if self.azure_data and 'instance_types' in self.azure_data: self.instance_types = self.azure_data['instance_types'] return @@ -393,72 +522,69 @@ def init(self): """ logger.debug(f'Initializing Azure Virtual Machines backend ({self.mode} mode)') - self._load_azure_vms_data() - if self.mode == StandaloneMode.CONSUME.value: - instance_name = self.config['instance_name'] - if not self.azure_data or instance_name != self.azure_data.get('instance_name'): + if 'master_instance_type' not in self.config: try: instance_data = self.compute_client.virtual_machines.get( - self.config['resource_group'], instance_name + self.config['resource_group'], self.config['instance_name'] ) except ResourceNotFoundError: - raise Exception(f"VM Instance {instance_name} does not exist") - - self.azure_data = { - 'mode': self.mode, - 'vnet_data_type': 'provided', - 'ssh_data_type': 'provided', - 'instance_name': self.config['instance_name'], - 'master_id': instance_data.vm_id, - 'instance_type': instance_data.hardware_profile.vm_size - } - - # Create the master VM instance - self.config['master_instance_type'] = self.azure_data['instance_type'] + raise Exception( + f"VM Instance {self.config['instance_name']} does not exist" + ) + self.config['master_instance_type'] = instance_data.hardware_profile.vm_size self._create_master_instance() + return - elif self.mode in [StandaloneMode.CREATE.value, StandaloneMode.REUSE.value]: - - # Create the Virtual Netowrk if not exists - self._create_vnet() - - # Set the suffix used for the VNET resources - self.vnet_key = self.config['vnet_id'][-6:] - - # 
Create the Subnet if not exists - self._create_subnet() - # Create the security group if not exists - self._create_security_group() - # Create the master VM floating IP address - self._create_master_floating_ip() - # Create the ssh key pair if not exists - self._create_ssh_key() - # Request instance types - self._get_all_instance_types() + self._load_azure_vms_data() - # Create the master VM instance - self._create_master_instance() + if self.mode in [StandaloneMode.CREATE.value, StandaloneMode.REUSE.value]: - self.azure_data = { - 'mode': self.mode, - 'vnet_data_type': self.vnet_data_type, - 'ssh_data_type': self.ssh_data_type, - 'master_name': self.master.name, - 'master_id': self.vnet_key, - 'vnet_name': self.config['vnet_name'], - 'vnet_id': self.config['vnet_id'], - 'subnet_name': self.config['subnet_name'], - 'subnet_id': self.config['subnet_id'], - 'ssh_key_filename': self.config['ssh_key_filename'], - 'security_group_id': self.config['security_group_id'], - 'security_group_name': self.config['security_group_name'], - 'floating_ip_id': self.config['floating_ip_id'], - 'floating_ip_name': self.config['floating_ip_name'], - 'instance_types': self.instance_types - } + self._reset_init_created() + try: + # Create the Virtual Network if not exists + self._create_vnet() + + # Set the suffix used for the VNET resources + self.vnet_key = self.config['vnet_id'][-6:] + + # Create the Subnet if not exists + self._create_subnet() + # Create the security group if not exists + self._create_security_group() + # Create the master VM floating IP address + self._create_master_floating_ip() + # Create the ssh key pair if not exists + self._create_ssh_key() + # Request instance types + self._get_all_instance_types() + + # Create the master VM instance + self._create_master_instance() - self._dump_azure_vms_data() + self.azure_data = { + 'mode': self.mode, + 'vnet_data_type': self.vnet_data_type, + 'ssh_data_type': self.ssh_data_type, + 'master_name': self.master.name, + 
'master_id': self.vnet_key, + 'vnet_name': self.config['vnet_name'], + 'vnet_id': self.config['vnet_id'], + 'subnet_name': self.config['subnet_name'], + 'subnet_id': self.config['subnet_id'], + 'ssh_key_filename': self.config['ssh_key_filename'], + 'security_group_id': self.config['security_group_id'], + 'security_group_name': self.config['security_group_name'], + 'floating_ip_id': self.config['floating_ip_id'], + 'floating_ip_name': self.config['floating_ip_name'], + 'instance_types': self.instance_types + } + self._dump_azure_vms_data() + except Exception: + self._rollback_init_resources() + raise + finally: + self._init_created = None def build_image(self, image_name, script_file, overwrite, include, extra_args=[]): """ @@ -491,19 +617,26 @@ def list_images(self): def _delete_vm_instances(self, all=False): """ - Deletes all worker VM instances + Deletes Lithops VM instances in the resource group. + When all=True, every lithops master/worker is removed (any VNet). """ - msg = (f"Deleting Lithops VMs from {self.azure_data['vnet_name']}") - logger.info(msg) + if all: + logger.info(f'Deleting all Lithops VMs in {self.config["resource_group"]}') + else: + logger.info(f'Deleting Lithops worker VMs from {self.azure_data["vnet_name"]}') vms_prefixes = ('lithops-worker', 'lithops-master') if all else ('lithops-worker',) instances_to_delete = [] vms_info = self.compute_client.virtual_machines.list(self.config['resource_group']) for vm in vms_info: - if 'type' in vm.tags and vm.tags['type'] == 'lithops-runtime' \ - and vm.name.startswith(vms_prefixes) and vm.tags['lithops_vnet'] == self.vnet_name: - instances_to_delete.append(vm) + if 'type' not in vm.tags or vm.tags['type'] != 'lithops-runtime': + continue + if not vm.name.startswith(vms_prefixes): + continue + if not all and vm.tags.get('lithops_vnet') != self.vnet_name: + continue + instances_to_delete.append(vm) def delete_instance(instance): logger.debug(f"Deleting VM instance {instance.name}") @@ -530,14 +663,41 @@ 
def delete_instance(instance): futures = [executor.submit(delete_instance, i) for i in instances_to_delete] [fut.result() for fut in futures] - master_pk = os.path.join(self.cache_dir, f"{self.azure_data['master_name']}-id_rsa.pub") - if all and os.path.isfile(master_pk): - os.remove(master_pk) + if all and self.azure_data: + master_pk = os.path.join( + self.cache_dir, f"{self.azure_data['master_name']}-id_rsa.pub" + ) + if os.path.isfile(master_pk): + os.remove(master_pk) - if self.azure_data['vnet_data_type'] == 'provided': + if self.azure_data and self.azure_data.get('vnet_data_type') == 'provided': return - def _delete_vnet_and_subnet(self): + def _try_delete_security_group(self, security_group_name): + """ + Delete the Lithops NSG if it is no longer attached to any NIC. + """ + if not security_group_name: + return + + try: + logger.debug(f'Deleting network security group {security_group_name}') + self.network_client.network_security_groups.begin_delete( + self.config['resource_group'], + security_group_name, + ).result() + except ResourceNotFoundError: + pass + except HttpResponseError as err: + if 'InUseNetworkSecurityGroupCannotBeDeleted' in str(err): + logger.warning( + f'Network security group {security_group_name} is still in use; ' + 'delete remaining Lithops VMs/NICs and run clean again' + ) + else: + logger.warning(f'Could not delete network security group {security_group_name}: {err}') + + def _delete_vnet_and_subnet(self, delete_security_group=False): """ Deletes all the Azure VMs resources """ @@ -575,6 +735,9 @@ def _delete_vnet_and_subnet(self): except ResourceNotFoundError: pass + if delete_security_group: + self._try_delete_security_group(self.azure_data.get('security_group_name')) + def _delete_ssh_key(self): """ Deletes the ssh key @@ -591,20 +754,21 @@ def _delete_ssh_key(self): def clean(self, all=False): """ - Clean all the VPC resources + Clean Lithops resources for the configured region (cache file for + ``location`` in config). 
Same flow as the other standalone cloud backends. """ logger.info('Cleaning Azure Virtual Machines resources') - if not self.azure_data: - return + prepare_standalone_clean(self, self._load_azure_vms_data) + if standalone_clean_stop_early( + self, self.azure_data, self._delete_vpc_data, all): + return True - if self.azure_data['mode'] == StandaloneMode.CONSUME.value: + self._delete_vm_instances(all=all) + if all: + self._delete_vnet_and_subnet(delete_security_group=True) + self._delete_ssh_key() self._delete_vpc_data() - else: - self._delete_vm_instances(all=all) - self._delete_vnet_and_subnet() if all else None - self._delete_ssh_key() if all else None - self._delete_vpc_data() if all else None def clear(self, job_keys=None): """ @@ -667,7 +831,11 @@ def get_runtime_key(self, runtime_name, version=__version__): Creates the runtime key """ name = runtime_name.replace('/', '-').replace(':', '-') - runtime_key = os.path.join(self.name, version, self.azure_data['master_id'], name) + if self.mode == StandaloneMode.CONSUME.value: + master_id = self.master.instance_id + else: + master_id = self.azure_data['master_id'] + runtime_key = os.path.join(self.name, version, master_id, name) return runtime_key @@ -782,12 +950,47 @@ def wait_ready(self, timeout=INSTANCE_START_TIMEOUT): raise TimeoutError(f'Readiness probe expired on {self}') + def _rollback_instance_resources(self, vm_created=False, nic_created=False, public_ip_created=False): + """ + Deletes VM instance resources provisioned during a failed create(). 
+ """ + rg = self.config['resource_group'] + logger.info(f'Rolling back resources provisioned for {self.name}') + + if vm_created: + try: + self._delete_instance() + except Exception as err: + logger.warning(f'Rollback: could not delete VM {self.name}: {err}') + + if nic_created and not vm_created: + nic_name = self.name + '-nic' + try: + self.network_client.network_interfaces.begin_delete(rg, nic_name).result() + except ResourceNotFoundError: + pass + except Exception as err: + logger.warning(f'Rollback: could not delete NIC {nic_name}: {err}') + + if public_ip_created: + ip_name = self.name + '-ip' + try: + self.network_client.public_ip_addresses.begin_delete(rg, ip_name).result() + except ResourceNotFoundError: + pass + except Exception as err: + logger.warning(f'Rollback: could not delete public IP {ip_name}: {err}') + def _create_instance(self, user_data=None): """ Creates a new VM instance """ logger.debug(f"Creating new VM instance {self.name}") + nic_created = False + public_ip_created = False + vm_created = False + # Create NIC nic_params = { 'location': self.location, @@ -798,101 +1001,110 @@ def _create_instance(self, user_data=None): "network_security_group": {"id": self.config['security_group_id']} } - if self.public and not self.public_ip: - poller = self.network_client.public_ip_addresses.begin_create_or_update( - self.config['resource_group'], - self.name + '-ip', - { - "location": self.location, - "sku": {"name": "Standard"}, - "public_ip_allocation_method": "Static", - "public_ip_address_version": "IPV4", - }, - ) - ip_address_result = poller.result() - self.public_ip = ip_address_result.ip_address - logger.debug(f"Provisioned public IP address {self.public_ip}") - nic_params['ip_configurations'][0]['public_ip_address'] = {"id": ip_address_result.id} - - elif self.public: - nic_params['ip_configurations'][0]['public_ip_address'] = {"id": self.config['floating_ip_id']} + try: + if self.public and not self.public_ip: + poller = 
self.network_client.public_ip_addresses.begin_create_or_update( + self.config['resource_group'], + self.name + '-ip', + { + "location": self.location, + "sku": {"name": "Standard"}, + "public_ip_allocation_method": "Static", + "public_ip_address_version": "IPV4", + }, + ) + ip_address_result = poller.result() + public_ip_created = True + self.public_ip = ip_address_result.ip_address + nic_params['ip_configurations'][0]['public_ip_address'] = {"id": ip_address_result.id} - poller = self.network_client.network_interfaces.begin_create_or_update( - self.config['resource_group'], - self.name + '-nic', - nic_params - ) - nic_data = poller.result() - self.private_ip = nic_data.ip_configurations[0].private_ip_address + elif self.public: + nic_params['ip_configurations'][0]['public_ip_address'] = {"id": self.config['floating_ip_id']} - # Create VM - vm_username = self.ssh_credentials['username'] - with open(self.ssh_credentials['key_filename'] + '.pub', 'r') as pk: - vm_pk_data = pk.read().strip() + poller = self.network_client.network_interfaces.begin_create_or_update( + self.config['resource_group'], + self.name + '-nic', + nic_params + ) + nic_data = poller.result() + nic_created = True + self.private_ip = nic_data.ip_configurations[0].private_ip_address + + # Create VM + vm_username = self.ssh_credentials['username'] + with open(self.ssh_credentials['key_filename'] + '.pub', 'r') as pk: + vm_pk_data = pk.read().strip() + + image_publisher, image_offer, image_sku, image_version = DEFAULT_UBUNTU_IMAGE.split(':') + image_reference = ( + ImageReference(id=self.config['image_id']) + if 'image_id' in self.config + else ImageReference( + publisher=image_publisher, + offer=image_offer, + sku=image_sku, + version=image_version, + ) + ) - vm_parameters = { - 'location': self.location, - 'tags': { - 'type': 'lithops-runtime', - 'lithops_version': str(__version__), - 'lithops_vnet': self.config['vnet_name'] - }, - 'os_profile': { - 'computer_name': self.name, - 'admin_username': 
vm_username, - 'linux_configuration': { - 'disable_password_authentication': True, - "ssh": { - "public_keys": [ - { - "path": f"/home/{vm_username}/.ssh/authorized_keys", - "key_data": vm_pk_data - } - ] - } + vm = VirtualMachine( + location=self.location, + tags={ + 'type': 'lithops-runtime', + 'lithops_version': str(__version__), + 'lithops_vnet': self.config['vnet_name'], }, - }, - 'hardware_profile': { - 'vm_size': self.instance_type - }, - 'storage_profile': { - 'image_reference': { - 'publisher': DEFAULT_UBUNTU_IMAGE.split(':')[0], - 'offer': DEFAULT_UBUNTU_IMAGE.split(':')[1], - 'sku': DEFAULT_UBUNTU_IMAGE.split(':')[2], - 'version': DEFAULT_UBUNTU_IMAGE.split(':')[3] - }, - 'osDisk': { - 'name': self.name + '-osdisk', - 'createOption': 'fromImage', - 'managedDisk': { - 'storageAccountType': 'Standard_LRS' - } - } - }, - 'network_profile': { - 'network_interfaces': [{ - 'id': nic_data.id, - 'properties': { - 'primary': True - } - }] - } - } - - if 'image_id' in self.config: - vm_parameters['storage_profile']['image_reference'] = {"id": self.config['image_id']} + os_profile=OSProfile( + computer_name=self.name, + admin_username=vm_username, + linux_configuration=LinuxConfiguration( + disable_password_authentication=True, + ssh=SshConfiguration( + public_keys=[ + SshPublicKey( + path=f'/home/{vm_username}/.ssh/authorized_keys', + key_data=vm_pk_data, + ) + ] + ), + ), + ), + hardware_profile=HardwareProfile(vm_size=self.instance_type), + storage_profile=StorageProfile( + image_reference=image_reference, + os_disk=OSDisk( + name=self.name + '-osdisk', + create_option='FromImage', + managed_disk=ManagedDiskParameters( + storage_account_type='Standard_LRS', + ), + ), + ), + network_profile=NetworkProfile( + network_interfaces=[ + NetworkInterfaceReference(id=nic_data.id, primary=True) + ] + ), + ) - poller = self.compute_client.virtual_machines.begin_create_or_update( - self.config['resource_group'], - self.name, - vm_parameters - ) + poller = 
self.compute_client.virtual_machines.begin_create_or_update( + self.config['resource_group'], + self.name, + vm + ) - self.instance_data = poller.result() - self.instance_id = self.instance_data.vm_id + self.instance_data = poller.result() + vm_created = True + self.instance_id = self.instance_data.vm_id - return self.instance_data + return self.instance_data + except Exception: + self._rollback_instance_resources( + vm_created=vm_created, + nic_created=nic_created, + public_ip_created=public_ip_created, + ) + raise def get_instance_data(self): """ diff --git a/lithops/standalone/backends/gcp_compute_engine/__init__.py b/lithops/standalone/backends/gcp_compute_engine/__init__.py index 3a749babe..5305883a8 100644 --- a/lithops/standalone/backends/gcp_compute_engine/__init__.py +++ b/lithops/standalone/backends/gcp_compute_engine/__init__.py @@ -1,3 +1,5 @@ +from lithops import _grpc_env # noqa: F401 + from .gcp_compute_engine import GCPComputeEngineBackend as StandaloneBackend __all__ = ['StandaloneBackend'] diff --git a/lithops/standalone/backends/gcp_compute_engine/config.py b/lithops/standalone/backends/gcp_compute_engine/config.py index f5b593491..4509f70b5 100644 --- a/lithops/standalone/backends/gcp_compute_engine/config.py +++ b/lithops/standalone/backends/gcp_compute_engine/config.py @@ -1,6 +1,4 @@ # -# Copyright Cloudlab URV 2021 -# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at @@ -31,7 +29,6 @@ 'source_image': 'projects/ubuntu-os-cloud/global/images/family/ubuntu-2404-lts-amd64', 'ssh_username': 'ubuntu', 'ssh_password': str(uuid.uuid4()), - 'ssh_key_filename': '~/.ssh/id_rsa', 'delete_on_dismantle': True, 'max_workers': 100, 'request_spot_instances': False, diff --git a/lithops/standalone/backends/gcp_compute_engine/gcp_compute_engine.py b/lithops/standalone/backends/gcp_compute_engine/gcp_compute_engine.py index 7b8e2d015..93a89adfb 100644 --- a/lithops/standalone/backends/gcp_compute_engine/gcp_compute_engine.py +++ b/lithops/standalone/backends/gcp_compute_engine/gcp_compute_engine.py @@ -1,6 +1,4 @@ # -# Copyright Cloudlab URV 2021 -# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -38,6 +36,8 @@ CLOUD_CONFIG_WORKER, CLOUD_CONFIG_WORKER_PK, get_host_setup_script, + prepare_standalone_clean, + standalone_clean_stop_early, ) from lithops.standalone import LithopsValidationError @@ -46,10 +46,6 @@ INSTANCE_START_TIMEOUT = 180 UBUNTU_OS_PROJECT = 'ubuntu-os-cloud' -UBUNTU_LTS_FAMILIES = ( - 'ubuntu-2404-lts-amd64', - 'ubuntu-2204-lts', -) DEFAULT_UBUNTU_SOURCE_IMAGE = ( 'projects/ubuntu-os-cloud/global/images/family/ubuntu-2404-lts-amd64' ) @@ -86,6 +82,7 @@ def __init__(self, config, mode): self.master = None self.workers = [] self.instance_types = {} + self._init_created = None msg = COMPUTE_CLI_MSG.format('GCP Compute Engine') logger.info(f"{msg} - Zone: {self.zone} - Project: {self.project_name}") @@ -153,7 +150,14 @@ def _wait_operation(self, operation_name, scope='zone'): time.sleep(2) def _load_gce_data(self): + """ + Loads GCE data from local cache + """ self.gce_data = load_yaml_config(self.cache_file) + + if self.gce_data: + logger.debug(f'GCE data loaded from {self.cache_file}') + if self.gce_data and 'network_name' in self.gce_data: self.network_name = 
self.gce_data['network_name'] self.network_key = self.gce_data.get('network_key') @@ -165,6 +169,104 @@ def _delete_vpc_data(self): if os.path.exists(self.cache_file): os.remove(self.cache_file) + def _reset_init_created(self): + self._init_created = { + 'network': False, + 'subnet': False, + 'firewall': False, + 'internal_firewall': False, + 'router': False, + 'ssh_key': False, + } + + def _safe_rollback_delete(self, delete_fn, resource_desc): + try: + delete_fn() + except HttpError as err: + if getattr(err.resp, 'status', None) != 404: + logger.warning(f'Rollback: could not delete {resource_desc}: {err}') + except Exception as err: + logger.warning(f'Rollback: could not delete {resource_desc}: {err}') + + def _rollback_init_resources(self): + """ + Deletes GCE resources created during a failed init(). + """ + if not self._init_created: + return + + logger.info('Rolling back GCP Compute Engine resources provisioned during failed init') + created = self._init_created + + fw_names = [] + if created.get('firewall') and self.config.get('firewall_name'): + fw_names.append(self.config['firewall_name']) + if created.get('internal_firewall') and self.config.get('internal_firewall_name'): + fw_names.append(self.config['internal_firewall_name']) + + for fw_name in fw_names: + self._safe_rollback_delete( + lambda name=fw_name: self._wait_operation( + self.compute_client.firewalls().delete( + project=self.project_name, firewall=name + ).execute()['name'], + scope='global', + ), + f'firewall {fw_name}', + ) + + if created.get('router') and self.config.get('router_name'): + router_name = self.config['router_name'] + self._safe_rollback_delete( + lambda: self._wait_operation( + self.compute_client.routers().delete( + project=self.project_name, + region=self.region, + router=router_name, + ).execute()['name'], + scope='region', + ), + f'Cloud Router {router_name}', + ) + + if created.get('subnet') and self.config.get('subnet_name'): + subnet_name = self.config['subnet_name'] + 
self._safe_rollback_delete( + lambda: self._wait_operation( + self.compute_client.subnetworks().delete( + project=self.project_name, + region=self.region, + subnetwork=subnet_name, + ).execute()['name'], + scope='region', + ), + f'subnet {subnet_name}', + ) + + if created.get('network') and self.config.get('network_name'): + network_name = self.config['network_name'] + self._safe_rollback_delete( + lambda: self._wait_operation( + self.compute_client.networks().delete( + project=self.project_name, network=network_name + ).execute()['name'], + scope='global', + ), + f'network {network_name}', + ) + + if created.get('ssh_key'): + key_filename = self.config.get('ssh_key_filename') + if key_filename and 'lithops-key-' in key_filename: + for path in (key_filename, f'{key_filename}.pub'): + if os.path.isfile(path): + os.remove(path) + + if self.vpc_data_type == 'created': + self._delete_vpc_data() + + self._init_created = None + def _resource_exists(self, getter): try: getter() @@ -228,6 +330,8 @@ def _create_network(self): } op = self.compute_client.networks().insert(project=self.project_name, body=body).execute() self._wait_operation(op['name'], scope='global') + if self._init_created is not None: + self._init_created['network'] = True logger.debug( f'Creating subnet {subnet_name} in {self.region} ' @@ -244,6 +348,8 @@ def _create_network(self): project=self.project_name, region=self.region, body=body ).execute() self._wait_operation(op['name'], scope='region') + if self._init_created is not None: + self._init_created['subnet'] = True logger.debug(f'Creating firewall {firewall_name} (SSH tcp/22 from internet)') body = { @@ -254,6 +360,8 @@ def _create_network(self): } op = self.compute_client.firewalls().insert(project=self.project_name, body=body).execute() self._wait_operation(op['name'], scope='global') + if self._init_created is not None: + self._init_created['firewall'] = True internal_fw_name = f'{self.network_name}-internal-fw' logger.debug( @@ -268,6 +376,8 @@ 
def _create_network(self): } op = self.compute_client.firewalls().insert(project=self.project_name, body=body).execute() self._wait_operation(op['name'], scope='global') + if self._init_created is not None: + self._init_created['internal_firewall'] = True self.config['network_name'] = self.network_name self.config['subnet_name'] = subnet_name @@ -348,29 +458,33 @@ def _create_cloud_nat(self): self.config['router_name'] = router_name self.config['nat_name'] = nat_name + if self._init_created is not None: + self._init_created['router'] = True def _create_ssh_key(self): """ - Creates a new SSH key pair on the client (same pattern as AWS EC2 / IBM VPC). - Used for Lithops client -> master SSH; workers use the master lithops_id_rsa key. + Creates a new ssh key pair (same pattern as AWS EC2 / Azure VMs / IBM VPC). """ - if 'ssh_key_filename' in self.gce_data and os.path.isfile(self.gce_data['ssh_key_filename']): - self.config['ssh_key_filename'] = self.gce_data['ssh_key_filename'] + if 'ssh_key_filename' in self.config: return - user_key = os.path.expanduser(self.config.get('ssh_key_filename', '~/.ssh/id_rsa')) - if os.path.isfile(user_key) and 'lithops-key-' not in os.path.basename(user_key): - logger.debug(f'Using user-provided SSH key {user_key}') - self.config['ssh_key_filename'] = user_key - return + if 'ssh_key_filename' in self.gce_data: + key_filename = os.path.expanduser(self.gce_data['ssh_key_filename']) + if os.path.isfile(key_filename): + self.config['ssh_key_filename'] = key_filename + return keyname = f'lithops-key-{str(uuid.uuid4())[-8:]}' filename = os.path.join("~", ".ssh", f"{keyname}.{self.name}.id_rsa") key_filename = os.path.expanduser(filename) + if not os.path.isfile(key_filename): logger.debug("Generating new ssh key pair") os.system(f'ssh-keygen -b 2048 -t rsa -f {key_filename} -q -N ""') logger.debug(f"SSH key pair generated: {key_filename}") + if self._init_created is not None: + self._init_created['ssh_key'] = True + 
self.config['ssh_key_filename'] = key_filename def _load_instance_types(self): @@ -447,43 +561,52 @@ def init(self): self._create_master_instance() self._dump_gce_data() return - + elif self.mode in [StandaloneMode.CREATE.value, StandaloneMode.REUSE.value]: - self._create_network() - self._create_ssh_key() - self._request_source_image() - if 'instance_name' not in self.config: - self.config['instance_name'] = f'lithops-master-{self.network_key}' - self._create_master_instance() - self._load_instance_types() - self.gce_data = { - 'mode': self.mode, - 'vpc_data_type': self.vpc_data_type, - 'ssh_data_type': self.ssh_data_type, - 'master_name': self.master.name, - 'master_id': self.network_key, - 'network_name': self.config['network_name'], - 'network_key': self.network_key, - 'subnet_name': self.config['subnet_name'], - 'firewall_name': self.config['firewall_name'], - 'internal_firewall_name': self.config['internal_firewall_name'], - 'router_name': self.config.get('router_name'), - 'nat_name': self.config.get('nat_name'), - 'ssh_key_filename': self.config['ssh_key_filename'], - 'source_image': self.config['source_image'], - 'instance_types': self.instance_types, - } - self._dump_gce_data() + self._reset_init_created() + try: + self._create_network() + self._create_ssh_key() + self._request_source_image() + if 'instance_name' not in self.config: + self.config['instance_name'] = f'lithops-master-{self.network_key}' + self._create_master_instance() + self._load_instance_types() + self.gce_data = { + 'mode': self.mode, + 'vpc_data_type': self.vpc_data_type, + 'ssh_data_type': self.ssh_data_type, + 'master_name': self.master.name, + 'master_id': self.network_key, + 'network_name': self.config['network_name'], + 'network_key': self.network_key, + 'subnet_name': self.config['subnet_name'], + 'firewall_name': self.config['firewall_name'], + 'internal_firewall_name': self.config['internal_firewall_name'], + 'router_name': self.config.get('router_name'), + 'nat_name': 
self.config.get('nat_name'), + 'ssh_key_filename': self.config['ssh_key_filename'], + 'source_image': self.config['source_image'], + 'instance_types': self.instance_types, + } + self._dump_gce_data() + except Exception: + self._rollback_init_resources() + raise + finally: + self._init_created = None @staticmethod def _is_default_ubuntu_source_image(source_image): if not source_image: return True - return ( - source_image == DEFAULT_UBUNTU_SOURCE_IMAGE - or source_image.endswith('/family/ubuntu-2404-lts-amd64') - or source_image.endswith('/family/ubuntu-2204-lts') - ) + if source_image == DEFAULT_UBUNTU_SOURCE_IMAGE: + return True + ubuntu_prefix = f'projects/{UBUNTU_OS_PROJECT}/global/images/' + if not source_image.startswith(ubuntu_prefix): + return False + path = source_image[len(ubuntu_prefix):] + return path.startswith('family/ubuntu-') or path.startswith('ubuntu-') def _project_image_ref(self, image_name): return f'projects/{self.project_name}/global/images/{image_name}' @@ -679,26 +802,43 @@ def _iter_project_images(self, project): previous_request=request, previous_response=response ) + @staticmethod + def _ubuntu_lts_major_version(name, family=''): + for text in (family, name): + match = re.search(r'(\d{2})\d{2}', text) + if match: + return int(match.group(1)) + return None + + @staticmethod + def _include_ubuntu_lts_image(image, min_major=22): + if image.get('deprecated') or image.get('status') != 'READY': + return False + name = image.get('name', '') + family = image.get('family', '') + if '-lts' not in name and '-lts' not in family: + return False + major = GCPComputeEngineBackend._ubuntu_lts_major_version(name, family) + return major is not None and major >= min_major + def list_images(self, **kwargs): """ - List Ubuntu LTS image families (latest) and custom Lithops images in the project. - Returns tuples of (name, image_id, creation_date) like other standalone backends. 
+ List Ubuntu LTS images (22.04+) from ubuntu-os-cloud and Lithops images + in the project. Returns tuples of (name, image_id, creation_date). """ result = set() - for family in UBUNTU_LTS_FAMILIES: - try: - image = self.compute_client.images().getFromFamily( - project=UBUNTU_OS_PROJECT, family=family - ).execute() - except HttpError as err: - if getattr(err.resp, 'status', None) == 404: - continue - raise - + for image in self._iter_project_images(UBUNTU_OS_PROJECT): + if not self._include_ubuntu_lts_image(image): + continue + name = image['name'] created_at = self._format_image_timestamp(image.get('creationTimestamp')) - family_ref = f'projects/{UBUNTU_OS_PROJECT}/global/images/family/{family}' - result.add((image['name'], family_ref, created_at)) + family = image.get('family') + if family: + image_id = f'projects/{UBUNTU_OS_PROJECT}/global/images/family/{family}' + else: + image_id = f'projects/{UBUNTU_OS_PROJECT}/global/images/{name}' + result.add((name, image_id, created_at)) for image in self._iter_project_images(self.project_name): name = image.get('name', '') @@ -718,11 +858,9 @@ def clean(self, **kwargs): all_clean = kwargs.get('all', False) logger.info('Cleaning GCP Compute Engine resources') - if not self.gce_data: - self._load_gce_data() - - if self.mode == StandaloneMode.CONSUME.value: - self._delete_vpc_data() + prepare_standalone_clean(self, self._load_gce_data) + if standalone_clean_stop_early( + self, self.gce_data, self._delete_vpc_data, all_clean): return True try: @@ -925,7 +1063,10 @@ def create_worker(self, name): def get_runtime_key(self, runtime_name, version=__version__): runtime = runtime_name.replace('/', '-').replace(':', '-') - master_id = self.gce_data.get('master_id', self.config.get('instance_name', self.master.name)) + if self.mode == StandaloneMode.CONSUME.value: + master_id = self.config['instance_name'] + else: + master_id = self.gce_data['master_id'] return os.path.join(self.name, version, master_id, runtime) @@ -1178,13 
+1319,35 @@ def create(self, public=False, ssh_public_key=None, user_data=None, 'provisioningModel': 'SPOT', 'instanceTerminationAction': 'STOP' } - op = self.compute_client.instances().insert( - project=self.project_name, - zone=self.zone, - body=body - ).execute() - self._wait_zone_operation(op['name']) - self.get_instance_data() + try: + op = self.compute_client.instances().insert( + project=self.project_name, + zone=self.zone, + body=body + ).execute() + self._wait_zone_operation(op['name']) + self.get_instance_data() + except Exception: + self._rollback_instance_resources() + raise + + def _rollback_instance_resources(self): + """ + Deletes a GCE VM provisioned during a failed create(). + """ + logger.info(f'Rolling back resources provisioned for {self.name}') + if self._exists(): + try: + op = self.compute_client.instances().delete( + project=self.project_name, zone=self.zone, instance=self.name + ).execute() + self._wait_zone_operation(op['name']) + except HttpError as err: + if getattr(err.resp, 'status', None) != 404: + logger.warning(f'Rollback: could not delete {self.name}: {err}') + except Exception as err: + logger.warning(f'Rollback: could not delete {self.name}: {err}') + self.instance_data = None def _wait_until_status(self, target_status, timeout=INSTANCE_START_TIMEOUT): start = time.time() diff --git a/lithops/standalone/backends/ibm_vpc/ibm_vpc.py b/lithops/standalone/backends/ibm_vpc/ibm_vpc.py index bea3b5095..d137edfc0 100644 --- a/lithops/standalone/backends/ibm_vpc/ibm_vpc.py +++ b/lithops/standalone/backends/ibm_vpc/ibm_vpc.py @@ -37,7 +37,9 @@ CLOUD_CONFIG_WORKER_PK, StandaloneMode, get_host_setup_script, - LithopsValidationError + LithopsValidationError, + prepare_standalone_clean, + standalone_clean_stop_early, ) logger = logging.getLogger(__name__) @@ -76,6 +78,7 @@ def __init__(self, config, mode): self.master = None self.workers = [] + self._init_created = None self.iam_api_key = self.config.get('iam_api_key') authenticator = 
IAMAuthenticator(self.iam_api_key, url=self.config.get('iam_endpoint')) @@ -95,6 +98,8 @@ def is_initialized(self): """ Checks if the backend is initialized """ + if self.mode == StandaloneMode.CONSUME.value: + return True return os.path.isfile(self.cache_file) def _load_vpc_data(self): @@ -106,7 +111,7 @@ def _load_vpc_data(self): if self.vpc_data: logger.debug(f'VPC data loaded from {self.cache_file}') - if 'vpc_id' in self.vpc_data: + if self.vpc_data and 'vpc_id' in self.vpc_data: self.vpc_key = self.vpc_data['vpc_id'][-6:] self.vpc_name = self.vpc_data['vpc_name'] @@ -125,6 +130,85 @@ def _delete_vpc_data(self): if os.path.exists(self.cache_file): os.remove(self.cache_file) + def _reset_init_created(self): + self._init_created = { + 'vpc': False, + 'ssh_key': False, + 'subnet': False, + 'gateway': False, + } + + def _safe_rollback_delete(self, delete_fn, resource_desc): + try: + delete_fn() + except ApiException as err: + if err.code not in (404, 400): + logger.warning(f'Rollback: could not delete {resource_desc}: {err}') + except Exception as err: + logger.warning(f'Rollback: could not delete {resource_desc}: {err}') + + def _rollback_init_resources(self): + """ + Deletes IBM VPC resources created during a failed init(). 
+ """ + if not self._init_created: + return + + logger.info('Rolling back IBM VPC resources provisioned during failed init') + created = self._init_created + vpc_key = self.vpc_key or ( + self.config['vpc_id'][-6:] if self.config.get('vpc_id') else '' + ) + + if created.get('gateway') and self.config.get('gateway_id'): + gateway_id = self.config['gateway_id'] + gateway_name = f'lithops-gateway-{vpc_key}' + + if self.config.get('subnet_id'): + self._safe_rollback_delete( + lambda: self.vpc_cli.unset_subnet_public_gateway(self.config['subnet_id']), + f'subnet public gateway for {self.config["subnet_id"]}', + ) + + self._safe_rollback_delete( + lambda: self.vpc_cli.delete_public_gateway(gateway_id), + f'public gateway {gateway_name}', + ) + + if created.get('subnet') and self.config.get('subnet_id'): + subnet_name = f'lithops-subnet-{vpc_key}' + subnet_id = self.config['subnet_id'] + self._safe_rollback_delete( + lambda: self.vpc_cli.delete_subnet(subnet_id), + f'subnet {subnet_name}', + ) + + if created.get('vpc') and self.config.get('vpc_id'): + vpc_name = self.vpc_name or self.config.get('vpc_name', 'lithops-vpc') + vpc_id = self.config['vpc_id'] + self._safe_rollback_delete( + lambda: self.vpc_cli.delete_vpc(vpc_id), + f'VPC {vpc_name}', + ) + + if created.get('ssh_key'): + key_filename = self.config.get('ssh_key_filename') + if key_filename and 'lithops-key-' in key_filename: + for path in (key_filename, f'{key_filename}.pub'): + if os.path.isfile(path): + os.remove(path) + if self.config.get('ssh_key_id'): + key_id = self.config['ssh_key_id'] + self._safe_rollback_delete( + lambda: self.vpc_cli.delete_key(id=key_id), + f'SSH key {key_id}', + ) + + if self.vpc_data_type == 'created': + self._delete_vpc_data() + + self._init_created = None + def _create_vpc(self): """ Creates a new VPC @@ -166,6 +250,8 @@ def _create_vpc(self): vpc_prototype['resource_group'] = {'id': self.config['resource_group_id']} response = self.vpc_cli.create_vpc(**vpc_prototype) vpc_info = 
response.result + if self._init_created is not None: + self._init_created['vpc'] = True self.config['vpc_id'] = vpc_info['id'] self.config['security_group_id'] = vpc_info['default_security_group']['id'] @@ -229,10 +315,12 @@ def _get_ssh_key(): if key["name"] == keyname: return key + ssh_key_created = False if not os.path.isfile(key_filename): logger.debug("Generating new ssh key pair") os.system(f'ssh-keygen -b 2048 -t rsa -f {key_filename} -q -N ""') logger.debug(f"SHH key pair generated: {key_filename}") + ssh_key_created = True else: key_info = _get_ssh_key() @@ -244,6 +332,7 @@ def _get_ssh_key(): public_key=ssh_key_data, name=keyname, type="rsa", resource_group={"id": self.config['resource_group_id']} ).get_result() + ssh_key_created = True except ApiException as e: logger.error(e) if "Key with name already exists" in e.message: @@ -252,6 +341,7 @@ def _get_ssh_key(): public_key=ssh_key_data, name=keyname, type="rsa", resource_group={"id": self.config['resource_group_id']}, ).get_result() + ssh_key_created = True else: if "Key with fingerprint already exists" in e.message: logger.error("Can't register an SSH key with the same fingerprint") @@ -259,6 +349,8 @@ def _get_ssh_key(): self.config['ssh_key_id'] = key_info["id"] self.config['ssh_key_filename'] = key_filename + if self._init_created is not None and ssh_key_created: + self._init_created['ssh_key'] = True def _create_subnet(self): """ @@ -300,6 +392,8 @@ def _create_subnet(self): subnet_prototype['total_ipv4_address_count'] = 8192 response = self.vpc_cli.create_subnet(subnet_prototype) subnet_data = response.result + if self._init_created is not None: + self._init_created['subnet'] = True self.config['subnet_id'] = subnet_data['id'] self.config['zone_name'] = subnet_data['zone']['name'] @@ -339,6 +433,8 @@ def _create_gateway(self): gateway_prototype['floating_ip'] = {'id': fip_id} response = self.vpc_cli.create_public_gateway(**gateway_prototype) gateway_data = response.result + if self._init_created 
is not None: + self._init_created['gateway'] = True self.config['gateway_id'] = gateway_data['id'] @@ -468,46 +564,52 @@ def init(self): elif self.mode in [StandaloneMode.CREATE.value, StandaloneMode.REUSE.value]: - # Create the VPC if not exists - self._create_vpc() - - # Set the suffix used for the VPC resources - self.vpc_key = self.config['vpc_id'][-6:] - - # Create the ssh key pair if not exists - self._create_ssh_key() - # Create a new subnaet if not exists - self._create_subnet() - # Create a new gateway if not exists - self._create_gateway() - # Create the master VM floating IP address - self._create_master_floating_ip() - # Requests the Ubuntu image ID - self._request_image_id() - - # Create the master VM instance - self._create_master_instance() + self._reset_init_created() + try: + # Create the VPC if not exists + self._create_vpc() + + # Set the suffix used for the VPC resources + self.vpc_key = self.config['vpc_id'][-6:] + + # Create the ssh key pair if not exists + self._create_ssh_key() + # Create a new subnaet if not exists + self._create_subnet() + # Create a new gateway if not exists + self._create_gateway() + # Create the master VM floating IP address + self._create_master_floating_ip() + # Requests the Ubuntu image ID + self._request_image_id() + + # Create the master VM instance + self._create_master_instance() - self.vpc_data = { - 'mode': self.mode, - 'vpc_data_type': self.vpc_data_type, - 'ssh_data_type': self.ssh_data_type, - 'master_name': self.master.name, - 'master_id': self.vpc_key, - 'vpc_name': self.vpc_name, - 'vpc_id': self.config['vpc_id'], - 'subnet_id': self.config['subnet_id'], - 'security_group_id': self.config['security_group_id'], - 'floating_ip': self.config['floating_ip'], - 'floating_ip_id': self.config['floating_ip_id'], - 'gateway_id': self.config['gateway_id'], - 'zone_name': self.config['zone_name'], - 'image_id': self.config['image_id'], - 'ssh_key_id': self.config['ssh_key_id'], - 'ssh_key_filename': 
self.config['ssh_key_filename'] - } - - self._dump_vpc_data() + self.vpc_data = { + 'mode': self.mode, + 'vpc_data_type': self.vpc_data_type, + 'ssh_data_type': self.ssh_data_type, + 'master_name': self.master.name, + 'master_id': self.vpc_key, + 'vpc_name': self.vpc_name, + 'vpc_id': self.config['vpc_id'], + 'subnet_id': self.config['subnet_id'], + 'security_group_id': self.config['security_group_id'], + 'floating_ip': self.config['floating_ip'], + 'floating_ip_id': self.config['floating_ip_id'], + 'gateway_id': self.config['gateway_id'], + 'zone_name': self.config['zone_name'], + 'image_id': self.config['image_id'], + 'ssh_key_id': self.config['ssh_key_id'], + 'ssh_key_filename': self.config['ssh_key_filename'] + } + self._dump_vpc_data() + except Exception: + self._rollback_init_resources() + raise + finally: + self._init_created = None def build_image(self, image_name, script_file, overwrite, include, extra_args=[]): """ @@ -797,16 +899,15 @@ def clean(self, all=False): """ logger.info('Cleaning IBM VPC resources') - if not self.vpc_data: + prepare_standalone_clean(self, self._load_vpc_data) + if standalone_clean_stop_early(self, self.vpc_data, self._delete_vpc_data, all): return - if self.mode == StandaloneMode.CONSUME.value: + self._delete_vm_instances(all=all) + if all: + self._delete_vpc() + self._delete_ssh_key() self._delete_vpc_data() - else: - self._delete_vm_instances(all=all) - self._delete_vpc() if all else None - self._delete_ssh_key() if all else None - self._delete_vpc_data() if all else None def clear(self, job_keys=None): """ @@ -881,7 +982,11 @@ def get_runtime_key(self, runtime_name, version=__version__): Creates the runtime key """ name = runtime_name.replace('/', '-').replace(':', '-') - runtime_key = os.path.join(self.name, version, self.vpc_data['master_id'], name) + if self.mode == StandaloneMode.CONSUME.value: + master_id = self.config['instance_id'] + else: + master_id = self.vpc_data['master_id'] + runtime_key = os.path.join(self.name, 
version, master_id, name) return runtime_key @@ -1052,6 +1157,17 @@ def _create_instance(self, user_data): return self.instance_data + def _rollback_instance_resources(self, instance_created=False): + """ + Deletes a VM instance provisioned during a failed create(). + """ + if instance_created and self.instance_id: + logger.info(f'Rolling back resources provisioned for {self.name}') + try: + self._delete_instance() + except Exception as err: + logger.warning(f'Rollback: could not delete {self.name}: {err}') + def _attach_floating_ip(self, instance): """ Attach a floating IP address only if the VM is the master instance @@ -1139,12 +1255,21 @@ def create(self, check_if_exists=False, user_data=None): logger.debug(f'VM instance {self.name} already exists') vsi_exists = True - instance = self._create_instance(user_data=user_data) if not vsi_exists else self.start() + instance_created = False + try: + if not vsi_exists: + instance = self._create_instance(user_data=user_data) + instance_created = True + else: + instance = self.start() - if instance and self.public: - self._attach_floating_ip(instance) + if instance and self.public: + self._attach_floating_ip(instance) - return self.instance_id + return self.instance_id + except Exception: + self._rollback_instance_resources(instance_created=instance_created) + raise def start(self): """ @@ -1297,7 +1422,7 @@ def wrapper(*args, **kwargs): def _sleep_or_raise(sleep_time, err): if i < RETRIES - 1: time.sleep(sleep_time) - logger.warning((f'Got exception {err}, retrying for the {i} time, left retries {RETRIES - 1 -i}')) + logger.warning((f'Got exception {err}, retrying for the {i} time, left retries {RETRIES - 1 - i}')) return min(sleep_time * SLEEP_FACTOR, MAX_SLEEP) else: raise err diff --git a/lithops/standalone/utils.py b/lithops/standalone/utils.py index d52208ec7..d324d18cd 100644 --- a/lithops/standalone/utils.py +++ b/lithops/standalone/utils.py @@ -22,6 +22,33 @@ class StandaloneMode(Enum): REUSE = "reuse" +def 
prepare_standalone_clean(backend, load_cache_fn): + """ + Load persisted stack metadata from disk when the backend has a cache file. + + Standalone cloud backends call this at the start of clean() so cleanup works + even when clean() is invoked without a prior init() in the same process. + """ + if backend.is_initialized(): + load_cache_fn() + + +def standalone_clean_stop_early(backend, stack_data, delete_cache_fn, all_flag): + """ + Common clean() early exits for consume mode and missing stack metadata. + + Returns True when no further cloud resource cleanup is required. + """ + if backend.mode == StandaloneMode.CONSUME.value: + delete_cache_fn() + return True + if not stack_data: + if all_flag: + delete_cache_fn() + return True + return False + + class WorkerStatus(Enum): STARTING = "starting" STARTED = "started" diff --git a/lithops/storage/backends/gcp_storage/__init__.py b/lithops/storage/backends/gcp_storage/__init__.py index 8c6290057..7611f6d33 100644 --- a/lithops/storage/backends/gcp_storage/__init__.py +++ b/lithops/storage/backends/gcp_storage/__init__.py @@ -1,3 +1,5 @@ +from lithops import _grpc_env # noqa: F401 + from .gcp_storage import GCPStorageBackend as StorageBackend __all__ = ['StorageBackend'] diff --git a/lithops/tests/test_retries.py b/lithops/tests/test_retries.py index 66e892a19..e952c8c13 100644 --- a/lithops/tests/test_retries.py +++ b/lithops/tests/test_retries.py @@ -117,7 +117,7 @@ def deterministic_failure(path, timing_map, i): else: time.sleep(-timing_code) raise RuntimeError( - f"Deliberately fail on invocation number {invocation_count+1} for input {i}" + f"Deliberately fail on invocation number {invocation_count + 1} for input {i}" ) diff --git a/lithops/utils.py b/lithops/utils.py index 2fe26eefc..f7ac3d4bf 100644 --- a/lithops/utils.py +++ b/lithops/utils.py @@ -726,6 +726,31 @@ def readline(self): return retval +def docker_login(docker_user, docker_password, docker_server): + """ + Log in to a container registry using 
docker/podman. + + Docker Hub must be logged in without an explicit server host, matching + `docker login -u USER --password-stdin`. + """ + if not docker_user or not docker_password: + raise Exception('docker_user and docker_password are required') + if not docker_server: + raise Exception('docker_server is required') + + docker_path = get_docker_path() + docker_password = docker_password.strip() + + if 'docker.io' in docker_server: + cmd = f'{docker_path} login -u {docker_user} --password-stdin' + else: + cmd = (f'{docker_path} login -u {docker_user} --password-stdin ' + f'{docker_server}') + + logger.debug('Logging in to container registry') + run_command(cmd, input=docker_password) + + def run_command(cmd, return_result=False, input=None): kwargs = {} diff --git a/lithops/worker/handler.py b/lithops/worker/handler.py index 0de58c580..754d0cc42 100644 --- a/lithops/worker/handler.py +++ b/lithops/worker/handler.py @@ -28,7 +28,6 @@ import multiprocessing as mp from queue import Queue, Empty from threading import Thread -from multiprocessing import Process, Pipe from tblib import pickling_support from types import SimpleNamespace from multiprocessing.managers import SyncManager @@ -48,6 +47,10 @@ logger = logging.getLogger(__name__) +# Python 3.14 defaults to forkserver on Linux, which requires pickling Process +# arguments. Lithops relies on fork semantics for JobRunner subprocesses. 
+_MP_CTX = mp.get_context('fork') if is_unix_system() else None + class ShutdownSentinel: """Put an instance of this class on the queue to shut it down""" @@ -82,7 +85,7 @@ def function_handler(payload): work_queue.put(ShutdownSentinel()) python_queue_consumer(0, work_queue, ) else: - manager = SyncManager() + manager = SyncManager(ctx=_MP_CTX) manager.start() work_queue = manager.Queue() job_runners = [] @@ -93,7 +96,7 @@ def function_handler(payload): for pid in range(worker_processes): work_queue.put(ShutdownSentinel()) - p = mp.Process(target=python_queue_consumer, args=(pid, work_queue,)) + p = (_MP_CTX or mp).Process(target=python_queue_consumer, args=(pid, work_queue,)) job_runners.append(p) p.start() @@ -201,10 +204,10 @@ def run_task(task): # send init status event call_status.send_init_event() - handler_conn, jobrunner_conn = Pipe() + handler_conn, jobrunner_conn = (_MP_CTX or mp).Pipe() jobrunner = JobRunner(task, jobrunner_conn, internal_storage) logger.debug('Starting JobRunner process') - jrp = Process(target=jobrunner.run) if is_unix_system() else Thread(target=jobrunner.run) + jrp = _MP_CTX.Process(target=jobrunner.run) if is_unix_system() else Thread(target=jobrunner.run) process_id = os.getpid() if is_unix_system() else mp.current_process().pid sys_monitor = SystemMonitor(process_id) diff --git a/runtime/aws_ec2/README.md b/runtime/aws_ec2/README.md index 152640162..f9a7455c5 100644 --- a/runtime/aws_ec2/README.md +++ b/runtime/aws_ec2/README.md @@ -10,7 +10,7 @@ For building the default VM image that contains all dependencies required by Lit lithops image build -b aws_ec2 ``` -This command will create an image called "lithops-ubuntu-jammy-22.04-amd64-server" in the target region. +This command will create an image called "lithops-ubuntu-noble-24.04-amd64-server" in the target region. If the image already exists, and you want to update it, use the `--overwrite` or `-o` parameter: ``` @@ -52,4 +52,4 @@ aws_ec2: ... 
``` -Note that if you name your VM Image (AMI) as "lithops-ubuntu-jammy-22.04-amd64-server", there is no need to provide the `target_ami` in the config, since lithops automatically looks for this AMI name. +Note that if you name your VM Image (AMI) as "lithops-ubuntu-noble-24.04-amd64-server", there is no need to provide the `target_ami` in the config, since lithops automatically looks for this AMI name. diff --git a/runtime/azure_functions/README.md b/runtime/azure_functions/README.md index b3febff28..bcf749e5d 100644 --- a/runtime/azure_functions/README.md +++ b/runtime/azure_functions/README.md @@ -17,7 +17,7 @@ fexec.call_async(my_function, 3) result = lithops.get_result() ``` -* Note that Azure Functions does not allow to set a specific memory size for the runtimes, so the parameter `runtime_memory` won't take effect. +* Flex Consumption supports instance memory sizes of 512 MB, 2048 MB, and 4096 MB. Lithops maps `runtime_memory` to the nearest supported value (default: 2048 MB). ## Custom runtime