RuntimeError: Event loop is closed caused by Gemini.api_client @cached_property #5538

@ypenn21

🔴 Required Information

Please ensure all items in this section are completed to allow for efficient
triaging. Requests without complete information may be rejected / deprioritized.
If an item is not applicable to you - please mark it as N/A

Describe the Bug:

My workflow is a Cloud Run backend API that calls downstream Agent Engine agents. The architecture is:

IAP for auth -> Cloud Run for UI and backend -> Agent Engine agent

The agent's stack trace ends with:

File "/usr/local/lib/python3.12/asyncio/base_events.py", line 545, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

This happens consistently: after I send 3-5 requests to the agent through the Agent Engine API, the error appears.

Issue Summary:

When deployed to multi-threaded environments like Vertex AI Agent Engine, the ADK Runner spawns a fresh OS thread and a brand-new asyncio event loop for every incoming request.

  1. Request 1: Creates the api_client (and its underlying aiohttp session). This session is permanently bound to Loop A.
  2. Request 2: ADK reuses the same Agent instance. The Gemini model returns the cached api_client.
  3. Failure: The client attempts to perform I/O using the now-closed Loop A while running inside the new Loop B, resulting in RuntimeError: Event loop is closed.
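The three steps above can be illustrated without ADK or aiohttp. The following is a minimal sketch, not ADK code: `FakeClient`, `FakeModel`, `handle_request`, and `simulate` are hypothetical stand-ins, and the client captures its loop explicitly to mimic how an aiohttp session stays bound to the loop it was created on. In this simplified version the failure appears on the second request, whereas the report above sees it after 3-5 requests.

```python
import asyncio
import threading
from functools import cached_property


class FakeClient:
    """Hypothetical stand-in for a client bound to the loop it was created on."""

    def __init__(self) -> None:
        self._loop = asyncio.get_running_loop()

    async def request(self) -> str:
        # Mirrors the resolver path in the traceback, which ends in
        # loop.run_in_executor() on the loop captured at creation time.
        return await self._loop.run_in_executor(None, lambda: "ok")


class FakeModel:
    """Hypothetical stand-in for a model that caches its client once."""

    @cached_property
    def api_client(self) -> FakeClient:
        return FakeClient()

    async def generate(self) -> str:
        return await self.api_client.request()


def handle_request(model: FakeModel, results: list) -> None:
    # One fresh event loop per request, as asyncio.run() does in the runner thread.
    try:
        results.append(asyncio.run(model.generate()))
    except RuntimeError as exc:
        results.append(f"RuntimeError: {exc}")


def simulate(n_requests: int = 2) -> list:
    model = FakeModel()  # shared across requests, like the reused Agent instance
    results: list = []
    for _ in range(n_requests):
        worker = threading.Thread(target=handle_request, args=(model, results))
        worker.start()
        worker.join()
    return results


if __name__ == "__main__":
    print(simulate())
```

The first request succeeds because the client is created and used on the same loop. The second request runs on a new loop but still gets the cached client, whose captured loop was closed when the first `asyncio.run()` returned.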

Steps to Reproduce:
Please provide a numbered list of steps to reproduce the behavior:

  1. Create the agent:
Agent(
        name="shoper agent",
        model=Gemini(
            model="gemini-3-flash-preview",
            retry_options=types.HttpRetryOptions(attempts=3),
            api_client=None, # Force fresh client creation for this instance
        ),
        description="shopper management agent. Use this agent to look up shopping mall.",
        instruction="""You are the main customer orchestrator. Look up shopping mall using the `get_store` tool.
        """,
        tools=[
            get_store,
            LongRunningFunctionTool(func=request_user_input),
        ],
    )
  2. Wrap the ADK agent in AdkApp for deployment:
from vertexai.agent_engines import AdkApp
from customers.agent import root_agent

app = AdkApp(agent=root_agent)
  3. Deploy to Agent Engine:
import vertexai
from vertexai import agent_engines

REQUIREMENTS = [
    "google-cloud-aiplatform[evaluation,agent-engines]>=1.130.0",
    "google-adk>=1.31.1,<2.0.0",
    "a2a-sdk~=0.3.22",
    "google-auth>=2.29.0",
    "nest-asyncio>=1.6.0,<2.0.0",
    "opentelemetry-instrumentation-google-genai>=0.1.0,<1.0.0",
    "opentelemetry-instrumentation-fastapi",
    "opentelemetry-instrumentation-grpc",
    "opentelemetry-instrumentation-httpx",
    "gcsfs>=2024.11.0",
    "google-cloud-logging>=3.12.0,<4.0.0",
    "protobuf>=6.31.1,<7.0.0",
    "starlette>=0.37.0",
    "PyJWT>=2.8.0",
    "pydantic==2.12.5",
    "cloudpickle==3.1.2",
]

config = {
    "display_name": DISPLAY_NAME,
    "description": DESCRIPTION,
    "requirements": REQUIREMENTS,
    "extra_packages": EXTRA_PACKAGES,
    "env_vars": environment_variables,
    "identity_type": types.IdentityType.AGENT_IDENTITY,
    "staging_bucket": staging_bucket,
    "max_instances": 3,
}
client = vertexai.Client(project=project_id, location=location, credentials=credentials)
remote_app = client.agent_engines.create(
    agent=app,
    config=config,
)
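  4. Create a session and prompt the agent on Agent Engine through the API:
import logging
import os
from typing import Optional

import vertexai
from google.cloud import aiplatform_v1
from google.protobuf import struct_pb2

# get_grpc_client() and _get_session_client() are helpers from my backend (not shown here).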

logger = logging.getLogger(__name__)

PROJECT_ID = os.environ.get("PROJECT_ID")
LOCATION   = os.environ.get("LOCATION", "us-central1")
ENGINE_ID  = os.environ.get(
    "CUSTOMERS_ENGINE_ID"
)

async def query_agent(
    user_message: str,
    user_id: str = "web_user",
    session_id: Optional[str] = None,
    force_new: bool = False,
    iap_token: Optional[str] = None,
    user_timezone: str = "UTC",
) -> tuple[str, str]:
    try:
        grpc_client = get_grpc_client()

        # 1. Handle Metadata (Headers)
        # In gRPC, headers are passed as a list of tuples (key, value)
        # Key MUST be lowercase
        token_str = iap_token
        if isinstance(token_str, bytes):
            token_str = token_str.decode("utf-8")

        metadata = []
        if token_str:
            logger.info("Raw x-user-identity value setting for customer agent: %s", token_str)
            metadata.append(("x-user-identity", token_str))
        if user_timezone:
            metadata.append(("x-user-timezone", user_timezone))

        # 2. Ensure we have a session ID
        if not session_id or force_new:
            session_client = _get_session_client()
            request = aiplatform_v1.CreateSessionRequest(
                parent=ENGINE_ID,
                session=aiplatform_v1.Session(user_id=user_id)
            )
            # Pass metadata to create_session so the agent receives the identity header
            operation = await session_client.create_session(request=request, metadata=metadata)
            if hasattr(operation, "result"):
                session_obj = await operation.result()
            else:
                session_obj = operation
            
            session_id = session_obj.name.split("/")[-1]
            logger.info("Created new session: %s", session_id)

        # 3. Prepare the Input Struct
        input_dict = {"message": user_message, "user_id": user_id}
        if session_id:
            input_dict["session_id"] = session_id
        
        # Explicitly pass run_config in the input struct to ensure ADK receives it
        run_config_metadata = {}
        if token_str:
            run_config_metadata["x-user-identity"] = token_str
        if user_timezone:
            run_config_metadata["x-user-timezone"] = user_timezone

        if run_config_metadata:
            input_dict["run_config"] = {
                "custom_metadata": run_config_metadata
            }
        
        input_struct = struct_pb2.Struct()
        input_struct.update(input_dict)

        # 4. Build the Request
        # Note: StreamQueryReasoningEngineRequest uses the reasoning_engine name
        resource_name = ENGINE_ID

        request = aiplatform_v1.StreamQueryReasoningEngineRequest(
            name=resource_name,
            input=input_struct,
        )

Expected Behavior:
The agent keeps responding to prompts, with no "Event loop is closed" error, after 3-5 user messages have been sent to the Agent Engine agent.

Observed Behavior:
What actually happened? Include error messages or crash stack traces here.
The Agent Engine agent fails with this stack trace:

RuntimeError: Event loop is closed

(Note: a more detailed error message is in the Logs section below.)

Environment Details:

  • ADK Library Version (pip show google-adk): "google-adk==1.31.1" (also tried 1.27.4 and 1.28.0)
  • Agent Engine runtime on GCP
  • Python Version (python -V): 3.12

Model Information:

  • Are you using LiteLLM: no
  • Which model is being used: gemini-3-flash-preview

Workaround

I added a before-model callback to the agent that clears the cached client before each LLM call:

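import logging

from google.adk.agents import Agent
from google.adk.agents.callback_context import CallbackContext
from google.adk.apps import App
from google.adk.models import Gemini
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from google.adk.tools import LongRunningFunctionTool
from google.adk.tools.tool_context import ToolContext
from google.genai import types

logger = logging.getLogger(__name__)

# get_execution_context() is a logging helper that is not shown in this report.

async def detect_loop_mismatch(llm_request: LlmRequest, callback_context: CallbackContext, **kwargs) -> LlmResponse | None: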
    """Callback to detect and log potential event loop mismatches before model calls."""
    exec_ctx = get_execution_context()
    logger.info(f"{exec_ctx} Starting model call for agent '{callback_context.agent_name}'")
    
    # In Reasoning Engine, the event loop can change between requests if the runner
    # uses a thread pool. Since the Gemini model's api_client (and its aiohttp session)
    # is cached on the first access, we must ensure it is reset if we are in a new loop
    # to avoid "RuntimeError: ... attached to a different loop".
    try:
        agent = callback_context._invocation_context.agent
        model = agent.model
        if hasattr(model, 'api_client'):
            # Force recreation of the cached api_client if it exists
            if 'api_client' in model.__dict__:
                logger.info(f"{exec_ctx} Resetting cached api_client for model '{callback_context.agent_name}' to avoid loop mismatch.")
                del model.__dict__['api_client']
            if '_api_backend' in model.__dict__:
                del model.__dict__['_api_backend']
            if '_live_api_client' in model.__dict__:
                del model.__dict__['_live_api_client']
    except Exception as e:
        logger.warning(f"Failed to reset api_client cache: {e}")

    return None

agent = Agent(
        name="shoper agent",
        model=Gemini(
            model="gemini-3-flash-preview",
            retry_options=types.HttpRetryOptions(attempts=3),
            api_client=None, # Force fresh client creation for this instance
        ),
        description="shopper management agent. Use this agent to look up shopping mall.",
        instruction="""You are the main customer orchestrator. Look up shopping mall using the `get_store` tool.
        """,
        tools=[
            get_store,
            LongRunningFunctionTool(func=request_user_input),
        ],
        before_model_callback=detect_loop_mismatch,
    )
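The callback above clears the cache before every model call, whether or not the loop actually changed. A possible refinement, sketched here with the standard library only, is to remember the last loop seen and clear the cached attributes only when the running loop differs. `LoopBoundCacheResetter` and `reset_if_loop_changed` are hypothetical names, not part of ADK; the attribute names are the ones used in the workaround.

```python
import asyncio
import weakref

# Attribute names taken from the workaround above.
CACHED_ATTRS = ("api_client", "_api_backend", "_live_api_client")


class LoopBoundCacheResetter:
    """Hypothetical helper: clears cached attributes when the running loop changes."""

    def __init__(self, attrs=CACHED_ATTRS) -> None:
        self._attrs = attrs
        # Weak reference, so a closed loop can be garbage collected and a
        # recycled object id is never mistaken for the previous loop.
        self._last_loop_ref = None

    def reset_if_loop_changed(self, obj) -> bool:
        """Return True if the cached attributes on `obj` were cleared."""
        loop = asyncio.get_running_loop()
        last = self._last_loop_ref() if self._last_loop_ref is not None else None
        if last is loop:
            return False
        self._last_loop_ref = weakref.ref(loop)
        for attr in self._attrs:
            # cached_property stores its value in the instance __dict__.
            obj.__dict__.pop(attr, None)
        return True
```

Inside the callback this would be called as `resetter.reset_if_loop_changed(model)` with a module-level `resetter`. Like the original workaround, it assumes requests on one model instance do not overlap: if two loops use the same instance at the same time, clearing the cache for one can affect the other.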

🟡 Optional Information

Providing this information greatly speeds up the resolution process.

Regression:
Did this work in a previous version of ADK? If so, which one?
N/A

Logs:
Please attach relevant logs. Wrap them in code blocks (```) or attach a
text file.

{
insertId: "69f14a6c0009a084e90c708f"
logName: "projects/agent-security-patterns/logs/aiplatform.googleapis.com%2Freasoning_engine_stderr"
receiveTimestamp: "2026-04-29T00:01:48.682175301Z"
resource: {2}
severity: "ERROR"
textPayload: "Traceback (most recent call last):
  File "/usr/local/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.12/threading.py", line 1012, in run
    self._target(*self._args, **self._kwargs)
  File "/code/.venv/lib/python3.12/site-packages/google/adk/runners.py", line 486, in _asyncio_thread_main
    asyncio.run(_invoke_run_async())
  File "/usr/local/lib/python3.12/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 691, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/google/adk/runners.py", line 479, in _invoke_run_async
    async for event in agen:
  File "/code/.venv/lib/python3.12/site-packages/google/adk/runners.py", line 632, in run_async
    async for event in agen:
  File "/code/.venv/lib/python3.12/site-packages/google/adk/runners.py", line 617, in _run_with_trace
    async for event in agen:
  File "/code/.venv/lib/python3.12/site-packages/google/adk/runners.py", line 881, in _exec_with_plugin
    async for event in agen:
  File "/code/.venv/lib/python3.12/site-packages/google/adk/runners.py", line 606, in execute
    async for event in agen:
  File "/code/.venv/lib/python3.12/site-packages/google/adk/agents/base_agent.py", line 297, in run_async
    async for event in agen:
  File "/code/.venv/lib/python3.12/site-packages/google/adk/agents/llm_agent.py", line 487, in _run_async_impl
    async for event in agen:
  File "/code/.venv/lib/python3.12/site-packages/google/adk/flows/llm_flows/base_llm_flow.py", line 804, in run_async
    async for event in agen:
  File "/code/.venv/lib/python3.12/site-packages/google/adk/flows/llm_flows/base_llm_flow.py", line 881, in _run_one_step_async
    async for llm_response in agen:
  File "/code/.venv/lib/python3.12/site-packages/google/adk/flows/llm_flows/base_llm_flow.py", line 1261, in _call_llm_async
    async for event in agen:
  File "/code/.venv/lib/python3.12/site-packages/google/adk/flows/llm_flows/base_llm_flow.py", line 1239, in _call_llm_with_tracing
    async for llm_response in agen:
  File "/code/.venv/lib/python3.12/site-packages/google/adk/flows/llm_flows/base_llm_flow.py", line 1322, in _run_and_handle_error
    async for response in agen:
  File "/code/.venv/lib/python3.12/site-packages/google/adk/flows/llm_flows/base_llm_flow.py", line 406, in _run_and_handle_error
    raise model_error
  File "/code/.venv/lib/python3.12/site-packages/google/adk/flows/llm_flows/base_llm_flow.py", line 379, in _run_and_handle_error
    async for llm_response in agen:
  File "/code/.venv/lib/python3.12/site-packages/google/adk/models/google_llm.py", line 245, in generate_content_async
    response = await self.api_client.aio.models.generate_content(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/opentelemetry/instrumentation/google_genai/generate_content.py", line 1180, in instrumented_generate_content
    response = await wrapped_func(
               ^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/google/genai/models.py", line 8337, in generate_content
    return await self._generate_content(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/google/genai/models.py", line 6897, in _generate_content
    response = await self._api_client.async_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/google/genai/_api_client.py", line 1583, in async_request
    result = await self._async_request(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/google/genai/_api_client.py", line 1516, in _async_request
    return await self._async_retry(  # type: ignore[no-any-return]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 112, in __call__
    do = await self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 157, in iter
    result = await action(retry_state)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/tenacity/_utils.py", line 111, in inner
    return call(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/tenacity/__init__.py", line 393, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/code/.venv/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 116, in __call__
    result = await fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/google/genai/_api_client.py", line 1440, in _async_request_once
    response = await self._aiohttp_session.request(  # type: ignore[union-attr]
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/google/auth/aio/transport/sessions.py", line 293, in request
    response = await with_timeout(
               ^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/google/auth/aio/transport/sessions.py", line 78, in with_timeout
    response = await asyncio.wait_for(coro, remaining)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/tasks.py", line 520, in wait_for
    return await fut
           ^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/google/auth/aio/transport/aiohttp.py", line 174, in __call__
    response = await self._session.request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/aiohttp/client.py", line 788, in _request
    resp = await handler(req)
           ^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/aiohttp/client.py", line 742, in _connect_and_send_request
    conn = await self._connector.connect(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/aiohttp/connector.py", line 672, in connect
    proto = await self._create_connection(req, traces, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/aiohttp/connector.py", line 1251, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/aiohttp/connector.py", line 1574, in _create_direct_connection
    hosts = await self._resolve_host(host, port, traces=traces)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/aiohttp/connector.py", line 1190, in _resolve_host
    return await asyncio.shield(resolved_host_task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/aiohttp/connector.py", line 1221, in _resolve_host_with_throttle
    addrs = await self._resolver.resolve(host, port, family=self._family)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/code/.venv/lib/python3.12/site-packages/aiohttp/resolver.py", line 40, in resolve
    infos = await self._loop.getaddrinfo(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 905, in getaddrinfo
    return await self.run_in_executor(
                 ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 854, in run_in_executor
    self._check_closed()
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 545, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed"
timestamp: "2026-04-29T00:01:48.630916Z"
}

Additional Context:
Add any other context about the problem here. The architecture is:
IAP for auth -> Cloud Run for UI and backend -> Agent Engine agent

sequenceDiagram
    participant User as User (Browser)
    participant IAP as Identity-Aware Proxy
    participant FastAPI as fast-api-fe (Cloud Run)

    User->>IAP: Access Chat UI / API
    alt Not Authenticated
        IAP-->>User: Redirect to Google Login
        User->>IAP: Authenticate
    end
    IAP->>FastAPI: Forward authenticated request

    User->>FastAPI: Chat Message (UI)
    Note over FastAPI: POST /v1/chat/completions
    FastAPI->>ShoppingAgent: query_agent(message)
    Note over ShoppingAgent: Handles greeting, lookup,<br/>or detects store intent

    alt Needs StoreAgent
        ShoppingAgent->>StoreAgent: Delegate task (Agent Engine REST API)
        StoreAgent-->>ShoppingAgent: Store result
    end

    ShoppingAgent-->>FastAPI: Final text response
    FastAPI-->>User: Chat Bubble

How often has this issue occurred?:

  • Always (100%), after 3-5 user prompts have been sent to the agent on GCP Agent Engine

Metadata

Labels

core: [Component] This issue is related to the core interface and implementation
request clarification: [Status] The maintainer needs clarification or more information from the author
