diff --git a/assets/evaluators/builtin/coherence/evaluator/_coherence.py b/assets/evaluators/builtin/coherence/evaluator/_coherence.py index 091fad4c24..bb81bb7ad8 100644 --- a/assets/evaluators/builtin/coherence/evaluator/_coherence.py +++ b/assets/evaluators/builtin/coherence/evaluator/_coherence.py @@ -3,15 +3,28 @@ import math import os +import logging from abc import ABC, abstractmethod from enum import Enum -from typing import Any, Dict, Optional, Union, List +from typing import Any, Dict, Optional, Union, List, Tuple from typing_extensions import overload, override +if os.getenv("AI_EVALS_USE_PF_PROMPTY", "false").lower() == "true": + from promptflow.core._flow import AsyncPrompty +else: + from azure.ai.evaluation._legacy.prompty import AsyncPrompty + from azure.ai.evaluation._evaluators._common import PromptyEvaluatorBase from azure.ai.evaluation._model_configurations import Conversation from azure.ai.evaluation._exceptions import EvaluationException, ErrorBlame, ErrorCategory, ErrorTarget +from azure.ai.evaluation._common.utils import ( + construct_prompty_model_config, + validate_model_config, + _extract_text_from_content, + _get_agent_response, + _pretty_format_conversation_history, +) # region Validators @@ -33,6 +46,71 @@ class MessageRole(str, Enum): ASSISTANT = "assistant" SYSTEM = "system" TOOL = "tool" + DEVELOPER = "developer" + + +class EvaluationLevel(str, Enum): + """Supported evaluation levels for CoherenceEvaluator. + + - ``CONVERSATION``: Force conversation-level evaluation using the multi-turn path. + - ``TURN``: Force turn-level evaluation using the single-turn query/response path. 
def _split_messages_at_latest_user(messages: List[dict]) -> Tuple[List[dict], List[dict]]:
    """Split messages into query/response slices at the latest user turn.

    Everything up to and including the last message whose role is ``user`` becomes the
    query slice; everything after it becomes the response slice.

    :param messages: Chat messages in conversation order.
    :type messages: List[dict]
    :return: ``(query_messages, response_messages)``. When no user message is present the
        query slice is empty and the response slice holds every message.
    :rtype: Tuple[List[dict], List[dict]]
    """
    # ``.get`` (not ``["role"]``) so that a message without a role is simply not a user turn:
    # this helper can run before input validation, and a KeyError raised here would mask the
    # validator's own, more descriptive error for the malformed message.
    latest_user_index = max(
        (
            index
            for index, message in enumerate(messages)
            if isinstance(message, dict) and message.get("role") == MessageRole.USER
        ),
        # No user turn at all: -1 makes the split point 0 instead of letting max() raise.
        default=-1,
    )
    split_at = latest_user_index + 1
    return messages[:split_at], messages[split_at:]
class MessagesOrQueryResponseInputValidator(ConversationValidator):
    """Validator that supports both single-turn (query/response) and multi-turn (messages) inputs.

    When ``messages`` is provided, it validates the messages list.
    Otherwise, it delegates to the parent ``ConversationValidator`` for the query/response path.
    """

    def _messages_error(self, message: str) -> EvaluationException:
        """Build the user-error exception shared by every ``messages`` check."""
        return EvaluationException(
            message=message,
            blame=ErrorBlame.USER_ERROR,
            category=ErrorCategory.INVALID_VALUE,
            target=self.error_target,
        )

    def _validate_message_items(self, messages: List[Any]) -> set:
        """Check each message's structure and return the set of roles that were seen."""
        valid_roles = {r.value for r in MessageRole}
        roles_present: set = set()
        for i, msg in enumerate(messages):
            if not isinstance(msg, dict):
                raise self._messages_error(
                    f"Each item in 'messages' must be a dictionary, "
                    f"but item at index {i} is {type(msg).__name__}."
                )
            role = msg.get("role")
            if role is None:
                raise self._messages_error(
                    f"Each message must contain a 'role' key, but message at index {i} is missing it."
                )
            # The isinstance guard keeps unhashable roles (e.g. a list) from raising TypeError
            # in the set lookup; they are reported as an invalid role instead.
            if not isinstance(role, str) or role not in valid_roles:
                raise self._messages_error(
                    f"Invalid role '{role}' at message index {i}. "
                    f"Must be one of: {sorted(valid_roles)}."
                )
            roles_present.add(role)
        return roles_present

    def _validate_final_assistant_message(self, last_message: Dict[str, Any]) -> None:
        """Require the last message to be an assistant message that carries text."""
        if last_message["role"] != MessageRole.ASSISTANT:
            raise self._messages_error(
                f"The last message must have role 'assistant', "
                f"but found role '{last_message['role']}'."
            )
        # Only list-shaped content is inspected; any other content shape is accepted as-is.
        last_content = last_message.get("content", "")
        if not isinstance(last_content, list):
            return
        has_text = any(
            isinstance(part, str) or (isinstance(part, dict) and part.get("type") == "text")
            for part in last_content
        )
        if not has_text:
            raise self._messages_error(
                "The last assistant message must contain text content, "
                "not only tool calls. The conversation appears to be "
                "mid-execution — provide the agent's final text response."
            )

    @override
    def validate_eval_input(self, eval_input: Dict[str, Any]) -> bool:
        """Validate evaluation input, supporting messages as an alternative to query/response.

        :param eval_input: Evaluation inputs; the ``messages`` entry selects the multi-turn path.
        :type eval_input: Dict[str, Any]
        :return: ``True`` when the ``messages`` input is valid; otherwise the result of the
            parent validator for the query/response path.
        :rtype: bool
        :raises EvaluationException: If ``messages`` is not a non-empty list of dicts with valid
            roles, lacks a user or an assistant message, does not end with an assistant
            message, or ends with an assistant message whose list content has no text part.
        """
        messages = eval_input.get("messages")
        if messages is None:
            return super().validate_eval_input(eval_input)

        if not isinstance(messages, list):
            raise self._messages_error("messages must be provided as a list of message dictionaries.")
        if not messages:
            raise self._messages_error("messages list must not be empty.")

        # Per-message structural checks
        roles_present = self._validate_message_items(messages)

        # Conversation-level checks
        if MessageRole.USER not in roles_present:
            raise self._messages_error("messages must contain at least one message with role 'user'.")
        if MessageRole.ASSISTANT not in roles_present:
            raise self._messages_error("messages must contain at least one message with role 'assistant'.")
        self._validate_final_assistant_message(messages[-1])

        return True
[], + } + if system_message: + conversation_history["system_message"] = system_message + + result = _pretty_format_conversation_history(conversation_history) + + start = max(len(all_user_queries) - 1, 0) + for i, agent_response in enumerate(all_agent_responses[start:], start=start): + result += f"Agent turn {i + 1}:\n" + for msg_text in agent_response: + if isinstance(msg_text, list): + for submsg in msg_text: + result += " " + "\n ".join(submsg.split("\n")) + "\n" + else: + result += " " + "\n ".join(msg_text.split("\n")) + "\n" + result += "\n" + + return result.rstrip("\n") + + +logger = logging.getLogger(__name__) + + class CoherenceEvaluator(PromptyEvaluatorBase[Union[str, float]]): """ Evaluates coherence score for a given query and response or a multi-turn conversation, including reasoning. @@ -601,7 +873,9 @@ class CoherenceEvaluator(PromptyEvaluatorBase[Union[str, float]]): """ _PROMPTY_FILE = "coherence.prompty" + _MULTI_TURN_PROMPTY_FILE = "coherence_multi_turn.prompty" _RESULT_KEY = "coherence" + _OPTIONAL_PARAMS = ["messages"] _validator: ValidatorInterface @@ -609,7 +883,7 @@ class CoherenceEvaluator(PromptyEvaluatorBase[Union[str, float]]): """Evaluator identifier, experimental and to be used only with evaluation in cloud.""" @override - def __init__(self, model_config, *, threshold=3, credential=None, **kwargs): + def __init__(self, model_config, *, threshold=3, credential=None, evaluation_level=None, **kwargs): """Initialize the Coherence evaluator. :param model_config: Configuration for the Azure OpenAI model. @@ -619,14 +893,24 @@ def __init__(self, model_config, *, threshold=3, credential=None, **kwargs): :type threshold: int :param credential: The credential for authentication. :type credential: Optional[Any] + :keyword evaluation_level: Force a specific evaluation level for this invocation. When ``None`` + (default), the level is auto-detected from input shape (``messages`` -> conversation, + ``query``/``response`` -> turn). 
Set to ``EvaluationLevel.CONVERSATION`` or + ``EvaluationLevel.TURN`` to override auto-detection. + :type evaluation_level: Optional[Union[EvaluationLevel, str]] """ current_dir = os.path.dirname(__file__) prompty_path = os.path.join(current_dir, self._PROMPTY_FILE) self._threshold = threshold self._higher_is_better = True - # Initialize input validator - self._validator = ConversationValidator(error_target=ErrorTarget.COHERENCE_EVALUATOR) + # Validate and store evaluation level + self._evaluation_level = _resolve_evaluation_level( + evaluation_level, ErrorTarget.COHERENCE_EVALUATOR + ) + + # Initialize input validator (supports both query/response and messages) + self._validator = MessagesOrQueryResponseInputValidator(error_target=ErrorTarget.COHERENCE_EVALUATOR) super().__init__( model_config=model_config, @@ -638,6 +922,20 @@ def __init__(self, model_config, *, threshold=3, credential=None, **kwargs): **kwargs, ) + # Load the multi-turn prompty flow for conversation-level evaluation + multi_turn_prompty_path = os.path.join(current_dir, self._MULTI_TURN_PROMPTY_FILE) + prompty_model_config = construct_prompty_model_config( + validate_model_config(model_config), + self._DEFAULT_OPEN_API_VERSION, + f"azure-ai-evaluation (type=evaluator subtype={self.__class__.__name__})", + ) + self._multi_turn_flow = AsyncPrompty.load( + source=multi_turn_prompty_path, + model=prompty_model_config, + token_credential=credential, + is_reasoning_model=self._is_reasoning_model, + ) + @overload def __call__( self, @@ -671,6 +969,14 @@ def __call__( :rtype: Dict[str, Union[float, Dict[str, List[float]]]] """ + @overload + def __call__( + self, + *, + messages: List[dict], + ) -> Dict[str, Union[str, float]]: + """Evaluate coherence for a full multi-turn conversation.""" + @override def __call__( # pylint: disable=docstring-missing-param self, @@ -705,6 +1011,7 @@ def _not_applicable_result( f"{self._result_key}_result": "pass", f"{self._result_key}_threshold": threshold, 
f"{self._result_key}_reason": f"Not applicable: {error_message}", + f"{self._result_key}_properties": {}, f"{self._result_key}_prompt_tokens": 0, f"{self._result_key}_completion_tokens": 0, f"{self._result_key}_total_tokens": 0, @@ -714,6 +1021,42 @@ def _not_applicable_result( f"{self._result_key}_sample_output": "", } + def _should_use_conversation_level(self, eval_input: Dict) -> bool: + """Determine whether to use conversation-level evaluation.""" + if self._evaluation_level == EvaluationLevel.CONVERSATION: + return True + if self._evaluation_level == EvaluationLevel.TURN: + return False + return eval_input.get("messages") is not None + + def _build_result( + self, + score: Optional[int], + result: str, + reason: str, + status: str, + properties: Dict, + prompty_output_dict: Optional[Dict] = None, + ) -> Dict[str, Union[str, int, float, Dict, None]]: + """Build a standardized result dictionary for multi-turn coherence outputs.""" + p = prompty_output_dict if isinstance(prompty_output_dict, dict) else {} + return { + self._result_key: score, + f"{self._result_key}_score": score, + f"{self._result_key}_result": result, + f"{self._result_key}_threshold": self._threshold, + f"{self._result_key}_reason": reason, + f"{self._result_key}_status": status, + f"{self._result_key}_properties": properties, + f"{self._result_key}_prompt_tokens": p.get("input_token_count", 0), + f"{self._result_key}_completion_tokens": p.get("output_token_count", 0), + f"{self._result_key}_total_tokens": p.get("total_token_count", 0), + f"{self._result_key}_finish_reason": p.get("finish_reason", ""), + f"{self._result_key}_model": p.get("model_id", ""), + f"{self._result_key}_sample_input": p.get("sample_input", ""), + f"{self._result_key}_sample_output": p.get("sample_output", ""), + } + @override async def _real_call(self, **kwargs): """Perform asynchronous call where real end-to-end evaluation logic is executed. 
@@ -723,6 +1066,20 @@ async def _real_call(self, **kwargs): :return: The evaluation result. :rtype: Union[DoEvalResult[T_EvalValue], AggregateResult[T_EvalValue]] """ + # Reshape inputs based on evaluation level before validation + if self._evaluation_level == EvaluationLevel.CONVERSATION and not kwargs.get("messages"): + query = kwargs.get("query") + response = kwargs.get("response") + if isinstance(query, str) and isinstance(response, str) and query and response: + query, response = _wrap_string_messages(query, response) + if isinstance(query, list) and isinstance(response, list): + kwargs["messages"] = _merge_query_response_messages(query, response) + elif self._evaluation_level == EvaluationLevel.TURN and kwargs.get("messages"): + if any(m.get("role") == MessageRole.USER for m in kwargs["messages"]): + query_messages, response_messages = _split_messages_at_latest_user(kwargs["messages"]) + kwargs["query"] = query_messages + kwargs["response"] = response_messages + # Validate input before processing self._validator.validate_eval_input(kwargs) @@ -737,6 +1094,9 @@ async def _do_eval(self, eval_input: Dict) -> Dict[str, Union[float, str]]: # t :return: The evaluation result. :rtype: Dict """ + if self._should_use_conversation_level(eval_input): + return await self._do_eval_conversation_level(eval_input) + if _is_intermediate_response(eval_input.get("response")): return self._not_applicable_result( "Intermediate response. 
Please provide the agent's final response for evaluation.", @@ -746,6 +1106,7 @@ async def _do_eval(self, eval_input: Dict) -> Dict[str, Union[float, str]]: # t eval_input["response"] = _preprocess_messages(eval_input["response"]) if isinstance(eval_input.get("query"), list): eval_input["query"] = _preprocess_messages(eval_input["query"]) + eval_input.pop("messages", None) result = await super()._do_eval(eval_input) @@ -758,3 +1119,56 @@ async def _do_eval(self, eval_input: Dict) -> Dict[str, Union[float, str]]: # t target=ErrorTarget.COHERENCE_EVALUATOR, ) return result + + async def _do_eval_conversation_level(self, eval_input: Dict) -> Dict[str, Union[str, int, float, Dict, None]]: + """Evaluate coherence for a full multi-turn conversation.""" + messages = _preprocess_messages(eval_input["messages"]) + conversation_text = serialize_messages(messages) + prompty_output_dict = await self._multi_turn_flow( + timeout=self._LLM_CALL_TIMEOUT, + messages=conversation_text, + ) + return self._parse_prompty_output(prompty_output_dict) + + def _parse_prompty_output(self, prompty_output_dict: Dict) -> Dict[str, Union[str, int, float, Dict, None]]: + """Parse multi-turn prompty JSON output into evaluator result schema.""" + llm_output = prompty_output_dict.get("llm_output", prompty_output_dict) + score = None + result = "error" + reason = "Evaluator returned invalid output." + status = "error" + properties = {} + + if isinstance(llm_output, dict): + status = str(llm_output.get("status", "completed")).strip().lower() + reason = llm_output.get("reason", "") + properties = llm_output.get("properties") or {} + + if status == "skipped": + result = "not_applicable" + reason = reason or "Conversation coherence cannot be evaluated due to non-logical user flow." + else: + score_value = llm_output.get("score") + if score_value is None: + result = "error" + reason = "Evaluator returned invalid output: missing 'score'." 
+ status = "error" + else: + try: + score_float = float(score_value) + except (TypeError, ValueError): + result = "error" + reason = f"Evaluator returned invalid output: invalid 'score' value: {score_value}" + status = "error" + else: + score = max(1, min(5, int(round(score_float)))) + result = "pass" if score >= self._threshold else "fail" + + return self._build_result( + score=score, + result=result, + reason=reason, + status=status, + properties=properties, + prompty_output_dict=prompty_output_dict, + ) diff --git a/assets/evaluators/builtin/coherence/evaluator/coherence_multi_turn.prompty b/assets/evaluators/builtin/coherence/evaluator/coherence_multi_turn.prompty new file mode 100644 index 0000000000..e3e9af2481 --- /dev/null +++ b/assets/evaluators/builtin/coherence/evaluator/coherence_multi_turn.prompty @@ -0,0 +1,169 @@ +--- +name: Coherence Multi-Turn +description: Evaluates conversation-level coherence with simple skip gating +model: + api: chat + parameters: + temperature: 0.0 + max_tokens: 2500 + top_p: 1.0 + presence_penalty: 0 + frequency_penalty: 0 + response_format: + type: json_object +inputs: + messages: + type: string +--- +system: +# Instruction +You are an expert evaluator for conversation-level coherence. + +user: +# Definition +**Coherence** is how logically consistent, well-connected, and easy to follow the AGENT's responses are across the full conversation. + +## Agent-primary scope +- Primary target is AGENT coherence across the session. +- Do not score only the last AGENT response. +- Use the full trajectory and how responses connect over time. + +## Simple skip gating (conversation-level) +Before scoring the AGENT: +1. Check whether user turns after the first are logical follow-ups to prior context. +2. If user topics are broadly scattered and the conversation is mostly derailed, mark the evaluation as skipped. +3. Distinguish user pivots: + - Pivot while the current task is still active/unresolved: treat as derail evidence. 
+ - Pivot after the AGENT has clearly completed the prior task: treat as a new valid thread; do not skip for this alone. + +Use **skipped** only when conversation flow is clearly non-evaluable, such as: +- most follow-up user turns are unrelated topic jumps, and +- there is no stable conversational thread to judge AGENT coherence fairly. + +If there is a usable conversation thread, do **not** skip; score AGENT coherence. + +## Conversation examples for skip gating +Use these examples as guidance for when to skip vs continue scoring. + +### Example A: Prior task completed, then user starts a new task (do not skip) +- User Turn 1: "Can you summarize this release note?" +- Agent Turn 1: "Here is a concise summary with key changes." +- User Turn 2: "Great, that solves it. Now help me draft an email to the team." +- Agent Turn 2: "Sure, here is an email draft." +Expected gating outcome: `completed` (not skipped). The pivot happens after the first task is clearly completed. + +### Example B: User pivots mid-task and keeps jumping topics (skip) +- User Turn 1: "Help me fix this Python import error." +- Agent Turn 1: "Share the traceback and your import statements." +- User Turn 2: "Also, what is the weather in Tokyo?" +- Agent Turn 2: "I can help with weather, but please share the traceback for the error." +- User Turn 3: "Actually suggest a beach vacation plan." +- Agent Turn 3: "Where would you like to travel for vacation?" +Expected gating outcome: `skipped`. The original task stays unresolved and most follow-ups are unrelated jumps. + +### Example C: Mostly stable thread with one brief side jump (do not skip) +- User Turn 1: "Plan a 3-day Rome itinerary." +- Agent Turn 1: "Day-by-day plan with museums and food spots." +- User Turn 2: "Quick one: what time zone is Rome?" +- Agent Turn 2: "Central European Time." +- User Turn 3: "Great, continue with restaurant options near the day-2 route." +- Agent Turn 3: "Sure, let me search for restaurants." 
+Expected gating outcome: `completed` (not skipped). There is still a stable conversation thread. + +# Ratings +## [Coherence: 1] (Severely Incoherent Conversation) +AGENT responses are disjointed or contradictory across turns, fail to track context, and are hard to follow. + +### Example +- User Turn 1: "Help me compare renting vs buying." +- Agent Turn 1: "Renting has lower upfront cost." +- User Turn 2: "Can you include monthly cost factors?" +- Agent Turn 2: "Bananas are yellow and trains are fast." +- User Turn 3: "Please answer my housing question." +- Agent Turn 3: "I already explained quantum physics." + +## [Coherence: 2] (Poor Conversation Coherence) +AGENT responses show major flow problems, frequent context breaks, and weak turn-to-turn continuity. + +### Example +- User Turn 1: "Plan a simple workout for beginners." +- Agent Turn 1: "Start with 20 minutes of light cardio." +- User Turn 2: "What should I do on day two?" +- Agent Turn 2: "You should learn SQL joins." +- User Turn 3: "I meant the workout plan." +- Agent Turn 3: "Maybe do squats, but also think about web hosting." + +## [Coherence: 3] (Partially Coherent Conversation) +AGENT responses are somewhat connected but with noticeable jumps, weak transitions, or missed context links. + +### Example +- User Turn 1: "Help me prepare for a product manager interview." +- Agent Turn 1: "Start with product sense and execution questions." +- User Turn 2: "Can you suggest a 2-week plan?" +- Agent Turn 2: "Week 1: product sense. Week 2: execution and metrics." +- User Turn 3: "How should I practice daily?" +- Agent Turn 3: "Do one mock daily, and also maybe learn guitar theory." + +## [Coherence: 4] (Coherent Conversation) +AGENT responses are mostly well-structured across turns, with clear continuity and understandable progression. + +### Example +- User Turn 1: "Help me draft a customer apology email." +- Agent Turn 1: "Start with apology, impact acknowledgement, and next steps." 
+- User Turn 2: "Can you make it shorter and warmer?" +- Agent Turn 2: "Sure, here is a concise version with a friendlier tone." +- User Turn 3: "Add one sentence about refund timing." +- Agent Turn 3: "Added a sentence that the refund will post within 5 business days." + +## [Coherence: 5] (Highly Coherent Conversation) +AGENT responses are consistently logical and seamless across turns, preserving context and progression with excellent clarity. + +### Example +- User Turn 1: "I need a migration plan from MySQL to PostgreSQL." +- Agent Turn 1: "Phase 1: schema mapping and compatibility checks." +- User Turn 2: "What should Phase 2 include?" +- Agent Turn 2: "Phase 2: data migration rehearsal, validation, and rollback plan." +- User Turn 3: "Now give me a risk table by phase." +- Agent Turn 3: "Here is a phase-by-phase risk table with mitigation and owners." + +## Conversation examples by coherence score +Use the per-rating examples above to anchor scoring once skip gating passes. + +# Data +CONVERSATION: {{messages}} + +# Tasks +Return ONLY a JSON object with: +- `score`: integer 1-5 if completed, or null if skipped +- `status`: "completed" or "skipped" +- `reason`: short explanation (15-60 words) +- `properties`: object for completed, null for skipped + +For `properties` in completed evaluations, include: +- `gating_summary`: short summary of the user-flow gate outcome +- `conversation_flow_summary`: how AGENT responses flowed across the session +- `agent_coherence_issues`: concrete AGENT coherence issues, or "None" + +## Output format (completed) +{ + "score": 4, + "status": "completed", + "reason": "The conversation is coherent overall. 
The agent mostly keeps topic continuity and logical flow, with minor transition roughness in one follow-up.", + "properties": { + "gating_summary": "User flow remained mostly on-topic, so scoring proceeded.", + "conversation_flow_summary": "Agent responses stayed aligned with prior turns and maintained a clear progression.", + "agent_coherence_issues": "Minor abrupt shift in one follow-up explanation." + } +} + +## Output format for skipped conversations (example) +{ + "score": null, + "status": "skipped", + "reason": "Conversation is mostly derailed by unrelated user topic jumps, so agent coherence cannot be judged fairly.", + "properties": null +} + +Return valid JSON only. No markdown. No extra keys. + +# Output diff --git a/assets/evaluators/builtin/coherence/spec.yaml b/assets/evaluators/builtin/coherence/spec.yaml index 54aca0c2f4..65d6c00872 100644 --- a/assets/evaluators/builtin/coherence/spec.yaml +++ b/assets/evaluators/builtin/coherence/spec.yaml @@ -1,11 +1,12 @@ type: "evaluator" name: "builtin.coherence" -version: 4 +version: 5 displayName: "Coherence-Evaluator" description: "Evaluates how logically connected and consistent the response is. Ensures ideas flow naturally and make sense together. It’s best used for generative business writing such as summarizing meeting notes, creating marketing materials, and drafting emails." 
evaluatorType: "builtin" evaluatorSubType: "code" categories: ["quality", "agents"] +supportedEvaluationLevels: ["conversation", "turn"] tags: provider: "Microsoft" is_continuous_scenario: "true" @@ -19,19 +20,35 @@ initParameterSchema: minimum: 1 maximum: 5 multipleOf: 1 + evaluation_level: + type: "string" required: ["deployment_name"] dataMappingSchema: type: "object" properties: query: - type: "string" + anyOf: + - type: "string" + - type: "array" + items: + type: "object" response: - type: "string" - required: ["query", "response"] + anyOf: + - type: "string" + - type: "array" + items: + type: "object" + messages: + type: "array" + items: + type: "object" + anyOf: + - required: ["query", "response"] + - required: ["messages"] outputSchema: coherence: type: "ordinal" desirable_direction: "increase" min_value: 1 max_value: 5 -path: ./evaluator \ No newline at end of file +path: ./evaluator diff --git a/assets/evaluators/tests/test_evaluators_behavior/test_coherence_evaluator_behavior.py b/assets/evaluators/tests/test_evaluators_behavior/test_coherence_evaluator_behavior.py index fe8eb02ae9..39d8cca9b5 100644 --- a/assets/evaluators/tests/test_evaluators_behavior/test_coherence_evaluator_behavior.py +++ b/assets/evaluators/tests/test_evaluators_behavior/test_coherence_evaluator_behavior.py @@ -3,11 +3,23 @@ """Behavioral tests for Coherence Evaluator.""" +import os import pytest +from typing import Any, Dict, List +from unittest.mock import MagicMock + +from azure.ai.evaluation import AzureOpenAIModelConfiguration +from azure.ai.evaluation._exceptions import EvaluationException + from .base_evaluator_behavior_test import BaseEvaluatorBehaviorTest from .base_tool_evaluation_test import BaseToolEvaluationTest from . 
import common_tool_test_data as data
-from ...builtin.coherence.evaluator._coherence import CoherenceEvaluator
+from ...builtin.coherence.evaluator._coherence import (
+    CoherenceEvaluator,
+    EvaluationLevel,
+    serialize_messages,
+)
+from ..common.evaluator_mock_config import get_flow_side_effect_for_evaluator
 
 
 @pytest.mark.unittest
@@ -96,3 +108,239 @@ class TestCoherenceEvaluatorBehavior(BaseEvaluatorBehaviorTest, BaseToolEvaluati
     # endregion
 
     evaluator_type = CoherenceEvaluator
+
+
+def _create_multi_turn_mock_side_effect(
+    score: int = 5,
+    status: str = "completed",
+    reason: str = "Conversation is coherent overall.",
+    properties: Dict[str, Any] = None,
+):
+    """Create a mock side effect that returns dict output for multi-turn coherence.
+
+    Default ``properties`` are filled in only for a ``"completed"`` status; for any
+    other status the payload reports ``None`` for both ``score`` and ``properties``.
+    """
+    if properties is None and status == "completed":
+        properties = {
+            "gating_summary": "User flow mostly on-topic.",
+            "conversation_flow_summary": "Agent responses follow context across turns.",
+            "agent_coherence_issues": "None",
+        }
+
+    async def flow_side_effect(timeout, **kwargs):
+        return {
+            "llm_output": {
+                "score": score if status == "completed" else None,
+                "status": status,
+                "reason": reason,
+                "properties": properties if status == "completed" else None,
+            }
+        }
+
+    return flow_side_effect
+
+
+def _create_mocked_coherence_evaluator(evaluation_level=None, multi_turn_side_effect=None):
+    """Create a CoherenceEvaluator with both _flow and _multi_turn_flow mocked."""
+    model_config = AzureOpenAIModelConfiguration(
+        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT", "https://Sanitized.api.cognitive.microsoft.com"),
+        azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT", "aoai-deployment"),
+    )
+    evaluator = CoherenceEvaluator(model_config=model_config, evaluation_level=evaluation_level)
+    evaluator._flow = MagicMock(side_effect=get_flow_side_effect_for_evaluator("coherence"))
+    evaluator._multi_turn_flow = MagicMock(side_effect=multi_turn_side_effect or _create_multi_turn_mock_side_effect())
+    return evaluator
+
+
+# region Multi-turn (messages) behavioral tests
+
+VALID_MESSAGES: List[Dict[str, Any]] = [
+    {
+        "role": "user",
+        "content": [{"type": "text", "text": "I need to plan a trip to Paris."}],
+    },
+    {
+        "role": "assistant",
+        "content": [{"type": "text", "text": "Sure, what dates are you considering?"}],
+    },
+    {
+        "role": "user",
+        "content": [{"type": "text", "text": "Next weekend. I also want museum recommendations."}],
+    },
+    {
+        "role": "assistant",
+        "content": [{"type": "text", "text": "Great. For next weekend, I recommend the Louvre and Musee d'Orsay."}],
+    },
+]
+
+
+@pytest.mark.unittest
+class TestCoherenceMultiturnBehavior:
+    """Behavioral tests for the multi-turn (messages) path of CoherenceEvaluator."""
+
+    def test_messages_valid_input(self):
+        """Valid messages list produces expected output fields."""
+        evaluator = _create_mocked_coherence_evaluator()
+        result = evaluator(messages=VALID_MESSAGES)
+
+        assert "coherence" in result
+        assert "coherence_result" in result
+        assert "coherence_reason" in result
+        assert "coherence_score" in result
+        assert "coherence_status" in result
+        assert "coherence_properties" in result
+        assert "coherence_threshold" in result
+        assert 1 <= result["coherence"] <= 5
+        assert result["coherence_score"] == result["coherence"]
+        assert result["coherence_status"] == "completed"
+
+    def test_messages_string_content(self):
+        """Messages with string content are handled and serialized."""
+        evaluator = _create_mocked_coherence_evaluator()
+        messages = [
+            {"role": "user", "content": "What is photosynthesis?"},
+            {"role": "assistant", "content": "It is how plants convert sunlight into energy."},
+        ]
+        result = evaluator(messages=messages)
+
+        assert "coherence" in result
+        call_kwargs = evaluator._multi_turn_flow.call_args
+        conversation_text = call_kwargs.kwargs.get("messages", "")
+        assert "What is photosynthesis?" in conversation_text
+
+    def test_messages_with_system_message(self):
+        """Messages with a leading system message are handled."""
+        evaluator = _create_mocked_coherence_evaluator()
+        messages = [
+            {"role": "system", "content": "You are a helpful assistant."},
+        ] + VALID_MESSAGES
+        result = evaluator(messages=messages)
+        assert "coherence" in result
+
+    def test_messages_intermediate_response_rejected(self):
+        """Messages ending with only tool calls (no text) are rejected."""
+        evaluator = _create_mocked_coherence_evaluator()
+        messages = [
+            {"role": "user", "content": [{"type": "text", "text": "Find me flights."}]},
+            {
+                "role": "assistant",
+                "content": [
+                    {
+                        "type": "function_call",
+                        "name": "search_flights",
+                        "tool_call_id": "call_1",
+                        "arguments": {"origin": "NYC", "destination": "Paris"},
+                    }
+                ],
+            },
+        ]
+        with pytest.raises(EvaluationException, match="must contain text content"):
+            evaluator(messages=messages)
+
+    def test_messages_uses_multi_turn_flow(self):
+        """Verify that messages path calls _multi_turn_flow, not _flow."""
+        evaluator = _create_mocked_coherence_evaluator()
+        evaluator(messages=VALID_MESSAGES)
+
+        evaluator._multi_turn_flow.assert_called_once()
+        evaluator._flow.assert_not_called()
+
+    def test_query_response_uses_single_turn_flow(self):
+        """Verify that query/response path still calls _flow."""
+        evaluator = _create_mocked_coherence_evaluator()
+        evaluator(query="What is photosynthesis?", response="It is how plants convert sunlight into energy.")
+
+        evaluator._flow.assert_called_once()
+        evaluator._multi_turn_flow.assert_not_called()
+
+    def test_messages_skip_output_maps_to_not_applicable(self):
+        """Skipped multi-turn output follows standardized skipped schema."""
+        skipped_side_effect = _create_multi_turn_mock_side_effect(
+            status="skipped",
+            reason="Conversation is mostly derailed by unrelated topic jumps.",
+        )
+        evaluator = _create_mocked_coherence_evaluator(multi_turn_side_effect=skipped_side_effect)
+        result = evaluator(messages=VALID_MESSAGES)
+
+        assert result["coherence"] is None
+        assert result["coherence_score"] is None
+        assert result["coherence_result"] == "not_applicable"
+        assert result["coherence_status"] == "skipped"
+        assert result["coherence_properties"] == {}
+
+
+# endregion
+
+
+# region evaluation_level tests
+
+
+@pytest.mark.unittest
+class TestCoherenceEvaluationLevel:
+    """Tests for the evaluation_level parameter."""
+
+    def test_auto_detect_uses_multi_turn_for_messages(self):
+        """Default mode auto-detects multi-turn when messages are provided."""
+        evaluator = _create_mocked_coherence_evaluator(evaluation_level=None)
+        evaluator(messages=VALID_MESSAGES)
+        evaluator._multi_turn_flow.assert_called_once()
+        evaluator._flow.assert_not_called()
+
+    def test_auto_detect_uses_single_turn_for_query_response(self):
+        """Default mode auto-detects single-turn for query/response."""
+        evaluator = _create_mocked_coherence_evaluator(evaluation_level=None)
+        evaluator(query="What is the capital of France?", response="Paris.")
+        evaluator._flow.assert_called_once()
+        evaluator._multi_turn_flow.assert_not_called()
+
+    def test_forced_conversation_with_string_query_response_wraps_to_messages(self):
+        """Forced conversation wraps string query/response into messages and uses multi-turn flow."""
+        evaluator = _create_mocked_coherence_evaluator(evaluation_level=EvaluationLevel.CONVERSATION)
+        result = evaluator(query="What is the capital of France?", response="Paris.")
+        evaluator._multi_turn_flow.assert_called_once()
+        evaluator._flow.assert_not_called()
+        call_kwargs = evaluator._multi_turn_flow.call_args
+        conversation_text = call_kwargs.kwargs.get("messages", "")
+        assert "What is the capital of France?" in conversation_text
+        assert "Paris." in conversation_text
+        assert "coherence" in result
+
+    def test_forced_turn_with_messages_converts(self):
+        """Forced turn converts messages into query/response and uses single-turn flow."""
+        evaluator = _create_mocked_coherence_evaluator(evaluation_level=EvaluationLevel.TURN)
+        result = evaluator(messages=VALID_MESSAGES)
+        evaluator._flow.assert_called_once()
+        evaluator._multi_turn_flow.assert_not_called()
+        assert "coherence" in result
+
+    def test_invalid_evaluation_level_raises(self):
+        """Invalid evaluation level raises at init time."""
+        with pytest.raises(EvaluationException, match="Invalid evaluation_level"):
+            _create_mocked_coherence_evaluator(evaluation_level="batch")
+
+
+# endregion
+
+
+# region serialize_messages tests
+
+
+@pytest.mark.unittest
+class TestCoherenceSerializeMessages:
+    """Unit tests for coherence serialize_messages helper."""
+
+    def test_simple_conversation_serializes(self):
+        """Simple user/assistant messages are serialized with turn labels."""
+        messages = [
+            {"role": "user", "content": "Hello"},
+            {"role": "assistant", "content": "Hi there!"},
+        ]
+        output = serialize_messages(messages)
+        assert "User turn 1:" in output
+        assert "Agent turn 1:" in output
+        assert "Hello" in output
+        assert "Hi there!" in output
+
+
+# endregion