diff --git a/tests/unit/vertexai/genai/replays/test_generate_loss_clusters.py b/tests/unit/vertexai/genai/replays/test_generate_loss_clusters.py new file mode 100644 index 0000000000..f585feb9d5 --- /dev/null +++ b/tests/unit/vertexai/genai/replays/test_generate_loss_clusters.py @@ -0,0 +1,76 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# pylint: disable=protected-access,bad-continuation,missing-function-docstring + +from tests.unit.vertexai.genai.replays import pytest_helper +from vertexai import types +import pytest + + +def test_gen_loss_clusters(client): + """Tests that generate_loss_clusters() returns GenerateLossClustersResponse.""" + eval_result = types.EvaluationResult() + response = client.evals.generate_loss_clusters( + eval_result=eval_result, + config=types.LossAnalysisConfig( + metric="multi_turn_task_success_v1", + candidate="travel-agent", + ), + ) + assert isinstance(response, types.GenerateLossClustersResponse) + assert len(response.results) == 1 + result = response.results[0] + assert result.config.metric == "multi_turn_task_success_v1" + assert result.config.candidate == "travel-agent" + assert len(result.clusters) == 2 + assert result.clusters[0].cluster_id == "cluster-1" + assert result.clusters[0].taxonomy_entry.l1_category == "Tool Calling" + assert ( + result.clusters[0].taxonomy_entry.l2_category == "Missing Tool Invocation" + ) + assert result.clusters[0].item_count == 3 + assert 
result.clusters[1].cluster_id == "cluster-2" + assert result.clusters[1].taxonomy_entry.l1_category == "Hallucination" + assert result.clusters[1].item_count == 2 + + +pytest_plugins = ("pytest_asyncio",) + + +@pytest.mark.asyncio +async def test_gen_loss_clusters_async(client): + """Tests that generate_loss_clusters() async returns GenerateLossClustersResponse.""" + eval_result = types.EvaluationResult() + response = await client.aio.evals.generate_loss_clusters( + eval_result=eval_result, + config=types.LossAnalysisConfig( + metric="multi_turn_task_success_v1", + candidate="travel-agent", + ), + ) + assert isinstance(response, types.GenerateLossClustersResponse) + assert len(response.results) == 1 + result = response.results[0] + assert result.config.metric == "multi_turn_task_success_v1" + assert len(result.clusters) == 2 + assert result.clusters[0].cluster_id == "cluster-1" + assert result.clusters[1].cluster_id == "cluster-2" + + +pytestmark = pytest_helper.setup( + file=__file__, + globals_for_file=globals(), + test_method="evals.generate_loss_clusters", +) diff --git a/tests/unit/vertexai/genai/test_evals.py b/tests/unit/vertexai/genai/test_evals.py index 573f7f04d4..281818b116 100644 --- a/tests/unit/vertexai/genai/test_evals.py +++ b/tests/unit/vertexai/genai/test_evals.py @@ -29,6 +29,7 @@ from google.cloud.aiplatform import initializer as aiplatform_initializer from vertexai import _genai from vertexai._genai import _evals_data_converters +from vertexai._genai import _evals_utils from vertexai._genai import _evals_metric_handlers from vertexai._genai import _evals_visualization from vertexai._genai import _evals_metric_loaders @@ -265,6 +266,351 @@ def test_t_inline_results(self): assert payload[0]["candidate_results"][0]["score"] == 0.0 +class TestLossAnalysis: + """Unit tests for loss analysis types and visualization.""" + + def test_response_structure(self): + response = common_types.GenerateLossClustersResponse( + 
analysis_time="2026-04-01T10:00:00Z", + results=[ + common_types.LossAnalysisResult( + config=common_types.LossAnalysisConfig( + metric="multi_turn_task_success_v1", + candidate="travel-agent", + ), + analysis_time="2026-04-01T10:00:00Z", + clusters=[ + common_types.LossCluster( + cluster_id="cluster-1", + taxonomy_entry=common_types.LossTaxonomyEntry( + l1_category="Tool Calling", + l2_category="Missing Tool Invocation", + description="The agent failed to invoke a required tool.", + ), + item_count=3, + ), + common_types.LossCluster( + cluster_id="cluster-2", + taxonomy_entry=common_types.LossTaxonomyEntry( + l1_category="Hallucination", + l2_category="Hallucination of Action", + description="Verbally confirmed action without tool.", + ), + item_count=2, + ), + ], + ) + ], + ) + assert len(response.results) == 1 + assert response.analysis_time == "2026-04-01T10:00:00Z" + result = response.results[0] + assert result.config.metric == "multi_turn_task_success_v1" + assert len(result.clusters) == 2 + assert result.clusters[0].cluster_id == "cluster-1" + assert result.clusters[0].item_count == 3 + assert result.clusters[1].cluster_id == "cluster-2" + + def test_get_loss_analysis_html(self): + """Tests that _get_loss_analysis_html generates valid HTML with data.""" + from vertexai._genai import _evals_visualization + import json + + data = { + "results": [ + { + "config": { + "metric": "test_metric", + "candidate": "test-candidate", + }, + "clusters": [ + { + "cluster_id": "c1", + "taxonomy_entry": { + "l1_category": "Tool Calling", + "l2_category": "Missing Invocation", + "description": "Agent failed to call the tool.", + }, + "item_count": 5, + "examples": [ + { + "evaluation_result": { + "request": { + "prompt": { + "agent_data": { + "turns": [{ + "turn_index": 0, + "events": [{ + "author": "user", + "content": { + "parts": [{"text": "Find flights to Paris"}], + }, + }], + }], + }, + }, + }, + }, + "failed_rubrics": [ + { + "rubric_id": "tool_use", + 
"classification_rationale": "Did not invoke find_flights.", + } + ], + } + ], + }, + ], + } + ] + } + html = _evals_visualization._get_loss_analysis_html(json.dumps(data)) + assert "Loss Pattern Analysis" in html + assert "test_metric" not in html # data is Base64-encoded in the HTML + assert "" in html + assert "extractScenarioPreview" in html + assert "example-scenario" in html + + def test_display_loss_clusters_response_no_ipython(self): + """Tests graceful fallback when not in IPython.""" + from vertexai._genai import _evals_visualization + from unittest import mock + + response = common_types.GenerateLossClustersResponse( + results=[ + common_types.LossAnalysisResult( + config=common_types.LossAnalysisConfig( + metric="test_metric", + candidate="test-candidate", + ), + clusters=[ + common_types.LossCluster( + cluster_id="c1", + taxonomy_entry=common_types.LossTaxonomyEntry( + l1_category="Cat1", + l2_category="SubCat1", + ), + item_count=5, + ), + ], + ) + ], + ) + with mock.patch.object( + _evals_visualization, "_is_ipython_env", return_value=False + ): + # Should not raise, just log a warning + response.show() + + def test_display_loss_analysis_result_no_ipython(self): + """Tests graceful fallback for individual result when not in IPython.""" + from vertexai._genai import _evals_visualization + from unittest import mock + + result = common_types.LossAnalysisResult( + config=common_types.LossAnalysisConfig( + metric="test_metric", + candidate="test-candidate", + ), + clusters=[ + common_types.LossCluster( + cluster_id="c1", + taxonomy_entry=common_types.LossTaxonomyEntry( + l1_category="DirectCat", + l2_category="DirectSubCat", + ), + item_count=7, + ), + ], + ) + with mock.patch.object( + _evals_visualization, "_is_ipython_env", return_value=False + ): + result.show() + + +def _make_eval_result( + metrics=None, + candidate_names=None, +): + """Helper to create an EvaluationResult with the given metrics and candidates.""" + metrics = metrics or 
["task_success_v1"] + candidate_names = candidate_names or ["agent-1"] + + metric_results = {} + for m in metrics: + metric_results[m] = common_types.EvalCaseMetricResult(metric_name=m) + + eval_case_results = [ + common_types.EvalCaseResult( + eval_case_index=0, + response_candidate_results=[ + common_types.ResponseCandidateResult( + response_index=0, + metric_results=metric_results, + ) + ], + ) + ] + metadata = common_types.EvaluationRunMetadata( + candidate_names=candidate_names, + ) + return common_types.EvaluationResult( + eval_case_results=eval_case_results, + metadata=metadata, + ) + + +class TestResolveMetricName: + """Unit tests for _resolve_metric_name.""" + + def test_none_returns_none(self): + assert _evals_utils._resolve_metric_name(None) is None + + def test_string_passes_through(self): + assert _evals_utils._resolve_metric_name("task_success_v1") == "task_success_v1" + + def test_metric_object_extracts_name(self): + metric = common_types.Metric(name="multi_turn_task_success_v1") + assert ( + _evals_utils._resolve_metric_name(metric) + == "multi_turn_task_success_v1" + ) + + def test_object_with_name_attr(self): + """Tests that any object with a .name attribute works (e.g., LazyLoadedPrebuiltMetric).""" + + class FakeMetric: + name = "tool_use_quality_v1" + + assert _evals_utils._resolve_metric_name(FakeMetric()) == "tool_use_quality_v1" + + def test_lazy_loaded_prebuilt_metric_resolves_versioned_name(self): + """Tests that LazyLoadedPrebuiltMetric resolves to the versioned API spec name.""" + + class FakeLazyMetric: + name = "MULTI_TURN_TASK_SUCCESS" + + def _get_api_metric_spec_name(self): + return "multi_turn_task_success_v1" + + assert ( + _evals_utils._resolve_metric_name(FakeLazyMetric()) + == "multi_turn_task_success_v1" + ) + + def test_lazy_loaded_prebuilt_metric_falls_back_to_name(self): + """Tests fallback to .name when _get_api_metric_spec_name returns None.""" + + class FakeLazyMetricNoSpec: + name = "CUSTOM_METRIC" + + def 
_get_api_metric_spec_name(self): + return None + + assert ( + _evals_utils._resolve_metric_name(FakeLazyMetricNoSpec()) + == "CUSTOM_METRIC" + ) + + +class TestResolveLossAnalysisConfig: + """Unit tests for _resolve_loss_analysis_config.""" + + def test_auto_infer_single_metric_and_candidate(self): + eval_result = _make_eval_result( + metrics=["task_success_v1"], candidate_names=["agent-1"] + ) + resolved = _evals_utils._resolve_loss_analysis_config( + eval_result=eval_result + ) + assert resolved.metric == "task_success_v1" + assert resolved.candidate == "agent-1" + + def test_explicit_metric_and_candidate(self): + eval_result = _make_eval_result( + metrics=["m1", "m2"], candidate_names=["c1", "c2"] + ) + resolved = _evals_utils._resolve_loss_analysis_config( + eval_result=eval_result, metric="m1", candidate="c2" + ) + assert resolved.metric == "m1" + assert resolved.candidate == "c2" + + def test_config_provides_metric_and_candidate(self): + eval_result = _make_eval_result( + metrics=["m1"], candidate_names=["c1"] + ) + config = common_types.LossAnalysisConfig( + metric="m1", candidate="c1", predefined_taxonomy="my_taxonomy" + ) + resolved = _evals_utils._resolve_loss_analysis_config( + eval_result=eval_result, config=config + ) + assert resolved.metric == "m1" + assert resolved.candidate == "c1" + assert resolved.predefined_taxonomy == "my_taxonomy" + + def test_explicit_args_override_config(self): + eval_result = _make_eval_result( + metrics=["m1", "m2"], candidate_names=["c1", "c2"] + ) + config = common_types.LossAnalysisConfig(metric="m1", candidate="c1") + resolved = _evals_utils._resolve_loss_analysis_config( + eval_result=eval_result, config=config, metric="m2", candidate="c2" + ) + assert resolved.metric == "m2" + assert resolved.candidate == "c2" + + def test_error_multiple_metrics_no_explicit(self): + eval_result = _make_eval_result( + metrics=["m1", "m2"], candidate_names=["c1"] + ) + with pytest.raises(ValueError, match="multiple metrics"): + 
_evals_utils._resolve_loss_analysis_config(eval_result=eval_result) + + def test_error_multiple_candidates_no_explicit(self): + eval_result = _make_eval_result( + metrics=["m1"], candidate_names=["c1", "c2"] + ) + with pytest.raises(ValueError, match="multiple candidates"): + _evals_utils._resolve_loss_analysis_config(eval_result=eval_result) + + def test_error_invalid_metric(self): + eval_result = _make_eval_result( + metrics=["m1"], candidate_names=["c1"] + ) + with pytest.raises(ValueError, match="not found in eval_result"): + _evals_utils._resolve_loss_analysis_config( + eval_result=eval_result, metric="nonexistent" + ) + + def test_error_invalid_candidate(self): + eval_result = _make_eval_result( + metrics=["m1"], candidate_names=["c1"] + ) + with pytest.raises(ValueError, match="not found in eval_result"): + _evals_utils._resolve_loss_analysis_config( + eval_result=eval_result, candidate="nonexistent" + ) + + def test_no_candidates_defaults_to_candidate_1(self): + eval_result = _make_eval_result(metrics=["m1"], candidate_names=[]) + eval_result = eval_result.model_copy( + update={"metadata": common_types.EvaluationRunMetadata()} + ) + resolved = _evals_utils._resolve_loss_analysis_config( + eval_result=eval_result + ) + assert resolved.metric == "m1" + assert resolved.candidate == "candidate_1" + + def test_no_eval_case_results_raises(self): + eval_result = common_types.EvaluationResult() + with pytest.raises(ValueError, match="no metric results"): + _evals_utils._resolve_loss_analysis_config(eval_result=eval_result) + + class TestEvals: """Unit tests for the GenAI client.""" diff --git a/vertexai/_genai/_evals_utils.py b/vertexai/_genai/_evals_utils.py index 9d4dd4fc71..184fd2679b 100644 --- a/vertexai/_genai/_evals_utils.py +++ b/vertexai/_genai/_evals_utils.py @@ -15,9 +15,11 @@ """Utility functions for evals.""" import abc +import asyncio +import json import logging import os -import json +import time from typing import Any, Optional, Union from 
google.genai._api_client import BaseApiClient @@ -366,6 +368,227 @@ def _postprocess_user_scenarios_response( ) + + + + +def _resolve_metric_name( + metric: Optional[Any], +) -> Optional[str]: + """Extracts a metric name string from a metric argument. + + Accepts a string, a Metric object, or a LazyLoadedPrebuiltMetric + (RubricMetric) and returns the metric name as a string. + + For LazyLoadedPrebuiltMetric (e.g., RubricMetric.MULTI_TURN_TASK_SUCCESS), + this resolves to the API metric spec name (e.g., + "multi_turn_task_success_v1") so it matches the keys in eval results. + + Args: + metric: A metric name string, Metric object, RubricMetric enum value, + or None. + + Returns: + The metric name as a string, or None if metric is None. + """ + if metric is None: + return None + if isinstance(metric, str): + return metric + # LazyLoadedPrebuiltMetric: resolve to versioned API spec name. + if hasattr(metric, "_get_api_metric_spec_name"): + spec_name = metric._get_api_metric_spec_name() + if spec_name: + return spec_name + # Metric objects and other types with a .name attribute. + if hasattr(metric, "name"): + return metric.name + return str(metric) + + +def _resolve_loss_analysis_config( + eval_result: types.EvaluationResult, + config: Optional[types.LossAnalysisConfig] = None, + metric: Optional[str] = None, + candidate: Optional[str] = None, +) -> types.LossAnalysisConfig: + """Resolves and validates the LossAnalysisConfig for generate_loss_clusters. + + Auto-infers `metric` and `candidate` from the EvaluationResult when not + explicitly provided. Validates that provided values exist in the eval result. + + Args: + eval_result: The EvaluationResult from client.evals.evaluate(). + config: Optional explicit LossAnalysisConfig. If provided, metric and + candidate from config take precedence over the separate arguments. + metric: Optional metric name override. + candidate: Optional candidate name override. 
+ + Returns: + A resolved LossAnalysisConfig with metric and candidate populated. + + Raises: + ValueError: If metric/candidate cannot be inferred or are invalid. + """ + # Start from config if provided, otherwise create a new one. + if config is not None: + resolved_metric = metric or config.metric + resolved_candidate = candidate or config.candidate + resolved_config = config.model_copy( + update={"metric": resolved_metric, "candidate": resolved_candidate} + ) + else: + resolved_config = types.LossAnalysisConfig( + metric=metric, candidate=candidate + ) + + # Collect available metric names from the eval result. + available_metrics: set[str] = set() + if eval_result.eval_case_results: + for case_result in eval_result.eval_case_results: + for resp_cand in case_result.response_candidate_results or []: + for m_name in (resp_cand.metric_results or {}).keys(): + available_metrics.add(m_name) + + # Collect available candidate names from metadata. + available_candidates: list[str] = [] + if eval_result.metadata and eval_result.metadata.candidate_names: + available_candidates = list(eval_result.metadata.candidate_names) + + # Auto-infer metric if not provided. + if not resolved_config.metric: + if len(available_metrics) == 1: + resolved_config = resolved_config.model_copy( + update={"metric": next(iter(available_metrics))} + ) + elif len(available_metrics) == 0: + raise ValueError( + "Cannot infer metric: no metric results found in eval_result." + " Please provide metric explicitly via" + " config=types.LossAnalysisConfig(metric='...')." + ) + else: + raise ValueError( + "Cannot infer metric: multiple metrics found in eval_result:" + f" {sorted(available_metrics)}. Please provide metric" + " explicitly via config=types.LossAnalysisConfig(metric='...')." + ) + + # Validate metric if provided explicitly. + if available_metrics and resolved_config.metric not in available_metrics: + raise ValueError( + f"Metric '{resolved_config.metric}' not found in eval_result." 
+ f" Available metrics: {sorted(available_metrics)}." + ) + + # Auto-infer candidate if not provided. + if not resolved_config.candidate: + if len(available_candidates) == 1: + resolved_config = resolved_config.model_copy( + update={"candidate": available_candidates[0]} + ) + elif len(available_candidates) == 0: + # Fallback: use default candidate naming convention from SDK. + resolved_config = resolved_config.model_copy( + update={"candidate": "candidate_1"} + ) + logger.warning( + "No candidate names found in eval_result.metadata." + " Defaulting to 'candidate_1'. If this is incorrect, provide" + " candidate explicitly via" + " config=types.LossAnalysisConfig(candidate='...')." + ) + else: + raise ValueError( + "Cannot infer candidate: multiple candidates found in" + f" eval_result: {available_candidates}. Please provide" + " candidate explicitly via" + " config=types.LossAnalysisConfig(candidate='...')." + ) + + # Validate candidate if provided explicitly and candidates are known. + if ( + available_candidates + and resolved_config.candidate not in available_candidates + ): + raise ValueError( + f"Candidate '{resolved_config.candidate}' not found in" + f" eval_result. Available candidates: {available_candidates}." + ) + + return resolved_config + + +def _poll_operation( + api_client: BaseApiClient, + operation: types.GenerateLossClustersOperation, + poll_interval_seconds: float = 5.0, +) -> types.GenerateLossClustersOperation: + """Polls a long-running operation until completion. + + Args: + api_client: The API client to use for polling. + operation: The initial operation returned from the API call. + poll_interval_seconds: Time between polls. + + Returns: + The completed operation. 
+ """ + if operation.done: + return operation + start_time = time.time() + while True: + response = api_client.request("get", operation.name, {}, None) + response_dict = {} if not response.body else json.loads(response.body) + polled = types.GenerateLossClustersOperation._from_response( + response=response_dict, kwargs={} + ) + if polled.done: + return polled + elapsed = int(time.time() - start_time) + logger.info( + "Loss analysis operation still running... Elapsed time: %d seconds", + elapsed, + ) + time.sleep(poll_interval_seconds) + + +async def _poll_operation_async( + api_client: BaseApiClient, + operation: types.GenerateLossClustersOperation, + poll_interval_seconds: float = 5.0, +) -> types.GenerateLossClustersOperation: + """Polls a long-running operation until completion (async). + + Args: + api_client: The API client to use for polling. + operation: The initial operation returned from the API call. + poll_interval_seconds: Time between polls. + + Returns: + The completed operation. + """ + if operation.done: + return operation + start_time = time.time() + while True: + response = await api_client.async_request( + "get", operation.name, {}, None + ) + response_dict = {} if not response.body else json.loads(response.body) + polled = types.GenerateLossClustersOperation._from_response( + response=response_dict, kwargs={} + ) + if polled.done: + return polled + elapsed = int(time.time() - start_time) + logger.info( + "Loss analysis operation still running... 
Elapsed time: %d seconds", + elapsed, + ) + await asyncio.sleep(poll_interval_seconds) + + def _validate_dataset_agent_data( dataset: types.EvaluationDataset, inference_configs: Optional[dict[str, Any]] = None, diff --git a/vertexai/_genai/_evals_visualization.py b/vertexai/_genai/_evals_visualization.py index d9319f7406..6c04a5880f 100644 --- a/vertexai/_genai/_evals_visualization.py +++ b/vertexai/_genai/_evals_visualization.py @@ -1491,6 +1491,290 @@ def display_evaluation_dataset(eval_dataset_obj: types.EvaluationDataset) -> Non display.display(display.HTML(html_content)) +def _get_loss_analysis_html(loss_analysis_json: str) -> str: + """Returns self-contained HTML for loss pattern analysis visualization.""" + payload_b64 = _encode_to_base64(loss_analysis_json) + return textwrap.dedent( + f""" + + + + + Loss Pattern Analysis + + + +
+
+
+ + + +""" + ) + + +def display_loss_clusters_response( + response_obj: "types.GenerateLossClustersResponse", +) -> None: + """Displays a GenerateLossClustersResponse in an IPython environment.""" + if not _is_ipython_env(): + logger.warning("Skipping display: not in an IPython environment.") + return + else: + from IPython import display + + try: + result_dump = response_obj.model_dump( + mode="json", exclude_none=True + ) + except Exception as e: + logger.error( + "Failed to serialize GenerateLossClustersResponse: %s", + e, + exc_info=True, + ) + raise + + html_content = _get_loss_analysis_html( + json.dumps(result_dump, ensure_ascii=False, default=_pydantic_serializer) + ) + display.display(display.HTML(html_content)) + + +def display_loss_analysis_result( + result_obj: "types.LossAnalysisResult", +) -> None: + """Displays a single LossAnalysisResult in an IPython environment.""" + if not _is_ipython_env(): + logger.warning("Skipping display: not in an IPython environment.") + return + else: + from IPython import display + + try: + # Wrap in a response-like structure for the shared HTML generator + wrapped = {"results": [ + result_obj.model_dump(mode="json", exclude_none=True) + ]} + except Exception as e: + logger.error( + "Failed to serialize LossAnalysisResult: %s", + e, + exc_info=True, + ) + raise + + html_content = _get_loss_analysis_html( + json.dumps(wrapped, ensure_ascii=False, default=_pydantic_serializer) + ) + display.display(display.HTML(html_content)) + + def _get_status_html(status: str, error_message: Optional[str] = None) -> str: """Returns a simple HTML string for displaying a status and optional error.""" error_html = "" diff --git a/vertexai/_genai/_transformers.py b/vertexai/_genai/_transformers.py index 34c471ef85..5a386ca700 100644 --- a/vertexai/_genai/_transformers.py +++ b/vertexai/_genai/_transformers.py @@ -14,6 +14,7 @@ # """Transformers module for Vertex addons.""" +import json import re from typing import Any @@ -260,6 +261,118 @@ 
def t_metric_for_registry( return metric_payload_item +_ALLOWED_PART_FIELDS = frozenset({ + "text", "inline_data", "file_data", "function_call", "function_response", + "video_metadata", "thought", "code_execution_result", "executable_code", +}) + + +def _sanitize_agent_data(agent_data: dict[str, Any]) -> dict[str, Any]: + """Strips SDK-only fields from agent_data so the API accepts the payload. + + The SDK's AgentData model may contain fields like 'tool_call', + 'tool_response', 'part_metadata', and 'will_continue' that don't exist + in the API's AgentData / Content proto. This function recursively removes + them from content parts. + """ + if not isinstance(agent_data, dict): + return agent_data + + sanitized = {} + for key, value in agent_data.items(): + if key == "turns" and isinstance(value, list): + sanitized["turns"] = [ + _sanitize_turn(t) for t in value if isinstance(t, dict) + ] + elif key == "agents" and isinstance(value, dict): + sanitized["agents"] = { + k: _sanitize_agent_config(v) if isinstance(v, dict) else v + for k, v in value.items() + } + else: + sanitized[key] = value + return sanitized + + +def _sanitize_agent_config(config: dict[str, Any]) -> dict[str, Any]: + """Sanitizes an AgentConfig dict, keeping only API-known fields.""" + allowed = {"agent_id", "agent_type", "description", "instruction", "tools", "sub_agents"} + return {k: v for k, v in config.items() if k in allowed} + + +def _sanitize_turn(turn: dict[str, Any]) -> dict[str, Any]: + """Sanitizes a ConversationTurn dict.""" + sanitized = {} + for key, value in turn.items(): + if key == "events" and isinstance(value, list): + sanitized["events"] = [ + _sanitize_event(e) for e in value if isinstance(e, dict) + ] + else: + sanitized[key] = value + return sanitized + + +def _sanitize_event(event: dict[str, Any]) -> dict[str, Any]: + """Sanitizes an AgentEvent dict.""" + sanitized = {} + for key, value in event.items(): + if key == "content" and isinstance(value, dict): + 
sanitized["content"] = _sanitize_content(value) + elif key in ("author", "event_time", "state_delta", "active_tools"): + sanitized[key] = value + # Skip unknown event-level fields. + return sanitized + + +def _sanitize_content(content: dict[str, Any]) -> dict[str, Any]: + """Sanitizes a Content dict, stripping unknown fields from parts.""" + sanitized = {} + for key, value in content.items(): + if key == "parts" and isinstance(value, list): + sanitized["parts"] = [ + _sanitize_part(p) for p in value if isinstance(p, dict) + ] + elif key == "role": + sanitized["role"] = value + return sanitized + + +def _sanitize_part(part: dict[str, Any]) -> dict[str, Any]: + """Keeps only API-recognized fields in a Part dict.""" + sanitized = {} + for key, value in part.items(): + if key in _ALLOWED_PART_FIELDS: + if key == "function_response" and isinstance(value, dict): + # Strip unknown sub-fields like 'will_continue'. + sanitized[key] = { + k: v for k, v in value.items() + if k in ("name", "id", "response") + } + else: + sanitized[key] = value + return sanitized + + +def _extract_agent_data_from_df( + eval_dataset: Any, + case_idx: int, +) -> Any: + """Extracts agent_data from a DataFrame-based EvaluationDataset by row index.""" + if not eval_dataset: + return None + ds = eval_dataset[0] if isinstance(eval_dataset, list) else eval_dataset + df = getv(ds, ["eval_dataset_df"]) + if df is None or not hasattr(df, "iloc"): + return None + if case_idx < 0 or case_idx >= len(df): + return None + row = df.iloc[case_idx] + if "agent_data" not in row or row["agent_data"] is None: + return None + return row["agent_data"] + + def t_inline_results( eval_results: list[Any], ) -> list[dict[str, Any]]: @@ -292,7 +405,13 @@ def t_inline_results( if agent_data: if hasattr(agent_data, "model_dump"): - prompt_payload["agent_data"] = agent_data.model_dump() + prompt_payload["agent_data"] = _sanitize_agent_data( + agent_data.model_dump() + ) + elif isinstance(agent_data, dict): + 
prompt_payload["agent_data"] = _sanitize_agent_data( + agent_data + ) else: prompt_payload["agent_data"] = agent_data elif prompt: @@ -302,6 +421,29 @@ def t_inline_results( if text: prompt_payload["text"] = str(text) + # Fallback: extract agent_data from the DataFrame when eval_cases + # are not available (e.g., run_inference -> evaluate flow). + if not prompt_payload: + df_agent_data = _extract_agent_data_from_df( + eval_dataset, case_idx + ) + if df_agent_data is not None: + if hasattr(df_agent_data, "model_dump"): + prompt_payload["agent_data"] = _sanitize_agent_data( + df_agent_data.model_dump() + ) + elif isinstance(df_agent_data, str): + try: + prompt_payload["agent_data"] = _sanitize_agent_data( + json.loads(df_agent_data) + ) + except (json.JSONDecodeError, ValueError): + pass + elif isinstance(df_agent_data, dict): + prompt_payload["agent_data"] = _sanitize_agent_data( + df_agent_data + ) + cand_results = getv(case_result, ["response_candidate_results"]) or [] for resp_cand_result in cand_results: resp_idx = getv(resp_cand_result, ["response_index"]) or 0 diff --git a/vertexai/_genai/evals.py b/vertexai/_genai/evals.py index 0adf80cd6e..3e7276b338 100644 --- a/vertexai/_genai/evals.py +++ b/vertexai/_genai/evals.py @@ -2454,6 +2454,80 @@ def generate_conversation_scenarios( ) return _evals_utils._postprocess_user_scenarios_response(response) + @_common.experimental_warning( + "The Vertex SDK GenAI evals.generate_loss_clusters module is experimental, " + "and may change in future versions." + ) + def generate_loss_clusters( + self, + *, + eval_result: types.EvaluationResult, + metric: Optional[Union[str, types.MetricOrDict]] = None, + candidate: Optional[str] = None, + config: Optional[types.LossAnalysisConfigOrDict] = None, + ) -> types.GenerateLossClustersResponse: + """Generates loss clusters from evaluation results. 
+ + Analyzes "Pass/Fail" signals from rubric-based autoraters and groups + them into semantic "Loss Patterns" (e.g., "Hallucination of Action"). + + This method calls the GenerateLossClusters LRO and polls until + completion, returning the results directly. + + If ``metric`` or ``candidate`` are not provided, they will be + auto-inferred from ``eval_result`` when unambiguous (i.e., when the + eval result contains exactly one metric or one candidate). For + multi-metric or multi-candidate evaluations, provide them explicitly. + + Available candidate names can be found in + ``eval_result.metadata.candidate_names``. + + Note: This API is only available in the ``global`` region. + + Args: + eval_result: The EvaluationResult object returned from + client.evals.evaluate(). + metric: The metric to analyze. Can be a metric name string + (e.g., "multi_turn_task_success_v1"), a Metric object, or a + RubricMetric enum (e.g., types.RubricMetric.MULTI_TURN_TASK_SUCCESS). + If not provided and config does not specify it, auto-inferred + from eval_result. + candidate: The candidate to analyze. If not provided and config + does not specify it, auto-inferred from eval_result. + config: Optional LossAnalysisConfig with additional options + (predefined_taxonomy, max_top_cluster_count). Can also + specify metric/candidate, but explicit arguments take + precedence. + + Returns: + A GenerateLossClustersResponse containing the analysis results. + Call .show() to visualize, or access .results for individual + LossAnalysisResult objects (each with their own .show()). 
+ """ + metric_name = _evals_utils._resolve_metric_name(metric) + parsed_config = ( + types.LossAnalysisConfig.model_validate(config) + if isinstance(config, dict) + else config + ) + resolved_config = _evals_utils._resolve_loss_analysis_config( + eval_result=eval_result, + config=parsed_config, + metric=metric_name, + candidate=candidate, + ) + operation = self._generate_loss_clusters( + inline_results=[eval_result], + configs=[resolved_config], + ) + completed = _evals_utils._poll_operation( + api_client=self._api_client, + operation=operation, + ) + if completed.error: + raise RuntimeError(f"Loss analysis operation failed: {completed.error}") + return completed.response + @_common.experimental_warning( "The Vertex SDK GenAI evals.create_evaluation_metric method is experimental, " "and may change in future versions." @@ -3731,6 +3805,80 @@ async def generate_conversation_scenarios( ) return _evals_utils._postprocess_user_scenarios_response(response) + @_common.experimental_warning( + "The Vertex SDK GenAI evals.generate_loss_clusters module is experimental, " + "and may change in future versions." + ) + async def generate_loss_clusters( + self, + *, + eval_result: types.EvaluationResult, + metric: Optional[Union[str, types.MetricOrDict]] = None, + candidate: Optional[str] = None, + config: Optional[types.LossAnalysisConfigOrDict] = None, + ) -> types.GenerateLossClustersResponse: + """Generates loss clusters from evaluation results. + + Analyzes "Pass/Fail" signals from rubric-based autoraters and groups + them into semantic "Loss Patterns" (e.g., "Hallucination of Action"). + + This method calls the GenerateLossClusters LRO and polls until + completion, returning the results directly. + + If ``metric`` or ``candidate`` are not provided, they will be + auto-inferred from ``eval_result`` when unambiguous (i.e., when the + eval result contains exactly one metric or one candidate). For + multi-metric or multi-candidate evaluations, provide them explicitly. 
+
+        Available candidate names can be found in
+        ``eval_result.metadata.candidate_names``.
+
+        Note: This API is only available in the ``global`` region.
+
+        Args:
+            eval_result: The EvaluationResult object returned from
+                client.evals.evaluate().
+            metric: The metric to analyze. Can be a metric name string
+                (e.g., "multi_turn_task_success_v1"), a Metric object, or a
+                RubricMetric enum (e.g., types.RubricMetric.MULTI_TURN_TASK_SUCCESS).
+                If not provided and config does not specify it, auto-inferred
+                from eval_result.
+            candidate: The candidate to analyze. If not provided and config
+                does not specify it, auto-inferred from eval_result.
+            config: Optional LossAnalysisConfig with additional options
+                (predefined_taxonomy, max_top_cluster_count). Can also
+                specify metric/candidate, but explicit arguments take
+                precedence.
+
+        Returns:
+            A GenerateLossClustersResponse containing the analysis results.
+            Call .show() to visualize, or access .results for individual
+            LossAnalysisResult objects (each with their own .show()).
+        """
+        metric_name = _evals_utils._resolve_metric_name(metric)
+        parsed_config = (
+            types.LossAnalysisConfig.model_validate(config)
+            if isinstance(config, dict)
+            else config
+        )
+        resolved_config = _evals_utils._resolve_loss_analysis_config(
+            eval_result=eval_result,
+            config=parsed_config,
+            metric=metric_name,
+            candidate=candidate,
+        )
+        operation = await self._generate_loss_clusters(
+            inline_results=[eval_result],
+            configs=[resolved_config],
+        )
+        completed = await _evals_utils._poll_operation_async(
+            api_client=self._api_client,
+            operation=operation,
+        )
+        if completed.error:
+            raise RuntimeError(f"Loss analysis operation failed: {completed.error}")
+        return completed.response
+
     @_common.experimental_warning(
         "The Vertex SDK GenAI evals.create_evaluation_metric module is experimental, "
         "and may change in future versions."
diff --git a/vertexai/_genai/types/common.py b/vertexai/_genai/types/common.py
index 2d9f4b50ce..b92577bb9b 100644
--- a/vertexai/_genai/types/common.py
+++ b/vertexai/_genai/types/common.py
@@ -4920,6 +4920,12 @@ class LossAnalysisResult(_common.BaseModel):
         default=None, description="""The list of identified loss clusters."""
     )
 
+    def show(self) -> None:
+        """Shows the loss analysis result with rich HTML visualization."""
+        from .. import _evals_visualization
+
+        _evals_visualization.display_loss_analysis_result(self)
+
 
 class LossAnalysisResultDict(TypedDict, total=False):
     """The top-level result for loss analysis."""
@@ -4948,6 +4954,12 @@ class GenerateLossClustersResponse(_common.BaseModel):
         description="""The analysis results, one per config provided in the request.""",
     )
 
+    def show(self) -> None:
+        """Shows the loss pattern analysis report with rich HTML visualization."""
+        from .. import _evals_visualization
+
+        _evals_visualization.display_loss_clusters_response(self)
+
 
 class GenerateLossClustersResponseDict(TypedDict, total=False):
     """Response message for EvaluationAnalyticsService.GenerateLossClusters."""