Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.
54 changes: 53 additions & 1 deletion bigframes/functions/_function_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
# BQ managed functions (@udf) currently only support Python 3.11.
_MANAGED_FUNC_PYTHON_VERSION = "python-3.11"

_DEFAULT_FUNCTION_MEMORY_MIB = 1024


class FunctionClient:
# Wait time (in seconds) for an IAM binding to take effect after creation.
Expand Down Expand Up @@ -402,8 +404,12 @@ def create_cloud_function(
is_row_processor=False,
vpc_connector=None,
vpc_connector_egress_settings="private-ranges-only",
memory_mib=1024,
memory_mib=None,
cpus=None,
ingress_settings="internal-only",
workers=None,
threads=None,
concurrency=None,
):
"""Create a cloud function from the given user defined function."""

Expand Down Expand Up @@ -486,6 +492,8 @@ def create_cloud_function(
function.service_config = functions_v2.ServiceConfig()
if memory_mib is not None:
function.service_config.available_memory = f"{memory_mib}Mi"
if cpus is not None:
function.service_config.available_cpu = str(cpus)
if timeout_seconds is not None:
if timeout_seconds > 1200:
raise bf_formatting.create_exception_with_feedback_link(
Expand Down Expand Up @@ -517,6 +525,17 @@ def create_cloud_function(
function.service_config.service_account_email = (
self._cloud_function_service_account
)
if concurrency:
function.service_config.max_instance_request_concurrency = concurrency

env_vars = {}
if workers:
env_vars["WORKERS"] = str(workers)
if threads:
env_vars["THREADS"] = str(threads)
if env_vars:
function.service_config.environment_variables = env_vars
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you know where to put these configs to? Like this function.service_config.environment_variables? Is there a cloud run doc?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uhh, I just found it with ide auto-complete.
The specific environment variables are from functions framework hooks: see GoogleCloudPlatform/functions-framework-python#241


if ingress_settings not in _INGRESS_SETTINGS_MAP:
raise bf_formatting.create_exception_with_feedback_link(
ValueError,
Expand Down Expand Up @@ -581,6 +600,7 @@ def provision_bq_remote_function(
cloud_function_vpc_connector,
cloud_function_vpc_connector_egress_settings,
cloud_function_memory_mib,
cloud_function_cpus,
cloud_function_ingress_settings,
bq_metadata,
):
Expand Down Expand Up @@ -616,6 +636,18 @@ def provision_bq_remote_function(
)
cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name)

if (cloud_function_cpus is None) and (cloud_function_memory_mib is None):
cloud_function_memory_mib = _DEFAULT_FUNCTION_MEMORY_MIB

# Assumption: most bigframes functions are CPU-bound and single-threaded, and many won't release the GIL.
# Therefore allocate one worker per CPU, and allow one concurrent request per worker thread (workers * threads in total).
expected_cpus = cloud_function_cpus or _infer_cpus_from_memory(
cloud_function_memory_mib
)
workers = expected_cpus
threads = 4 # (per worker)
concurrency = workers * threads

# Create the cloud function if it does not exist
if not cf_endpoint:
cf_endpoint = self.create_cloud_function(
Expand All @@ -630,7 +662,11 @@ def provision_bq_remote_function(
vpc_connector=cloud_function_vpc_connector,
vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings,
memory_mib=cloud_function_memory_mib,
cpus=cloud_function_cpus,
ingress_settings=cloud_function_ingress_settings,
workers=workers,
threads=threads,
concurrency=concurrency,
)
else:
logger.info(f"Cloud function {cloud_function_name} already exists.")
Expand Down Expand Up @@ -696,3 +732,19 @@ def get_remote_function_specs(self, remote_function_name):
# Note: list_routines doesn't make an API request until we iterate on the response object.
pass
return (http_endpoint, bq_connection)


def _infer_cpus_from_memory(memory_mib: int) -> int:
    """Return the whole number of CPUs to assume for a given memory size.

    The result is looked up from fixed memory tiers; each tier's ceiling is
    inclusive.

    Args:
        memory_mib: Memory allocated to the function instance, in MiB.

    Returns:
        1 for up to 2048 MiB, 2 for up to 8192 MiB, 4 for up to 16384 MiB
        and 8 for up to 32768 MiB.

    Raises:
        ValueError: If ``memory_mib`` is greater than 32768.
    """
    # (inclusive memory ceiling in MiB, cpus), in ascending order.
    # In actuality the first tier is fractional (0.583 for 1024mb, 0.33 for
    # 512mb, etc), but we round up to 1.
    memory_tiers = (
        (2048, 1),
        (8192, 2),
        (16384, 4),
        (32768, 8),
    )
    for ceiling_mib, cpus in memory_tiers:
        if memory_mib <= ceiling_mib:
            return cpus
    raise ValueError("Cloud run support at most 32768MiB per instance")
8 changes: 7 additions & 1 deletion bigframes/functions/_function_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ def remote_function(
cloud_function_vpc_connector_egress_settings: Optional[
Literal["all", "private-ranges-only", "unspecified"]
] = None,
cloud_function_memory_mib: Optional[int] = 1024,
cloud_function_memory_mib: Optional[int] = None,
cloud_function_cpus: Optional[int] = None,
cloud_function_ingress_settings: Literal[
"all", "internal-only", "internal-and-gclb"
] = "internal-only",
Expand Down Expand Up @@ -444,6 +445,10 @@ def remote_function(
default memory of cloud functions be allocated, pass `None`. See
for more details
https://cloud.google.com/functions/docs/configuring/memory.
cloud_function_cpus (int, Optional):
The number of CPUs to allocate for the cloud function (2nd gen)
created. See for more details
https://docs.cloud.google.com/run/docs/configuring/services/cpu.
cloud_function_ingress_settings (str, Optional):
Ingress settings controls dictating what traffic can reach the
function. Options are: `all`, `internal-only`, or `internal-and-gclb`.
Expand Down Expand Up @@ -638,6 +643,7 @@ def wrapper(func):
cloud_function_vpc_connector=cloud_function_vpc_connector,
cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings,
cloud_function_memory_mib=cloud_function_memory_mib,
cloud_function_cpus=cloud_function_cpus,
cloud_function_ingress_settings=cloud_function_ingress_settings,
bq_metadata=bqrf_metadata,
)
Expand Down
4 changes: 3 additions & 1 deletion bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ def remote_function(
cloud_function_vpc_connector_egress_settings: Optional[
Literal["all", "private-ranges-only", "unspecified"]
] = None,
cloud_function_memory_mib: Optional[int] = 1024,
cloud_function_memory_mib: Optional[int] = None,
cloud_function_cpus: Optional[int] = None,
cloud_function_ingress_settings: Literal[
"all", "internal-only", "internal-and-gclb"
] = "internal-only",
Expand All @@ -112,6 +113,7 @@ def remote_function(
cloud_function_vpc_connector=cloud_function_vpc_connector,
cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings,
cloud_function_memory_mib=cloud_function_memory_mib,
cloud_function_cpus=cloud_function_cpus,
cloud_function_ingress_settings=cloud_function_ingress_settings,
cloud_build_service_account=cloud_build_service_account,
)
Expand Down
8 changes: 7 additions & 1 deletion bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1537,7 +1537,8 @@ def remote_function(
cloud_function_vpc_connector_egress_settings: Optional[
Literal["all", "private-ranges-only", "unspecified"]
] = None,
cloud_function_memory_mib: Optional[int] = 1024,
cloud_function_memory_mib: Optional[int] = None,
cloud_function_cpus: Optional[int] = None,
cloud_function_ingress_settings: Literal[
"all", "internal-only", "internal-and-gclb"
] = "internal-only",
Expand Down Expand Up @@ -1718,6 +1719,10 @@ def remote_function(
default memory of cloud functions be allocated, pass `None`. See
for more details
https://cloud.google.com/functions/docs/configuring/memory.
cloud_function_cpus (int, Optional):
The number of CPUs to allocate for the cloud function (2nd gen)
created. See for more details
https://docs.cloud.google.com/run/docs/configuring/services/cpu.
cloud_function_ingress_settings (str, Optional):
Ingress settings controls dictating what traffic can reach the
function. Options are: `all`, `internal-only`, or `internal-and-gclb`.
Expand Down Expand Up @@ -1768,6 +1773,7 @@ def remote_function(
cloud_function_vpc_connector=cloud_function_vpc_connector,
cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings,
cloud_function_memory_mib=cloud_function_memory_mib,
cloud_function_cpus=cloud_function_cpus,
cloud_function_ingress_settings=cloud_function_ingress_settings,
cloud_build_service_account=cloud_build_service_account,
)
Expand Down
Loading