Skip to content

Commit 55c32fd

Browse files
authored
[GCP Functions] Migrate backend to Cloud Run functions v2 (#1466)
1 parent fb8e0ef commit 55c32fd

8 files changed

Lines changed: 146 additions & 63 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
- [Runtimes] Updated runtime images and related version references across backends.
1111
- [K8s] Added configuration for pod and container `securityContext`.
1212
- [Docs] Corrected MinIO/Ceph config template keys and removed obsolete Kubernetes image references.
13+
- [GCP Functions] Updated `gcp_functions` backend to Google Cloud Run functions (Cloud Functions v2 API).
1314

1415
### Fixed
1516
- [K8s] Fixed default runtime builds impacted by Debian Buster end-of-life.

docs/source/compute_config/gcp_functions.md

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
# Google Cloud Functions
1+
# Google Cloud Run functions (v2)
22

3-
Lithops with *GCP Functions* as serverless compute backend.
3+
Lithops with *GCP Functions* (Cloud Run functions v2 via the Cloud Functions v2 API) as serverless compute backend.
44

55
## Installation
66

@@ -32,6 +32,26 @@ python3 -m pip install lithops[gcp]
3232

3333
10. Enable the **Artifact Registry API**: Navigate to *APIs & services* tab on the menu. Click *ENABLE APIS AND SERVICES*. Look for "Artifact Registry API" at the search bar. Click *Enable*.
3434

35+
11. If you use `trigger: pub/sub`, enable the **Eventarc API** as well. Cloud Run functions v2 event triggers are managed through Eventarc.
36+
37+
```bash
38+
gcloud services enable eventarc.googleapis.com --project <PROJECT_ID>
39+
```
40+
41+
For first-time setup, you can enable all commonly required APIs in one command:
42+
43+
```bash
44+
gcloud services enable \
45+
cloudfunctions.googleapis.com \
46+
run.googleapis.com \
47+
eventarc.googleapis.com \
48+
pubsub.googleapis.com \
49+
cloudbuild.googleapis.com \
50+
artifactregistry.googleapis.com \
51+
storage.googleapis.com \
52+
--project <PROJECT_ID>
53+
```
54+
3555
## Configuration
3656

3757
1. Edit your lithops config and add the following keys:
@@ -54,7 +74,7 @@ python3 -m pip install lithops[gcp]
5474
|gcp | region | |yes | Region name of the GCP services (e.g. `us-east1`) |
5575
|gcp | credentials_path | |yes | **Absolute** path of your JSON key file downloaded in step 7 (e.g. `/home/myuser/lithops-invoker1234567890.json`). Alternatively you can set `GOOGLE_APPLICATION_CREDENTIALS` environment variable. If not provided it will try to load the default credentials from the environment|
5676

57-
### Google Cloud Functions
77+
### Google Cloud Run functions (v2)
5878
|Group|Key|Default|Mandatory|Additional info|
5979
|---|---|---|---|---|
6080
|gcp_functions | region | |no | Region name (e.g. `us-east1`). Functions and pub/sub queues will be created in the same region. Lithops will use the region set under the `gcp` section if it is not set here |
@@ -74,6 +94,11 @@ Once you have your compute and storage backends configured, you can run a hello
7494
lithops hello -b gcp_functions -s gcp_storage
7595
```
7696

97+
## References
98+
99+
- [Cloud Functions v2 REST API reference](https://docs.cloud.google.com/functions/docs/reference/rest)
100+
- [Cloud Run functions best practices](https://docs.cloud.google.com/run/docs/tips/functions-best-practices)
101+
77102

78103
## Viewing the execution logs
79104

docs/source/supported_clouds.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ Currently, Lithops for Google Cloud Platform supports these backends:
4848

4949
* - Compute
5050
- Storage
51-
* - `Google Cloud Functions <https://cloud.google.com/functions/docs>`_
52-
- `Google Cloud Storage <ttps://cloud.google.com/storage/docs>`_
51+
* - `Google Cloud Run functions (v2) <https://docs.cloud.google.com/functions/docs/>`_
52+
- `Google Cloud Storage <https://cloud.google.com/storage/docs>`_
5353
* - `Google Cloud Run <https://cloud.google.com/run/docs>`_
5454
-
5555

lithops/executors.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,18 @@ def save_data_to_clean(data):
627627
spawn_cleaner = not (CLEANER_PROCESS and CLEANER_PROCESS.poll() is None)
628628
if (jobs_to_clean or cs) and spawn_cleaner:
629629
cmd = [sys.executable, '-m', 'lithops.scripts.cleaner']
630-
CLEANER_PROCESS = sp.Popen(cmd, start_new_session=True)
630+
env = os.environ.copy()
631+
# Cleaner is forked while gRPC clients may already be active in the parent process.
632+
# Reduce noisy gRPC fork logs in the detached cleaner subprocess.
633+
env.setdefault('GRPC_ENABLE_FORK_SUPPORT', '0')
634+
env.setdefault('GRPC_VERBOSITY', 'ERROR')
635+
CLEANER_PROCESS = sp.Popen(
636+
cmd,
637+
start_new_session=True,
638+
env=env,
639+
stdout=sp.DEVNULL,
640+
stderr=sp.DEVNULL
641+
)
631642

632643
def job_summary(self, cloud_objects_n: Optional[int] = 0):
633644
"""

lithops/serverless/backends/gcp_functions/config.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@
2222
FH_ZIP_LOCATION = os.path.join(TEMP_DIR, 'lithops_gcp_functions/{}.zip')
2323
SCOPES = ('https://www.googleapis.com/auth/cloud-platform',
2424
'https://www.googleapis.com/auth/pubsub')
25-
FUNCTIONS_API_VERSION = 'v1'
26-
PUBSUB_API_VERSION = 'v1'
25+
FUNCTIONS_API_VERSION = 'v2'
2726
AUDIENCE = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
2827

29-
RUNTIME_MEMORY_MAX = 8192 # 8GB
30-
RUNTIME_MEMORY_OPTIONS = {128, 256, 512, 1024, 2048, 4096, 8192}
28+
RUNTIME_MEMORY_MAX = 32768 # 32GB
29+
RUNTIME_MEMORY_OPTIONS = {128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768}
3130

3231
AVAILABLE_PY_RUNTIMES = {
3332
'3.8': 'python38',

lithops/serverless/backends/gcp_functions/entry_point.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def main(data, context=None):
4444

4545
activation_id = uuid.uuid4().hex
4646
os.environ['__LITHOPS_ACTIVATION_ID'] = activation_id
47-
os.environ['__LITHOPS_BACKEND'] = 'Google Cloud Functions'
47+
os.environ['__LITHOPS_BACKEND'] = 'Google Cloud Run functions (v2)'
4848

4949
if 'get_metadata' in args:
5050
runtime_meta = get_runtime_metadata()
@@ -58,10 +58,10 @@ def main(data, context=None):
5858
else:
5959
return runtime_meta
6060
elif 'remote_invoker' in args:
61-
logger.info(f"Lithops v{__version__} - Starting Google Cloud Functions invoker")
61+
logger.info(f"Lithops v{__version__} - Starting Google Cloud Run functions (v2) invoker")
6262
function_invoker(args)
6363
else:
64-
logger.info(f"Lithops v{__version__} - Starting Google Cloud Functions execution")
64+
logger.info(f"Lithops v{__version__} - Starting Google Cloud Run functions (v2) execution")
6565
function_handler(args)
6666

6767
return {"activationId": activation_id}

lithops/serverless/backends/gcp_functions/gcp_functions.py

Lines changed: 88 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323
import httplib2
2424
import zipfile
2525
import time
26-
import urllib
26+
import requests
2727
import google.auth
2828
import google.oauth2.id_token
29+
import google.auth.transport.requests
2930
from threading import Lock
3031
from google.cloud import pubsub_v1
3132
from google.oauth2 import service_account
@@ -58,13 +59,13 @@ def __init__(self, gcf_config, internal_storage):
5859
self.internal_storage = internal_storage
5960

6061
self._build_api_resource()
61-
62-
self._api_endpoint = f'https://{self.region}-{self.project_name}.cloudfunctions.net/'
62+
self._function_url = None
63+
self._function_name = None
6364
self._api_token = None
6465

6566
logger.debug(f'Invocation trigger set to: {self.trigger}')
6667

67-
msg = COMPUTE_CLI_MSG.format('Google Cloud Functions')
68+
msg = COMPUTE_CLI_MSG.format('Google Cloud Run functions (v2)')
6869
logger.info(f"{msg} - Region: {self.region} - Project: {self.project_name}")
6970

7071
def _build_api_resource(self):
@@ -128,6 +129,21 @@ def _get_runtime_bin_location(self, runtime_name):
128129
function_name = self._format_function_name(runtime_name)
129130
return config.USER_RUNTIMES_PREFIX + '/' + function_name + '_bin.zip'
130131

132+
def _memory_to_gcfv2(self, memory_mb):
133+
return f'{memory_mb}Mi'
134+
135+
def _memory_from_gcfv2(self, memory_value):
136+
if isinstance(memory_value, int):
137+
return memory_value
138+
if isinstance(memory_value, str):
139+
if memory_value.endswith('Mi'):
140+
return int(memory_value[:-2])
141+
if memory_value.endswith('Gi'):
142+
return int(memory_value[:-2]) * 1024
143+
if memory_value.endswith('M'):
144+
return int(memory_value[:-1])
145+
raise ValueError(f'Unable to parse memory value: {memory_value}')
146+
131147
def _encode_payload(self, payload):
132148
return base64.b64encode(bytes(json.dumps(payload), 'utf-8')).decode('utf-8')
133149

@@ -137,9 +153,14 @@ def _get_token(self, function_name):
137153
"""
138154
invoke_mutex.acquire()
139155

140-
if not self._api_token or function_name not in self._function_url:
156+
if not self._api_token or self._function_name != function_name:
141157
logger.debug('Getting authentication token')
142-
self._function_url = self._api_endpoint + function_name
158+
function_location = self._get_function_location(function_name)
159+
response = self._api_resource.projects().locations().functions().get(
160+
name=function_location
161+
).execute(num_retries=self.num_retries)
162+
self._function_url = response['serviceConfig']['uri']
163+
self._function_name = function_name
143164
if self.credentials_path and os.path.isfile(self.credentials_path):
144165
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.credentials_path
145166
auth_req = google.auth.transport.requests.Request()
@@ -168,14 +189,10 @@ def _wait_function_deleted(self, function_location):
168189
# Wait until function is completely deleted
169190
while True:
170191
try:
171-
response = self._api_resource.projects().locations().functions().get(
192+
self._api_resource.projects().locations().functions().get(
172193
name=function_location
173194
).execute(num_retries=self.num_retries)
174-
logger.debug(f'Function status is {response["status"]}')
175-
if response['status'] == 'DELETE_IN_PROGRESS':
176-
time.sleep(self.retry_sleep)
177-
else:
178-
raise Exception(f'Unknown status: {response["status"]}')
195+
time.sleep(self.retry_sleep)
179196
except Exception:
180197
logger.debug('Function status is DELETED')
181198
break
@@ -218,13 +235,25 @@ def _create_function(self, runtime_name, memory, timeout=60):
218235
cloud_function = {
219236
'name': function_location,
220237
'description': 'Lithops Worker for Lithops v' + __version__,
221-
'entryPoint': 'main',
222-
'runtime': config.AVAILABLE_PY_RUNTIMES[utils.CURRENT_PY_VERSION],
223-
'timeout': str(timeout) + 's',
224-
'availableMemoryMb': memory,
225-
'serviceAccountEmail': self.service_account,
226-
'maxInstances': 0,
227-
'sourceArchiveUrl': f'gs://{self.internal_storage.bucket}/{bin_location}',
238+
'buildConfig': {
239+
'runtime': config.AVAILABLE_PY_RUNTIMES[utils.CURRENT_PY_VERSION],
240+
'entryPoint': 'main',
241+
'source': {
242+
'storageSource': {
243+
'bucket': self.internal_storage.bucket,
244+
'object': bin_location
245+
}
246+
}
247+
},
248+
'serviceConfig': {
249+
'timeoutSeconds': timeout,
250+
'availableMemory': self._memory_to_gcfv2(memory),
251+
'serviceAccountEmail': self.service_account,
252+
'maxInstanceCount': self.gcf_config['max_workers'],
253+
'minInstanceCount': 0,
254+
'maxInstanceRequestConcurrency': 1,
255+
'allTrafficOnLatestRevision': True
256+
},
228257
'labels': {
229258
'type': 'lithops-runtime',
230259
'lithops_version': __version__.replace('.', '-'),
@@ -233,41 +262,43 @@ def _create_function(self, runtime_name, memory, timeout=60):
233262
}
234263

235264
if self.trigger == 'https':
236-
cloud_function['httpsTrigger'] = {}
265+
pass
237266

238267
elif self.trigger == 'pub/sub':
239268
topic_name = self._format_topic_name(function_name)
240269
topic_location = self._get_topic_location(topic_name)
241270
cloud_function['eventTrigger'] = {
242-
'eventType': 'providers/cloud.pubsub/eventTypes/topic.publish',
243-
'resource': topic_location,
244-
'failurePolicy': {}
271+
'triggerRegion': self.region,
272+
'eventType': 'google.cloud.pubsub.topic.v1.messagePublished',
273+
'pubsubTopic': topic_location,
274+
'retryPolicy': 'RETRY_POLICY_RETRY'
245275
}
246276

247277
logger.info(f'Deploying function {function_location}')
248278
for attempt in range(self.num_retries):
249279
try:
250280
operation = self._api_resource.projects().locations().functions().create(
251-
location=self._default_location,
281+
parent=self._default_location,
282+
functionId=function_name,
252283
body=cloud_function
253284
).execute()
254285
break
255286
except Exception as e:
256287
if attempt < self.num_retries - 1:
257288
time.sleep(self.retry_sleep)
258289
else:
259-
raise Exception(f"Failed to create Cloud Function after {self.num_retries} attempts.") from e
290+
raise Exception(f"Failed to create Cloud Run function (v2) after {self.num_retries} attempts.") from e
260291

261292
# Wait until the function is completely deployed
262293
logger.info('Waiting for the function to be deployed')
263294
operation_name = operation['name']
264295
while True:
265-
op_status = self._api_resource.operations().get(
296+
op_status = self._api_resource.projects().locations().operations().get(
266297
name=operation_name
267298
).execute(num_retries=self.num_retries)
268299
if op_status.get('done'):
269300
if 'error' in op_status:
270-
raise Exception(f'Error while deploying Cloud Function: {op_status["error"]}')
301+
raise Exception(f'Error while deploying Cloud Run function (v2): {op_status["error"]}')
271302
logger.info("Deployment completed successfully.")
272303
break
273304
else:
@@ -376,7 +407,7 @@ def list_runtimes(self, runtime_name='all'):
376407
fn_name = func['name'].rsplit('/', 1)[-1]
377408
version = func['labels']['lithops_version'].replace('-', '.')
378409
rt_name = func['labels']['runtime_name']
379-
memory = func['availableMemoryMb']
410+
memory = self._memory_from_gcfv2(func['serviceConfig']['availableMemory'])
380411
if runtime_name == rt_name or runtime_name == 'all':
381412
runtimes.append((rt_name, memory, version, fn_name))
382413

@@ -401,18 +432,23 @@ def invoke(self, runtime_name, runtime_memory, payload={}, return_result=False):
401432

402433
if self.trigger == 'pub/sub':
403434
if return_result:
404-
function_location = self._get_function_location(function_name)
405-
response = self._api_resource.projects().locations().functions().call(
406-
name=function_location,
407-
body={'data': json.dumps({'data': self._encode_payload(payload)})}
408-
).execute(num_retries=self.num_retries)
409-
if 'result' in response and response['result'] == 'OK':
410-
object_key = '/'.join([JOBS_PREFIX, runtime_name + '.meta'])
411-
runtime_meta = json.loads(self.internal_storage.get_data(object_key))
412-
self.internal_storage.storage.delete_object(self.internal_storage.bucket, object_key)
413-
return runtime_meta
414-
else:
415-
raise Exception(f'Error at retrieving runtime metadata: {response}')
435+
topic_location = self._get_topic_location(self._format_topic_name(function_name))
436+
fut = self._pub_client.publish(
437+
topic_location,
438+
bytes(json.dumps(payload, default=str).encode('utf-8'))
439+
)
440+
invocation_id = fut.result()
441+
object_key = '/'.join([JOBS_PREFIX, runtime_name + '.meta'])
442+
for _ in range(max(1, self.num_retries * 4)):
443+
try:
444+
runtime_meta = json.loads(self.internal_storage.get_data(object_key))
445+
self.internal_storage.storage.delete_object(self.internal_storage.bucket, object_key)
446+
return runtime_meta
447+
except Exception:
448+
time.sleep(self.retry_sleep)
449+
raise Exception(
450+
f'Timed out waiting for runtime metadata for invocation {invocation_id}'
451+
)
416452
else:
417453
topic_location = self._get_topic_location(self._format_topic_name(function_name))
418454
fut = self._pub_client.publish(
@@ -423,12 +459,19 @@ def invoke(self, runtime_name, runtime_memory, payload={}, return_result=False):
423459

424460
elif self.trigger == 'https':
425461
function_url, api_token = self._get_token(function_name)
426-
req = urllib.request.Request(function_url, data=json.dumps(payload, default=str).encode('utf-8'))
427-
req.add_header("Authorization", f"Bearer {api_token}")
428-
res = urllib.request.urlopen(req)
462+
headers = {
463+
"Authorization": f"Bearer {api_token}",
464+
"Content-Type": "application/json"
465+
}
466+
res = requests.post(
467+
function_url,
468+
data=json.dumps(payload, default=str),
469+
headers=headers,
470+
timeout=120
471+
)
429472

430-
if res.getcode() in (200, 202):
431-
data = json.loads(res.read())
473+
if res.status_code in (200, 202):
474+
data = res.json()
432475
if return_result:
433476
return data
434477
return data["activationId"]

0 commit comments

Comments
 (0)