Skip to content

Commit a69f861

Browse files
haiyuan-eng-google authored
and copybara-github committed
fix: fix lifecycle issues with credentials in BigQuery Agent Analytics Plugin
### Changes **1. Pickle safety — try-preserve, fallback-drop** `__getstate__` tests whether `_user_credentials` is picklable: - **Picklable** (service-account, `AnonymousCredentials`): preserved — survives pickle and restored via `__setstate__` so the plugin uses the user's identity after unpickle. - **Non-picklable** (`compute_engine.Credentials` with `requests.Session`): dropped gracefully — falls back to ADC after unpickle. `_credentials` (the active/resolved credentials) is always cleared since it may hold resolved ADC state. On unpickle, `__setstate__` restores it from `_user_credentials` when available. **2. Fork safety — documented user-provided credential limitation** `_reset_runtime_state()` sets `_credentials = _user_credentials`. For ADC-resolved credentials (`_user_credentials is None`), this clears stale credentials for re-resolution. For user-provided credentials, the original object is kept — we cannot re-create it. The comment documents this: the user is responsible for providing fork-safe credentials. **3. GCS client — credentials passed correctly** `GCSOffloader.__init__` always creates a `storage.Client` eagerly (`storage_client or storage.Client(...)` at line 1329). This was also true before the credentials commit. The fix passes explicit credentials when available and lets ADC resolve when not, matching the `bigquery.Client` and `BigQueryWriteAsyncClient` patterns. **4. Benign race on `_credentials` resolution documented** When multiple event loops call `_create_loop_state()` concurrently, both can resolve ADC redundantly. This is idempotent and benign, now documented. Co-authored-by: Haiyuan Cao <haiyuan@google.com> PiperOrigin-RevId: 905111076
1 parent b848390 commit a69f861

2 files changed

Lines changed: 87 additions & 3 deletions

File tree

src/google/adk/plugins/bigquery_agent_analytics_plugin.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1988,6 +1988,7 @@ def __init__(
19881988
self._startup_error: Optional[Exception] = None
19891989
self._is_shutting_down = False
19901990
self._setup_lock = None
1991+
self._user_credentials = credentials
19911992
self._credentials = credentials
19921993
self.client = None
19931994
self._loop_state_by_loop: dict[asyncio.AbstractEventLoop, _LoopState] = {}
@@ -2106,6 +2107,10 @@ def get_credentials():
21062107
)
21072108
return creds
21082109

2110+
# Note: this read-then-write is not locked. If two event loops
2111+
# race here both will resolve ADC and write back the same creds.
2112+
# This is benign — the result is idempotent — so we accept the
2113+
# race rather than adding a lock for a one-time init path.
21092114
if self._credentials is None:
21102115
self._credentials = await loop.run_in_executor(
21112116
self._executor, get_credentials
@@ -2196,13 +2201,18 @@ async def _lazy_setup(self, **kwargs) -> None:
21962201

21972202
self.offloader = None
21982203
if self.config.gcs_bucket_name:
2204+
# GCSOffloader always creates a storage.Client eagerly
2205+
# (line 1329: storage_client or storage.Client(...)).
2206+
# Pass credentials so it uses the same auth as the other
2207+
# clients; omit when None to let it use ADC.
2208+
gcs_kwargs = {"project": self.project_id}
2209+
if self._credentials is not None:
2210+
gcs_kwargs["credentials"] = self._credentials
21992211
self.offloader = GCSOffloader(
22002212
self.project_id,
22012213
self.config.gcs_bucket_name,
22022214
self._executor,
2203-
storage_client=storage.Client(
2204-
project=self.project_id, credentials=self._credentials
2205-
),
2215+
storage_client=storage.Client(**gcs_kwargs),
22062216
)
22072217

22082218
self.parser = HybridContentParser(
@@ -2536,13 +2546,32 @@ def __getstate__(self):
25362546
state["_startup_error"] = None
25372547
state["_is_shutting_down"] = False
25382548
state["_init_pid"] = 0
2549+
# _credentials is always runtime-resolved; clear unconditionally.
2550+
state["_credentials"] = None
2551+
# Preserve _user_credentials if they are picklable (e.g.,
2552+
# service-account, AnonymousCredentials). Drop only when
2553+
# pickle would fail (e.g., compute_engine.Credentials holding
2554+
# a requests.Session).
2555+
import pickle as _pickle
2556+
2557+
try:
2558+
_pickle.dumps(state.get("_user_credentials"))
2559+
except Exception:
2560+
state["_user_credentials"] = None
25392561
return state
25402562

25412563
def __setstate__(self, state):
25422564
"""Custom unpickling to restore state."""
25432565
# Backfill keys that may be absent in pickled state from older
25442566
# code versions so _ensure_started does not raise AttributeError.
25452567
state.setdefault("_init_pid", 0)
2568+
state.setdefault("_user_credentials", None)
2569+
state.setdefault("_credentials", None)
2570+
# Restore _credentials from _user_credentials if available so
2571+
# _create_loop_state uses the user's identity. When both are
2572+
# None (non-picklable credentials were dropped), ADC is used.
2573+
if state["_credentials"] is None and state["_user_credentials"] is not None:
2574+
state["_credentials"] = state["_user_credentials"]
25462575
self.__dict__.update(state)
25472576

25482577
def _reset_runtime_state(self) -> None:
@@ -2597,6 +2626,11 @@ def _reset_runtime_state(self) -> None:
25972626
self._startup_error = None
25982627
self._is_shutting_down = False
25992628
self._init_pid = os.getpid()
2629+
# For ADC-resolved credentials, clear so they are re-resolved
2630+
# in the child process. For user-provided credentials, keep
2631+
# the original object — we cannot re-create it. The user is
2632+
# responsible for providing fork-safe credentials if needed.
2633+
self._credentials = self._user_credentials
26002634

26012635
async def __aenter__(self) -> BigQueryAgentAnalyticsPlugin:
26022636
await self._ensure_started()

tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2093,6 +2093,56 @@ async def test_pickle_safety(self, mock_auth_default, mock_bq_client):
20932093
finally:
20942094
await plugin.shutdown()
20952095

2096+
@pytest.mark.asyncio
2097+
async def test_pickle_preserves_picklable_credentials(
2098+
self, mock_auth_default, mock_bq_client
2099+
):
2100+
"""Picklable user credentials survive pickle/unpickle."""
2101+
import pickle
2102+
2103+
picklable_creds = FakeCredentials()
2104+
plugin = bigquery_agent_analytics_plugin.BigQueryAgentAnalyticsPlugin(
2105+
PROJECT_ID,
2106+
DATASET_ID,
2107+
table_id=TABLE_ID,
2108+
credentials=picklable_creds,
2109+
)
2110+
pickled = pickle.dumps(plugin)
2111+
unpickled = pickle.loads(pickled)
2112+
# User-provided picklable credentials are preserved.
2113+
assert unpickled._user_credentials is not None
2114+
assert unpickled._credentials is not None
2115+
await plugin.shutdown()
2116+
2117+
@pytest.mark.asyncio
2118+
async def test_pickle_drops_non_picklable_credentials(
2119+
self, mock_auth_default, mock_bq_client
2120+
):
2121+
"""Non-picklable user credentials are dropped gracefully."""
2122+
import pickle
2123+
2124+
class NonPicklableCreds(google.auth.credentials.Credentials):
2125+
2126+
def refresh(self, request):
2127+
pass
2128+
2129+
def __getstate__(self):
2130+
raise TypeError("cannot pickle")
2131+
2132+
plugin = bigquery_agent_analytics_plugin.BigQueryAgentAnalyticsPlugin(
2133+
PROJECT_ID,
2134+
DATASET_ID,
2135+
table_id=TABLE_ID,
2136+
credentials=NonPicklableCreds(),
2137+
)
2138+
# Should not raise — non-picklable credentials are dropped.
2139+
pickled = pickle.dumps(plugin)
2140+
unpickled = pickle.loads(pickled)
2141+
# Credentials fall back to None (ADC on next use).
2142+
assert unpickled._user_credentials is None
2143+
assert unpickled._credentials is None
2144+
await plugin.shutdown()
2145+
20962146
@pytest.mark.asyncio
20972147
async def test_span_hierarchy_llm_call(
20982148
self,

0 commit comments

Comments
 (0)