Skip to content

Commit 9d1bb4b

Browse files
haiyuan-eng-googlecopybara-github
authored andcommitted
fix: fix fork detection, correct offload limits, and add response logging in BigQuery plugin
This PR addresses three distinct issues in the BigQuery Agent Analytics Plugin: 1. Fix false-positive fork detection: When the plugin is deployed via Vertex AI Agent Engine, it undergoes a pickle/unpickle lifecycle which resets `_init_pid` to 0. Previously, `_ensure_started()` would incorrectly detect this as a fork since `os.getpid()` is never 0, causing unnecessary cold-start latency and log noise. The PID check now distinguishes `_init_pid == 0` (unpickled) from a real fork. 2. Correct GCS offload unit mismatch: Separates the evaluation limits for offloading text content to GCS. It evaluates the byte-based storage guard (`inline_text_limit`) and the character-based truncation limit (`max_length`) independently, preventing mismatched unit comparisons. 3. Add AGENT_RESPONSE logging: Logs final response events emitted by agents to BigQuery. This explicitly filters out intermediate steps such as function calls/responses, streaming partials, and invisible internal reasoning ("thoughts") so that only the final visible response text is captured. Co-authored-by: Haiyuan Cao <haiyuan@google.com> PiperOrigin-RevId: 910770002
1 parent fc27203 commit 9d1bb4b

2 files changed

Lines changed: 507 additions & 10 deletions

File tree

src/google/adk/plugins/bigquery_agent_analytics_plugin.py

Lines changed: 84 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1430,14 +1430,18 @@ async def _parse_content_object(
14301430

14311431
# CASE C: Text
14321432
elif hasattr(part, "text") and part.text:
1433-
text_len = len(part.text.encode("utf-8"))
1434-
# If max_length is set and smaller than inline limit, use it as threshold
1435-
# to prefer offloading over truncation.
1436-
offload_threshold = self.inline_text_limit
1437-
if self.max_length != -1 and self.max_length < offload_threshold:
1438-
offload_threshold = self.max_length
1439-
1440-
if self.offloader and text_len > offload_threshold:
1433+
char_len = len(part.text)
1434+
byte_len = len(part.text.encode("utf-8"))
1435+
1436+
# Decide whether to offload using each limit in its own
1437+
# unit. inline_text_limit is a byte-based storage guard;
1438+
# max_length is a character-based truncation limit.
1439+
exceeds_inline_byte_limit = byte_len > self.inline_text_limit
1440+
exceeds_char_limit = (
1441+
self.max_length != -1 and char_len > self.max_length
1442+
)
1443+
1444+
if self.offloader and (exceeds_inline_byte_limit or exceeds_char_limit):
14411445
# Text is too big, treat as file
14421446
path = f"{datetime.now().date()}/{self.trace_id}/{self.span_id}_p{idx}.txt"
14431447
try:
@@ -1906,6 +1910,18 @@ def _get_events_schema() -> list[bigquery.SchemaField]:
19061910
" '$.a2a_metadata.\"a2a:response\"') AS a2a_response"
19071911
),
19081912
],
1913+
"AGENT_RESPONSE": [
1914+
"JSON_VALUE(content, '$.response') AS response_text",
1915+
"JSON_VALUE(attributes, '$.source_event_id') AS source_event_id",
1916+
(
1917+
"JSON_VALUE(attributes,"
1918+
" '$.source_event_author') AS source_event_author"
1919+
),
1920+
(
1921+
"JSON_VALUE(attributes,"
1922+
" '$.source_event_branch') AS source_event_branch"
1923+
),
1924+
],
19091925
}
19101926

19111927
_VIEW_SQL_TEMPLATE = """\
@@ -2653,7 +2669,14 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
26532669

26542670
async def _ensure_started(self, **kwargs) -> None:
26552671
"""Ensures that the plugin is started and initialized."""
2656-
if os.getpid() != self._init_pid:
2672+
# _init_pid == 0 means the plugin was unpickled and has never been
2673+
# initialized in this process (the pickle sentinel set by
2674+
# __getstate__). Skip the fork reset in that case — no fork
2675+
# happened, and _started is already False so _lazy_setup will run.
2676+
# Real forks are caught by os.register_at_fork (line 108) and by
2677+
# this check when _init_pid is a real (non-zero) PID from a
2678+
# different process.
2679+
if self._init_pid != 0 and os.getpid() != self._init_pid:
26572680
self._reset_runtime_state()
26582681
if not self._started:
26592682
# Kept original lock name as it was not explicitly changed.
@@ -2665,6 +2688,10 @@ async def _ensure_started(self, **kwargs) -> None:
26652688
await self._lazy_setup(**kwargs)
26662689
self._started = True
26672690
self._startup_error = None
2691+
# Record the current PID so fork detection works for
2692+
# the rest of this instance's lifetime.
2693+
if self._init_pid == 0:
2694+
self._init_pid = os.getpid()
26682695
except Exception as e:
26692696
self._startup_error = e
26702697
logger.error("Failed to initialize BigQuery Plugin: %s", e)
@@ -2966,7 +2993,7 @@ async def on_event_callback(
29662993
invocation_context: InvocationContext,
29672994
event: "Event",
29682995
) -> None:
2969-
"""Logs state changes, HITL events, and A2A interactions.
2996+
"""Logs state changes, HITL events, A2A interactions, and agent responses.
29702997
29712998
- Checks each event for a non-empty state_delta and logs it as a
29722999
STATE_DELTA event.
@@ -2978,6 +3005,9 @@ async def on_event_callback(
29783005
and logs them as ``A2A_INTERACTION`` events so the remote
29793006
agent's response and cross-reference IDs (``a2a:task_id``,
29803007
``a2a:context_id``) are visible in BigQuery.
3008+
- Detects final response events emitted by agents and logs
3009+
them as ``AGENT_RESPONSE`` so the visible response text
3010+
(after all callback modifications) is captured in BigQuery.
29813011
29823012
The HITL detection must happen here (not in tool callbacks) because
29833013
``adk_request_credential``, ``adk_request_confirmation``, and
@@ -3080,6 +3110,50 @@ async def on_event_callback(
30803110
),
30813111
)
30823112

3113+
# --- Final agent response logging ---
3114+
# Captures final response events emitted by agents (after all
3115+
# after_model_callback modifications). Uses a strict guard to
3116+
# avoid false positives from skip_summarization function
3117+
# responses, long-running tool pause events, and thought-only
3118+
# events (which ADK treats as invisible internal reasoning).
3119+
is_agent_response = (
3120+
event.content
3121+
and event.content.parts
3122+
and event.is_final_response()
3123+
and event.partial is not True
3124+
and not event.get_function_calls()
3125+
and not event.get_function_responses()
3126+
and not event.long_running_tool_ids
3127+
)
3128+
if is_agent_response:
3129+
# Filter to visible text parts only. Exclude thoughts
3130+
# (internal reasoning, A2A working/submitted updates),
3131+
# empty parts, and non-text parts (executable_code, etc.)
3132+
# that would render as "other" in _format_content.
3133+
visible_parts = [
3134+
p
3135+
for p in event.content.parts
3136+
if p.text and not getattr(p, "thought", None)
3137+
]
3138+
if visible_parts:
3139+
visible_content = types.Content(
3140+
role=event.content.role, parts=visible_parts
3141+
)
3142+
formatted, truncated = self._format_content_safely(visible_content)
3143+
await self._log_event(
3144+
"AGENT_RESPONSE",
3145+
callback_ctx,
3146+
raw_content={"response": formatted},
3147+
is_truncated=truncated,
3148+
event_data=EventData(
3149+
extra_attributes={
3150+
"source_event_id": event.id,
3151+
"source_event_author": event.author,
3152+
"source_event_branch": event.branch,
3153+
},
3154+
),
3155+
)
3156+
30833157
return None
30843158

30853159
async def on_state_change_callback(

0 commit comments

Comments
 (0)