-
Notifications
You must be signed in to change notification settings - Fork 185
Add First-Class Google Gen AI SDK Integration to Contrib #1378
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
JasonSteving99
wants to merge
10
commits into
main
Choose a base branch
from
jason-experiment-gemini-sdk-integration
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 3 commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
bf58318
Add First-Class Gemini SDK Integration to Contrib
JasonSteving99 2ec1474
Merge branch 'main' into jason-experiment-gemini-sdk-integration
JasonSteving99 01c0bd7
update lock
JasonSteving99 48264dc
address PR feedback
JasonSteving99 e682e3d
Merge remote-tracking branch 'origin/main' into jason-experiment-gemi…
JasonSteving99 82a6179
upper bound google-genai dep
JasonSteving99 55a663a
move to `.../contrib/google_genai/`
JasonSteving99 63a26cc
rename `gemini_client` -> `google_genai_client`
JasonSteving99 bd0221b
Rename `GeminiPlugin` -> `GoogleGenAIPlugin`
JasonSteving99 a89e923
add to codeowners
JasonSteving99 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| """First-class Temporal integration for the Google Gemini SDK. | ||
|
|
||
| .. warning:: | ||
| This module is experimental and may change in future versions. | ||
| Use with caution in production environments. | ||
|
|
||
| This integration lets you use the Gemini SDK's async client with full | ||
| automatic function calling (AFC) support, where every API call and every | ||
| tool invocation is a **durable Temporal activity**. | ||
|
|
||
| No credentials are fetched in the workflow, and no auth material appears in | ||
| Temporal's event history. | ||
|
|
||
| - :class:`GeminiPlugin` — registers the ``gemini_api_client_async_request`` | ||
| activity using a caller-provided ``genai.Client`` on the worker side. | ||
| - :func:`gemini_client` — call from a workflow to get an ``AsyncClient`` | ||
| that routes API calls through activities. | ||
| - :func:`activity_as_tool` — convert any ``@activity.defn`` function into a | ||
| Gemini tool callable; Gemini's AFC invokes it as a Temporal activity. | ||
|
|
||
| Quickstart:: | ||
|
|
||
| # ---- worker setup (outside sandbox) ---- | ||
|
JasonSteving99 marked this conversation as resolved.
Outdated
|
||
| client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) | ||
| plugin = GeminiPlugin(client) | ||
|
|
||
| @activity.defn | ||
| async def get_weather(state: str) -> str: ... | ||
|
|
||
| # ---- workflow (sandbox-safe) ---- | ||
| @workflow.defn | ||
| class AgentWorkflow: | ||
| @workflow.run | ||
| async def run(self, query: str) -> str: | ||
| client = gemini_client() | ||
| response = await client.models.generate_content( | ||
| model="gemini-2.5-flash", | ||
| contents=query, | ||
| config=types.GenerateContentConfig( | ||
| tools=[ | ||
| activity_as_tool( | ||
| get_weather, | ||
| start_to_close_timeout=timedelta(seconds=30), | ||
|
JasonSteving99 marked this conversation as resolved.
Outdated
|
||
| ), | ||
| ], | ||
| ), | ||
| ) | ||
| return response.text | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from temporalio.contrib.google_gemini_sdk._gemini_plugin import GeminiPlugin | ||
| from temporalio.contrib.google_gemini_sdk.workflow import ( | ||
| activity_as_tool, | ||
| gemini_client, | ||
| ) | ||
|
|
||
| __all__ = [ | ||
| "GeminiPlugin", | ||
| "activity_as_tool", | ||
| "gemini_client", | ||
| ] | ||
174 changes: 174 additions & 0 deletions
174
temporalio/contrib/google_gemini_sdk/_gemini_activity.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,174 @@ | ||
| """Temporal activity that executes Gemini SDK API calls with real credentials. | ||
|
|
||
| The ``TemporalApiClient`` in the workflow dispatches calls here. This | ||
| activity holds a user-provided ``genai.Client`` and forwards structured | ||
| requests. Credentials are fetched/refreshed only within the activity — | ||
| they never appear in workflow event history. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Sequence | ||
| from typing import Any, Callable | ||
|
|
||
| import google.auth.credentials | ||
| from google.genai import Client as GeminiClient | ||
| from google.genai import types | ||
| from google.genai.types import HttpOptions | ||
| from google.genai.types import HttpResponse as SdkHttpResponse | ||
|
|
||
| from temporalio import activity | ||
| from temporalio.contrib.google_gemini_sdk._models import ( | ||
| _GeminiApiRequest, | ||
| _GeminiApiResponse, | ||
| _GeminiApiStreamedResponse, | ||
| _GeminiDownloadFileRequest, | ||
| _GeminiRegisterFilesRequest, | ||
| _GeminiUploadFileRequest, | ||
| _GeminiUploadToFileSearchStoreRequest, | ||
| ) | ||
|
|
||
|
|
||
| def _resolve_http_options( | ||
| overrides: Any, | ||
| ) -> HttpOptions | None: | ||
| """Reconstruct ``HttpOptions`` from serializable overrides, or None.""" | ||
| if overrides is None: | ||
| return None | ||
| return HttpOptions.model_validate(overrides.model_dump(exclude_none=True)) | ||
|
|
||
|
|
||
| class GeminiApiCaller: | ||
| """Wraps a ``genai.Client`` and exposes Temporal activities for SDK calls. | ||
|
|
||
| The caller owns a reference to the user-provided ``genai.Client``. | ||
| All credential management, HTTP client configuration, etc. is the | ||
| responsibility of whoever constructs the client. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| client: GeminiClient, | ||
| credentials: google.auth.credentials.Credentials | None = None, | ||
| ) -> None: | ||
| """Initialize with a genai.Client and optional extra credentials.""" | ||
| self._client = client | ||
| self._credentials = credentials | ||
|
|
||
| def activities(self) -> Sequence[Callable]: | ||
| """Return activities that route SDK calls through this client.""" | ||
|
|
||
| @activity.defn(name="gemini_api_client_async_request") | ||
|
JasonSteving99 marked this conversation as resolved.
Outdated
|
||
| async def gemini_api_client_async_request( | ||
| req: _GeminiApiRequest, | ||
| ) -> _GeminiApiResponse: | ||
| """Execute a Gemini SDK API call with real credentials.""" | ||
| response: SdkHttpResponse = ( | ||
| await self._client.aio._api_client.async_request( | ||
| http_method=req.http_method, | ||
| path=req.path, | ||
| request_dict=req.request_dict, | ||
| http_options=_resolve_http_options(req.http_options_overrides), | ||
| ) | ||
| ) | ||
| return _GeminiApiResponse( | ||
| headers=response.headers or {}, | ||
| body=response.body or "", | ||
| ) | ||
|
|
||
| @activity.defn(name="gemini_api_client_async_request_streamed") | ||
| async def gemini_api_client_async_request_streamed( | ||
| req: _GeminiApiRequest, | ||
| ) -> _GeminiApiStreamedResponse: | ||
| """Execute a streamed Gemini SDK API call, collecting all chunks.""" | ||
| stream = await self._client.aio._api_client.async_request_streamed( | ||
| http_method=req.http_method, | ||
| path=req.path, | ||
| request_dict=req.request_dict, | ||
| http_options=_resolve_http_options(req.http_options_overrides), | ||
| ) | ||
| chunks = [] | ||
| async for chunk in stream: | ||
| chunks.append( | ||
| _GeminiApiResponse( | ||
| headers=chunk.headers or {}, | ||
| body=chunk.body or "", | ||
| ) | ||
| ) | ||
| return _GeminiApiStreamedResponse(chunks=chunks) | ||
|
|
||
| @activity.defn(name="gemini_files_upload") | ||
| async def gemini_files_upload( | ||
| req: _GeminiUploadFileRequest, | ||
| ) -> types.File: | ||
| """Upload a file using the real genai.Client on the worker.""" | ||
| if req.file_bytes is not None: | ||
| import io | ||
|
|
||
| file_arg: Any = io.BytesIO(req.file_bytes) | ||
| else: | ||
| file_arg = req.file_path | ||
|
|
||
| return await self._client.aio.files.upload(file=file_arg, config=req.config) | ||
|
|
||
| @activity.defn(name="gemini_files_download") | ||
| async def gemini_files_download( | ||
| req: _GeminiDownloadFileRequest, | ||
| ) -> bytes: | ||
| """Download a file using the real genai.Client on the worker.""" | ||
| return await self._client.aio.files.download( | ||
| file=req.file, config=req.config | ||
| ) | ||
|
|
||
| @activity.defn(name="gemini_files_register") | ||
| async def gemini_files_register( | ||
| req: _GeminiRegisterFilesRequest, | ||
| ) -> types.RegisterFilesResponse: | ||
| """Register GCS files using the real genai.Client on the worker. | ||
|
|
||
| Uses ``credentials`` if provided at plugin init, | ||
| otherwise falls back to the client's own credentials. | ||
| Token refresh happens here on the worker side, so no auth | ||
| material enters the workflow event history. | ||
| """ | ||
| auth = self._credentials or self._client._api_client._credentials | ||
| if auth is None: | ||
| raise ValueError( | ||
| "No credentials available for register_files(). " | ||
| "Pass extra_credentials to GeminiPlugin or initialize " | ||
| "the genai.Client with credentials." | ||
| ) | ||
| return await self._client.aio.files.register_files( | ||
| auth=auth, | ||
| uris=req.uris, | ||
| config=req.config, | ||
| ) | ||
|
|
||
| @activity.defn(name="gemini_file_search_stores_upload") | ||
| async def gemini_file_search_stores_upload( | ||
| req: _GeminiUploadToFileSearchStoreRequest, | ||
| ) -> types.UploadToFileSearchStoreOperation: | ||
| """Upload a file to a file search store on the worker.""" | ||
| if req.file_bytes is not None: | ||
| import io | ||
|
|
||
| file_arg: Any = io.BytesIO(req.file_bytes) | ||
| else: | ||
| file_arg = req.file_path | ||
|
|
||
| return ( | ||
| await self._client.aio.file_search_stores.upload_to_file_search_store( | ||
| file_search_store_name=req.file_search_store_name, | ||
| file=file_arg, | ||
| config=req.config, | ||
| ) | ||
| ) | ||
|
|
||
| return [ | ||
| gemini_api_client_async_request, | ||
| gemini_api_client_async_request_streamed, | ||
| gemini_files_upload, | ||
| gemini_files_download, | ||
| gemini_files_register, | ||
| gemini_file_search_stores_upload, | ||
| ] | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| """Temporal plugin for Google Gemini SDK integration.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import dataclasses | ||
|
|
||
| import google.auth.credentials | ||
| from google.genai import Client as GeminiClient | ||
|
|
||
| from temporalio.contrib.google_gemini_sdk._gemini_activity import GeminiApiCaller | ||
| from temporalio.contrib.pydantic import PydanticPayloadConverter | ||
| from temporalio.converter import DataConverter, DefaultPayloadConverter | ||
| from temporalio.plugin import SimplePlugin | ||
| from temporalio.worker import WorkflowRunner | ||
| from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner | ||
|
|
||
|
|
||
| def _data_converter(converter: DataConverter | None) -> DataConverter: | ||
| if converter is None: | ||
| return DataConverter(payload_converter_class=PydanticPayloadConverter) | ||
| elif converter.payload_converter_class is DefaultPayloadConverter: | ||
| return dataclasses.replace( | ||
| converter, payload_converter_class=PydanticPayloadConverter | ||
| ) | ||
| return converter | ||
|
|
||
|
|
||
| class GeminiPlugin(SimplePlugin): | ||
| """A Temporal Worker Plugin configured for the Google Gemini SDK. | ||
|
|
||
| .. warning:: | ||
| This class is experimental and may change in future versions. | ||
| Use with caution in production environments. | ||
|
|
||
| This plugin registers the ``gemini_api_client_async_request`` activity | ||
| using the provided ``genai.Client`` with real credentials. Workflows use | ||
| :func:`~temporalio.contrib.google_gemini_sdk.workflow.gemini_client` to | ||
| get an ``AsyncClient`` backed by a ``TemporalApiClient`` that routes all | ||
| API calls through this activity. | ||
|
|
||
| No credentials are passed to or from the workflow. Auth material never | ||
| appears in Temporal's event history. | ||
|
|
||
| Example (Gemini Developer API):: | ||
|
|
||
| client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) | ||
| plugin = GeminiPlugin(client) | ||
|
|
||
| Example (Vertex AI):: | ||
|
|
||
| client = genai.Client( | ||
| vertexai=True, project="my-project", location="us-central1", | ||
| ) | ||
| plugin = GeminiPlugin(client) | ||
|
|
||
| Example (with separate GCS credentials for file registration):: | ||
|
|
||
| client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) | ||
| gcs_creds, _ = google.auth.default() | ||
| plugin = GeminiPlugin(client, extra_credentials=gcs_creds) | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| client: GeminiClient, | ||
| extra_credentials: google.auth.credentials.Credentials | None = None, | ||
| ) -> None: | ||
| """Initialize the Gemini plugin. | ||
|
|
||
| Args: | ||
| client: A fully configured ``genai.Client`` instance. | ||
| All credential management, HTTP client configuration, etc. | ||
| is the responsibility of the caller. | ||
| extra_credentials: Optional Google Cloud credentials used for | ||
| operations that require explicit auth (e.g. | ||
| ``files.register_files()``). If not provided, the | ||
| client's own credentials are used. | ||
| """ | ||
| self._api_caller = GeminiApiCaller(client, credentials=extra_credentials) | ||
|
|
||
| def workflow_runner(runner: WorkflowRunner | None) -> WorkflowRunner: | ||
| if not runner: | ||
| raise ValueError("No WorkflowRunner provided to GeminiPlugin.") | ||
| if isinstance(runner, SandboxedWorkflowRunner): | ||
| return dataclasses.replace( | ||
| runner, | ||
| restrictions=runner.restrictions.with_passthrough_modules( | ||
| "google.genai" | ||
| ), | ||
| ) | ||
| return runner | ||
|
|
||
| super().__init__( | ||
| name="GeminiPlugin", | ||
| data_converter=_data_converter, | ||
| activities=self._api_caller.activities(), | ||
| workflow_runner=workflow_runner, | ||
| ) |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.