1717import os
1818import time
1919import json
20- import urllib
2120import yaml
2221import hashlib
2322import logging
2423import httplib2
24+ import requests
2525import google .auth
2626import google .oauth2 .id_token
27+ import google .auth .transport .requests
2728from threading import Lock
2829from google .oauth2 import service_account
2930from google_auth_httplib2 import AuthorizedHttp
3031from googleapiclient .discovery import build
32+ from googleapiclient .errors import HttpError
3133
3234from lithops import utils
3335from lithops .constants import COMPUTE_CLI_MSG
@@ -51,6 +53,7 @@ def __init__(self, cloudrun_config, internal_storage):
5153 self .credentials_path = cloudrun_config .get ('credentials_path' )
5254
5355 self ._build_api_resource ()
56+ self ._resolve_artifact_registry_repository_fallback ()
5457
5558 self ._service_url = None
5659 self ._id_token = None
@@ -121,10 +124,122 @@ def _build_api_resource(self):
121124 'api_endpoint' : f'https://{ self .region } -run.googleapis.com'
122125 }
123126 )
127+ self ._ar_resource = build (
128+ 'artifactregistry' , 'v1' ,
129+ http = http , cache_discovery = False
130+ )
124131
125132 self .cr_config ['project_name' ] = self .project_name
126133 self .cr_config ['service_account' ] = self .service_account
127134
135+ def _parse_artifact_registry_image_name (self , image_name ):
136+ """
137+ Parse Artifact Registry image format:
138+ REGION-docker.pkg.dev/PROJECT/REPOSITORY/IMAGE[:TAG]
139+ """
140+ parts = image_name .split ('/' )
141+ if len (parts ) < 4 :
142+ return None
143+ host , project , repository = parts [0 ], parts [1 ], parts [2 ]
144+ if not host .endswith ('-docker.pkg.dev' ):
145+ return None
146+ location = host .replace ('-docker.pkg.dev' , '' )
147+ return project , location , repository
148+
149+ def _artifact_registry_uploader_identity (self ):
150+ if self .credentials_path and os .path .isfile (self .credentials_path ):
151+ try :
152+ with open (self .credentials_path , 'r' ) as f :
153+ cred_data = json .load (f )
154+ return cred_data .get ('client_email' )
155+ except Exception :
156+ return None
157+ return self .service_account
158+
159+ def _list_docker_repositories (self ):
160+ parent = f'projects/{ self .project_name } /locations/{ self .region } '
161+ repos = []
162+ page_token = None
163+ while True :
164+ req = self ._ar_resource .projects ().locations ().repositories ().list (
165+ parent = parent , pageToken = page_token
166+ )
167+ res = req .execute ()
168+ for repo in res .get ('repositories' , []):
169+ if repo .get ('format' ) == 'DOCKER' :
170+ repos .append (repo ['name' ].rsplit ('/' , 1 )[- 1 ])
171+ page_token = res .get ('nextPageToken' )
172+ if not page_token :
173+ break
174+ return repos
175+
176+ def _resolve_artifact_registry_repository_fallback (self ):
177+ """
178+ Minimal repository resolution:
179+ - Use configured repository if accessible
180+ - Otherwise fallback to an existing DOCKER repository (prefer gcf-artifacts)
181+ """
182+ repository = self .cr_config .get ('artifact_registry_repository' , 'lithops' )
183+ name = f'projects/{ self .project_name } /locations/{ self .region } /repositories/{ repository } '
184+
185+ try :
186+ self ._ar_resource .projects ().locations ().repositories ().get (name = name ).execute ()
187+ return
188+ except Exception :
189+ pass
190+
191+ try :
192+ docker_repos = self ._list_docker_repositories ()
193+ except Exception :
194+ docker_repos = []
195+
196+ if not docker_repos :
197+ return
198+
199+ fallback = 'gcf-artifacts' if 'gcf-artifacts' in docker_repos else sorted (docker_repos )[0 ]
200+ if fallback != repository :
201+ self .cr_config ['artifact_registry_repository' ] = fallback
202+ logger .info (
203+ f'Using Artifact Registry repository "{ fallback } " '
204+ f'(configured "{ repository } " is not accessible).'
205+ )
206+
207+ def _ensure_artifact_registry_upload_permission (self , image_name ):
208+ """
209+ Check that current principal can upload artifacts to the target repo.
210+ """
211+ parsed = self ._parse_artifact_registry_image_name (image_name )
212+ if not parsed :
213+ return
214+
215+ project , location , repository = parsed
216+ resource = f'projects/{ project } /locations/{ location } /repositories/{ repository } '
217+
218+ try :
219+ result = self ._ar_resource .projects ().locations ().repositories ().testIamPermissions (
220+ resource = resource ,
221+ body = {
222+ 'permissions' : ['artifactregistry.repositories.uploadArtifacts' ]
223+ }
224+ ).execute ()
225+ except HttpError :
226+ # If we cannot test permissions (forbidden/unavailable), fail later on push.
227+ return
228+
229+ granted = set (result .get ('permissions' , []))
230+ if 'artifactregistry.repositories.uploadArtifacts' in granted :
231+ return
232+
233+ principal = self ._artifact_registry_uploader_identity () or 'current credentials principal'
234+ raise Exception (
235+ 'Missing Artifact Registry permission to push runtime image. '
236+ f'Principal "{ principal } " does not have '
237+ f'"artifactregistry.repositories.uploadArtifacts" on "{ resource } ". '
238+ 'Grant role "roles/artifactregistry.writer" on the repository/project or configure '
239+ 'a repository where this principal has write access via '
240+ '`gcp_cloudrun.artifact_registry_repository`.'
241+ )
242+
128243 def _get_url_and_token (self , service_name ):
129244 """
130245 Generates a connection token
@@ -207,17 +322,24 @@ def invoke(self, runtime_name, runtime_memory, payload, return_result=False):
207322 else :
208323 logger .debug ('Invoking function' )
209324
210- req = urllib .request .Request (service_url + route , data = json .dumps (payload , default = str ).encode ('utf-8' ))
211- req .add_header ("Authorization" , f"Bearer { id_token } " )
212- res = urllib .request .urlopen (req )
325+ headers = {
326+ "Authorization" : f"Bearer { id_token } " ,
327+ "Content-Type" : "application/json"
328+ }
329+ response = requests .post (
330+ service_url + route ,
331+ data = json .dumps (payload , default = str ),
332+ headers = headers ,
333+ timeout = 120
334+ )
213335
214- if res . getcode () in (200 , 202 ):
215- data = json . loads ( res . read () )
336+ if response . status_code in (200 , 202 ):
337+ data = response . json ( )
216338 if return_result :
217339 return data
218340 return data ["activationId" ]
219341 else :
220- raise Exception (res .text )
342+ raise Exception (response .text )
221343
222344 def build_runtime (self , runtime_name , dockerfile , extra_args = []):
223345 """
@@ -253,11 +375,19 @@ def build_runtime(self, runtime_name, dockerfile, extra_args=[]):
253375 raise Exception (f'There was an error authorizing Docker for push to { registry_host } ' )
254376
255377 logger .debug (f'Pushing runtime { image_name } to { registry_host } ' )
378+ self ._ensure_artifact_registry_upload_permission (image_name )
256379 if utils .is_podman (docker_path ):
257380 cmd = f'{ docker_path } push { image_name } --format docker --remove-signatures'
258381 else :
259382 cmd = f'{ docker_path } push { image_name } '
260- utils .run_command (cmd )
383+ try :
384+ utils .run_command (cmd )
385+ except Exception as e :
386+ raise Exception (
387+ f'Unable to push runtime image to Artifact Registry ({ image_name } ). '
388+ 'Verify the repository exists and that your identity has Artifact Registry write permissions '
389+ '(artifactregistry.repositories.uploadArtifacts).'
390+ ) from e
261391
262392 def _create_service (self , runtime_name , runtime_memory , timeout ):
263393 """
@@ -292,9 +422,19 @@ def _create_service(self, runtime_name, runtime_memory, timeout):
292422 container ['resources' ]['requests' ]['cpu' ] = str (self .cr_config ['runtime_cpu' ])
293423
294424 logger .debug (f"Creating service: { service_name } " )
295- res = self ._api_resource .namespaces ().services ().create (
296- parent = f'namespaces/{ self .project_name } ' , body = svc_res
297- ).execute ()
425+ try :
426+ res = self ._api_resource .namespaces ().services ().create (
427+ parent = f'namespaces/{ self .project_name } ' , body = svc_res
428+ ).execute ()
429+ except HttpError as e :
430+ if e .resp .status == 409 :
431+ logger .debug (f'Service { service_name } already exists. Recreating it' )
432+ self ._delete_service (service_name )
433+ res = self ._api_resource .namespaces ().services ().create (
434+ parent = f'namespaces/{ self .project_name } ' , body = svc_res
435+ ).execute ()
436+ else :
437+ raise
298438 logger .debug (f'Ok -- service created { service_name } ' )
299439
300440 # Wait until service is up
0 commit comments