Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ aioboto3 = [
"aioboto3>=10.4.0",
"types-aioboto3[s3]>=10.4.0",
]
google-genai = [
"google-genai>=1.66.0,<2.0.0",
]

[project.urls]
Homepage = "https://github.com/temporalio/sdk-python"
Expand Down
67 changes: 67 additions & 0 deletions temporalio/contrib/google_genai/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""First-class Temporal integration for the Google Gemini SDK.

.. warning::
This module is experimental and may change in future versions.
Use with caution in production environments.

This integration lets you use the Gemini SDK's async client with full
automatic function calling (AFC) support. Every API call becomes a
**durable Temporal activity**. Tools default to plain workflow methods
that run deterministically in-workflow; wrap any ``@activity.defn`` with
:func:`activity_as_tool` to run a tool as a durable activity instead.

No credentials are fetched in the workflow, and no auth material appears in
Temporal's event history.

- :class:`GeminiPlugin` — registers the Gemini activities (such as
``gemini_api_client_async_request``, its streamed variant, and the file
helpers) using a caller-provided ``genai.Client`` on the worker side.
- :func:`gemini_client` — call from a workflow to get an ``AsyncClient``
that routes API calls through activities.
- :func:`activity_as_tool` — convert any ``@activity.defn`` function into a
Gemini tool callable; Gemini's AFC invokes it as a Temporal activity.

Quickstart::

# ---- worker setup (outside the Temporal Python Sandbox) ----
client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
plugin = GeminiPlugin(client)

@activity.defn
async def get_weather(state: str) -> str: ...

# ---- workflow (inside the Temporal Python Sandbox) ----
@workflow.defn
class AgentWorkflow:
@workflow.run
async def run(self, query: str) -> str:
client = gemini_client()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen if someone forgot to do this and used the client defined outside of the workflow instead? This seems like a common pitfall across a lot of our plugins...

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also is there any use case for multiple clients? What if we had two clients and two API keys?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The supported answer today is multiple plugins → multiple workers, each with its own configured genai.Client and API key. If a single worker ever needs to dispatch a workflow to one of several keys, that's an additive future change — e.g., gemini_client(name="foo") correlating to a named client registered on the plugin — but it's outside the scope of this PR.

response = await client.models.generate_content(
model="gemini-2.5-flash",
contents=query,
config=types.GenerateContentConfig(
tools=[
activity_as_tool(
get_weather,
activity_config=ActivityConfig(
start_to_close_timeout=timedelta(seconds=30),
),
),
],
),
)
return response.text
"""

from __future__ import annotations

from temporalio.contrib.google_genai._gemini_plugin import GeminiPlugin
from temporalio.contrib.google_genai.workflow import (
activity_as_tool,
gemini_client,
)

__all__ = [
"GeminiPlugin",
"activity_as_tool",
"gemini_client",
]
174 changes: 174 additions & 0 deletions temporalio/contrib/google_genai/_gemini_activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""Temporal activity that executes Gemini SDK API calls with real credentials.

The ``TemporalApiClient`` in the workflow dispatches calls here. This
activity holds a user-provided ``genai.Client`` and forwards structured
requests. Credentials are fetched/refreshed only within the activity —
they never appear in workflow event history.
"""

from __future__ import annotations

from collections.abc import Sequence
from typing import Any, Callable

import google.auth.credentials
from google.genai import Client as GeminiClient
from google.genai import types
from google.genai.types import HttpOptions
from google.genai.types import HttpResponse as SdkHttpResponse

from temporalio import activity
from temporalio.contrib.google_genai._models import (
_GeminiApiRequest,
_GeminiApiResponse,
_GeminiApiStreamedResponse,
_GeminiDownloadFileRequest,
_GeminiRegisterFilesRequest,
_GeminiUploadFileRequest,
_GeminiUploadToFileSearchStoreRequest,
)


def _resolve_http_options(
overrides: Any,
) -> HttpOptions | None:
"""Reconstruct ``HttpOptions`` from serializable overrides, or None."""
if overrides is None:
return None
return HttpOptions.model_validate(overrides.model_dump(exclude_none=True))


class GeminiApiCaller:
"""Wraps a ``genai.Client`` and exposes Temporal activities for SDK calls.

The caller owns a reference to the user-provided ``genai.Client``.
All credential management, HTTP client configuration, etc. is the
responsibility of whoever constructs the client.
"""

def __init__(
self,
client: GeminiClient,
credentials: google.auth.credentials.Credentials | None = None,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these optional because you can use environment variables instead? Or can you actually use Gemini SDK without credentials?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These specific credentials are only consumed by the register_files activity, which talks to GCS to register Google Cloud Storage URIs with a Gemini file search store. So the parameter is optional because most workflows never touch file search stores — they only need to pass extra_credentials if they're registering GCS-hosted files that the Gemini SDK needs to read for them.

For the normal Gemini API surface, auth is whatever you configured on the genai.Client you handed to the plugin (API key, ADC, env vars, etc.).

) -> None:
"""Initialize with a genai.Client and optional extra credentials."""
self._client = client
self._credentials = credentials

def activities(self) -> Sequence[Callable]:
    """Build the Temporal activities that forward SDK calls to the wrapped client.

    Every activity is a closure over ``self`` so that, when it runs, it can
    reach the stored ``genai.Client`` and the optional credentials.

    Returns:
        The activity callables in this order: plain request, streamed
        request, file upload, file download, file registration, and
        file-search-store upload.
    """

    def _upload_source(req: Any) -> Any:
        # Prefer in-memory bytes (wrapped as a binary stream); otherwise
        # hand the SDK the path carried by the request.
        if req.file_bytes is None:
            return req.file_path
        import io

        return io.BytesIO(req.file_bytes)

    @activity.defn
    async def gemini_api_client_async_request(
        req: _GeminiApiRequest,
    ) -> _GeminiApiResponse:
        """Forward a single non-streaming request through the real client."""
        api_client = self._client.aio._api_client
        sdk_response: SdkHttpResponse = await api_client.async_request(
            http_method=req.http_method,
            path=req.path,
            request_dict=req.request_dict,
            http_options=_resolve_http_options(req.http_options_overrides),
        )
        # Missing headers/body are normalised to empty values.
        return _GeminiApiResponse(
            headers=sdk_response.headers or {},
            body=sdk_response.body or "",
        )

    @activity.defn
    async def gemini_api_client_async_request_streamed(
        req: _GeminiApiRequest,
    ) -> _GeminiApiStreamedResponse:
        """Run a streamed request and return every chunk in one response."""
        api_client = self._client.aio._api_client
        chunk_stream = await api_client.async_request_streamed(
            http_method=req.http_method,
            path=req.path,
            request_dict=req.request_dict,
            http_options=_resolve_http_options(req.http_options_overrides),
        )
        # The stream is drained completely before the activity returns.
        collected = [
            _GeminiApiResponse(
                headers=part.headers or {},
                body=part.body or "",
            )
            async for part in chunk_stream
        ]
        return _GeminiApiStreamedResponse(chunks=collected)

    @activity.defn
    async def gemini_files_upload(
        req: _GeminiUploadFileRequest,
    ) -> types.File:
        """Upload a file (bytes or path) via the client's ``files`` API."""
        source = _upload_source(req)
        files_api = self._client.aio.files
        return await files_api.upload(file=source, config=req.config)

    @activity.defn
    async def gemini_files_download(
        req: _GeminiDownloadFileRequest,
    ) -> bytes:
        """Download a file via the client's ``files`` API."""
        files_api = self._client.aio.files
        return await files_api.download(file=req.file, config=req.config)

    @activity.defn
    async def gemini_files_register(
        req: _GeminiRegisterFilesRequest,
    ) -> types.RegisterFilesResponse:
        """Register the requested URIs via ``files.register_files``.

        The credentials given to this caller at construction are used when
        present; otherwise the client's own ``_credentials`` are used.

        Raises:
            ValueError: If neither source yields credentials.
        """
        auth = self._credentials
        if not auth:
            # Fall back to whatever the client itself was configured with.
            auth = self._client._api_client._credentials
        if auth is None:
            raise ValueError(
                "No credentials available for register_files(). "
                "Pass extra_credentials to GeminiPlugin or initialize "
                "the genai.Client with credentials."
            )
        files_api = self._client.aio.files
        return await files_api.register_files(
            auth=auth,
            uris=req.uris,
            config=req.config,
        )

    @activity.defn
    async def gemini_file_search_stores_upload(
        req: _GeminiUploadToFileSearchStoreRequest,
    ) -> types.UploadToFileSearchStoreOperation:
        """Upload a file (bytes or path) into a file search store."""
        source = _upload_source(req)
        stores_api = self._client.aio.file_search_stores
        return await stores_api.upload_to_file_search_store(
            file_search_store_name=req.file_search_store_name,
            file=source,
            config=req.config,
        )

    return [
        gemini_api_client_async_request,
        gemini_api_client_async_request_streamed,
        gemini_files_upload,
        gemini_files_download,
        gemini_files_register,
        gemini_file_search_stores_upload,
    ]
98 changes: 98 additions & 0 deletions temporalio/contrib/google_genai/_gemini_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Temporal plugin for Google Gemini SDK integration."""

from __future__ import annotations

import dataclasses

import google.auth.credentials
from google.genai import Client as GeminiClient

from temporalio.contrib.google_genai._gemini_activity import GeminiApiCaller
from temporalio.contrib.pydantic import PydanticPayloadConverter
from temporalio.converter import DataConverter, DefaultPayloadConverter
from temporalio.plugin import SimplePlugin
from temporalio.worker import WorkflowRunner
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner


def _data_converter(converter: DataConverter | None) -> DataConverter:
    """Return a data converter that uses the Pydantic payload converter by default.

    Args:
        converter: The converter already configured, or ``None``.

    Returns:
        A new Pydantic-based ``DataConverter`` when none was configured; a
        copy with the payload converter class swapped to
        ``PydanticPayloadConverter`` when the configured one still uses
        ``DefaultPayloadConverter``; otherwise the converter unchanged.
    """
    if converter is None:
        return DataConverter(payload_converter_class=PydanticPayloadConverter)
    if converter.payload_converter_class is not DefaultPayloadConverter:
        # A non-default payload converter was chosen explicitly; keep it.
        return converter
    return dataclasses.replace(
        converter,
        payload_converter_class=PydanticPayloadConverter,
    )


class GeminiPlugin(SimplePlugin):
"""A Temporal Worker Plugin configured for the Google Gemini SDK.

.. warning::
This class is experimental and may change in future versions.
Use with caution in production environments.

This plugin registers the Gemini activities (``gemini_api_client_async_request``,
its streamed variant, and the file upload/download/register helpers)
using the provided ``genai.Client`` with real credentials. Workflows use
:func:`~temporalio.contrib.google_genai.workflow.gemini_client` to
get an ``AsyncClient`` backed by a ``TemporalApiClient`` that routes all
API calls through these activities.

No credentials are passed to or from the workflow. Auth material never
appears in Temporal's event history.

Example (Gemini Developer API)::
Comment thread
JasonSteving99 marked this conversation as resolved.

client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
plugin = GeminiPlugin(client)

Example (Vertex AI)::

client = genai.Client(
vertexai=True, project="my-project", location="us-central1",
)
plugin = GeminiPlugin(client)

Example (with separate GCS credentials for file registration)::

client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
gcs_creds, _ = google.auth.default()
plugin = GeminiPlugin(client, extra_credentials=gcs_creds)
"""

def __init__(
    self,
    client: GeminiClient,
    extra_credentials: google.auth.credentials.Credentials | None = None,
) -> None:
    """Create the plugin around an already-configured ``genai.Client``.

    Args:
        client: The ``genai.Client`` the registered activities will call.
            Configuring it (credentials, HTTP settings, etc.) is left to
            the caller.
        extra_credentials: Optional Google Cloud credentials forwarded to
            the activity layer for operations that need explicit auth
            (e.g. ``files.register_files()``). When omitted, the client's
            own credentials are used.
    """

    def _allow_genai_passthrough(
        runner: WorkflowRunner | None,
    ) -> WorkflowRunner:
        # A runner must already be configured for the plugin to adjust it.
        if not runner:
            raise ValueError("No WorkflowRunner provided to GeminiPlugin.")
        # Only the sandboxed runner carries import restrictions to relax.
        if not isinstance(runner, SandboxedWorkflowRunner):
            return runner
        relaxed = runner.restrictions.with_passthrough_modules("google.genai")
        return dataclasses.replace(runner, restrictions=relaxed)

    self._api_caller = GeminiApiCaller(client, credentials=extra_credentials)

    super().__init__(
        name="GeminiPlugin",
        data_converter=_data_converter,
        activities=self._api_caller.activities(),
        workflow_runner=_allow_genai_passthrough,
    )
Loading
Loading