This repository was archived by the owner on Apr 1, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 67
feat: Add cloud_function_cpus option to remote_function #2475
Merged
Merged
Changes from 5 commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
8a7ca8c
feat: Add concurrency options to udf
TrevorBergeron e64d9ec
expose cpu instead of concurrency
TrevorBergeron efb54ac
use 4 threads per worker
TrevorBergeron cca8001
delete notebook
TrevorBergeron c8a6394
remove dead params
TrevorBergeron f8f4f11
address comments
TrevorBergeron 823ffbc
fix concurrency to 1 if cpus < 1
TrevorBergeron 33cd4b4
fix cpu<1 cases
TrevorBergeron c77023f
error fast on too little memory
TrevorBergeron 16cca4b
Merge remote-tracking branch 'github/main' into rf_concur
TrevorBergeron 93747c1
fix thresholds
TrevorBergeron File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,6 +65,8 @@ | |
| # BQ managed functions (@udf) currently only support Python 3.11. | ||
| _MANAGED_FUNC_PYTHON_VERSION = "python-3.11" | ||
|
|
||
| _DEFAULT_FUNCTION_MEMORY_MIB = 1024 | ||
|
|
||
|
|
||
| class FunctionClient: | ||
| # Wait time (in seconds) for an IAM binding to take effect after creation. | ||
|
|
@@ -402,8 +404,12 @@ def create_cloud_function( | |
| is_row_processor=False, | ||
| vpc_connector=None, | ||
| vpc_connector_egress_settings="private-ranges-only", | ||
| memory_mib=1024, | ||
| memory_mib=None, | ||
| cpus=None, | ||
| ingress_settings="internal-only", | ||
| workers=None, | ||
| threads=None, | ||
| concurrency=None, | ||
| ): | ||
| """Create a cloud function from the given user defined function.""" | ||
|
|
||
|
|
@@ -486,6 +492,8 @@ def create_cloud_function( | |
| function.service_config = functions_v2.ServiceConfig() | ||
| if memory_mib is not None: | ||
| function.service_config.available_memory = f"{memory_mib}Mi" | ||
| if cpus is not None: | ||
| function.service_config.available_cpu = str(cpus) | ||
| if timeout_seconds is not None: | ||
| if timeout_seconds > 1200: | ||
| raise bf_formatting.create_exception_with_feedback_link( | ||
|
|
@@ -517,6 +525,17 @@ def create_cloud_function( | |
| function.service_config.service_account_email = ( | ||
| self._cloud_function_service_account | ||
| ) | ||
| if concurrency: | ||
| function.service_config.max_instance_request_concurrency = concurrency | ||
|
|
||
| env_vars = {} | ||
| if workers: | ||
| env_vars["WORKERS"] = str(workers) | ||
| if threads: | ||
| env_vars["THREADS"] = str(threads) | ||
| if env_vars: | ||
| function.service_config.environment_variables = env_vars | ||
|
|
||
| if ingress_settings not in _INGRESS_SETTINGS_MAP: | ||
| raise bf_formatting.create_exception_with_feedback_link( | ||
| ValueError, | ||
|
|
@@ -581,6 +600,7 @@ def provision_bq_remote_function( | |
| cloud_function_vpc_connector, | ||
| cloud_function_vpc_connector_egress_settings, | ||
| cloud_function_memory_mib, | ||
| cloud_function_cpus, | ||
| cloud_function_ingress_settings, | ||
| bq_metadata, | ||
| ): | ||
|
|
@@ -616,6 +636,18 @@ def provision_bq_remote_function( | |
| ) | ||
| cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name) | ||
|
|
||
| if (cloud_function_cpus is None) and (cloud_function_memory_mib is None): | ||
| cloud_function_memory_mib = _DEFAULT_FUNCTION_MEMORY_MIB | ||
|
|
||
| # assumption is most bigframes functions are cpu bound, single-threaded and many won't release GIL | ||
| # therefore, want to allocate a worker for each cpu, and allow a concurrent request per worker | ||
| expected_cpus = cloud_function_cpus or _infer_cpus_from_memory( | ||
| cloud_function_memory_mib | ||
| ) | ||
| workers = expected_cpus | ||
| threads = 4 # (per worker) | ||
| concurrency = workers * threads | ||
|
|
||
| # Create the cloud function if it does not exist | ||
| if not cf_endpoint: | ||
| cf_endpoint = self.create_cloud_function( | ||
|
|
@@ -630,7 +662,11 @@ def provision_bq_remote_function( | |
| vpc_connector=cloud_function_vpc_connector, | ||
| vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings, | ||
| memory_mib=cloud_function_memory_mib, | ||
| cpus=cloud_function_cpus, | ||
| ingress_settings=cloud_function_ingress_settings, | ||
| workers=workers, | ||
| threads=threads, | ||
| concurrency=concurrency, | ||
| ) | ||
| else: | ||
| logger.info(f"Cloud function {cloud_function_name} already exists.") | ||
|
|
@@ -696,3 +732,19 @@ def get_remote_function_specs(self, remote_function_name): | |
| # Note: list_routines doesn't make an API request until we iterate on the response object. | ||
| pass | ||
| return (http_endpoint, bq_connection) | ||
|
|
||
|
|
||
| def _infer_cpus_from_memory(memory_mib: int) -> int: | ||
| if memory_mib <= 2048: | ||
| # in actuality, will be 0.583 for 1024mb, 0.33 for 512mb, etc, but we round up to 1 | ||
| return 1 | ||
| elif memory_mib <= 8192: | ||
| return 2 | ||
| elif memory_mib <= 8192: | ||
|
||
| return 2 | ||
| elif memory_mib <= 16384: | ||
| return 4 | ||
| elif memory_mib <= 32768: | ||
| return 8 | ||
| else: | ||
| raise ValueError("Cloud run support at most 32768MiB per instance") | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you know where these configs should go, e.g. function.service_config.environment_variables? Is there a Cloud Run doc for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just found it with IDE auto-complete.
The specific environment variables come from the functions-framework hooks: see GoogleCloudPlatform/functions-framework-python#241