From f937fa563938a1d53e0d615a9297560dec988ab6 Mon Sep 17 00:00:00 2001 From: Sandy Chapman Date: Wed, 24 Jun 2026 16:34:52 -0300 Subject: [PATCH] =?UTF-8?q?feat(evaluator):=20add=20Trial=E2=86=92Intake?= =?UTF-8?q?=20boundary=20mapping=20module=20(D8)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds plugins/nemo-evaluator/src/nemo_evaluator/intake/mapping.py: the single pure layer that translates Evaluator vocabulary (AgentEvalTrial, AgentEvalTaskScore, MetricOutput) into the platform SDK's typed Intake request params, so the D3/D4/D5 write-adapters share one source of request shapes. - trial_to_atif_ingest -> AtifCreateParams (minimal single-step trajectory until D2 trace normalization; defaults agent.version per design §3.9 #6). - score_to_evaluator_results -> list[EvaluatorResultCreateParams], one row per MetricOutput, name='{metric_type}.{output}', span_id supplied by the caller (resolved post-ingest; the adapter owns that orchestration). - run_task_to_experiment_context -> ExperimentContextParam (lean {experiment_id, test_case_id}). Returns the generated nemo-platform-sdk *CreateParams TypedDicts (runtime dicts, statically checked against the real schema) rather than hand-shaped dicts; imports the SDK client types, never the Intake service (nmp.intake.*). CATEGORICAL coercion is intentionally deferred (strings -> TEXT) until a real signal exists. Includes unit tests for all coercions + the .root unwrap and an import-hygiene guardrail. Refs: AALGO-289 Co-Authored-By: Claude Opus 4.8 Signed-off-by: Sandy Chapman --- .../src/nemo_evaluator/intake/mapping.py | 169 ++++++++++++++++ .../tests/intake/test_import_hygiene.py | 39 ++++ .../tests/intake/test_mapping.py | 187 ++++++++++++++++++ 3 files changed, 395 insertions(+) create mode 100644 plugins/nemo-evaluator/src/nemo_evaluator/intake/mapping.py create mode 100644 plugins/nemo-evaluator/tests/intake/test_import_hygiene.py create mode 100644 plugins/nemo-evaluator/tests/intake/test_mapping.py diff --git a/plugins/nemo-evaluator/src/nemo_evaluator/intake/mapping.py b/plugins/nemo-evaluator/src/nemo_evaluator/intake/mapping.py new file mode 100644 index 0000000000..755dd713dc --- /dev/null +++ b/plugins/nemo-evaluator/src/nemo_evaluator/intake/mapping.py @@ -0,0 +1,169 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Boundary mapping: Evaluator vocabulary -> Intake/Experiments wire shapes. + +This is the single place where Evaluator domain objects (``AgentEvalTrial``, +``AgentEvalTaskScore``, ``MetricOutput``) become the request bodies Intake and +the Experiments API expect. The Intake write-adapter tickets (D3/D4/D5) obtain +their request shapes and field names *only* from here, so a later rename is a +one-file change. + +Design constraints (see AALGO-289): + +* **Pure.** Every function reads SDK types and returns request params. No HTTP, + no platform client, no imports from the Intake *service* (``nmp.intake.*``). +* **Typed at the boundary.** The returned values are the generated platform + SDK's ``TypedDict`` params (``AtifCreateParams`` / ``EvaluatorResultCreateParams``). + At runtime they are plain dicts the adapter splats into the client + (``client.intake.ingest.atif.create(**body)``); statically, ``ty`` checks our + field names, literals, and nested shapes against the real generated schema, so + an API change that regenerates the SDK surfaces here as a type error instead of + drifting silently. We depend on the client SDK (already a plugin dependency), + never on the Intake service package. +* The well-known evidence-key constants (``initial_state``/``trace``/``logs``/ + ``final_state``/``verifier_logs``) belong with the SDK evidence work (D1, + AALGO-281). Until D1 lands, this module references them as string literals so + it stays unblocked. +""" + +from __future__ import annotations + +from typing import Literal + +from nemo_evaluator_sdk.agent_eval.scores import AgentEvalTaskScore +from nemo_evaluator_sdk.agent_eval.trials import AgentEvalTrial +from nemo_platform.types.intake.evaluator_result_create_params import EvaluatorResultCreateParams +from nemo_platform.types.intake.evaluator_result_data_type import EvaluatorResultDataType +from nemo_platform.types.intake.experiment_context_param import ExperimentContextParam +from nemo_platform.types.intake.ingest.atif_agent_param import AtifAgentParam +from nemo_platform.types.intake.ingest.atif_create_params import AtifCreateParams +from nemo_platform.types.intake.ingest.atif_final_metrics_param import AtifFinalMetricsParam +from nemo_platform.types.intake.ingest.atif_step_agent_param import AtifStepAgentParam + +# --- Shared conventions ----------------------------------------------------- + +#: ATIF schema version the adapter emits. +ATIF_SCHEMA_VERSION: Literal["ATIF-v1.7"] = "ATIF-v1.7" + +#: Default ``agent.version`` when the run target carries none. Neither Model nor +#: Agent has a version field today, and ATIF requires one (design doc §3.9 #6). +DEFAULT_AGENT_VERSION = "unknown" + +# Evidence-descriptor keys. ATIF is carried as a ``format`` on ``kind="trace"``, +# *not* as a distinct ``kind``. These are string literals until D1 (AALGO-281) +# promotes them to shared descriptor-key constants on the SDK evidence types. +EVIDENCE_KIND_TRACE = "trace" +TRACE_FORMAT_ATIF = "atif" + + +def session_id_for(run_id: str, trial_id: str) -> str: + """Return the stable, adapter-minted session id for a trial. + + One session id per Trial keeps ATIF ingest idempotent and lets per-metric + scores be attached to the same trajectory afterward. This is the single + source of the convention; callers must not hand-roll it. + """ + return f"{run_id}:{trial_id}" + + +def run_task_to_experiment_context(trial: AgentEvalTrial, *, experiment_id: str) -> ExperimentContextParam: + """Build the lean ingest ``experiment_context`` for a trial. + + Only ``experiment_id`` and ``test_case_id`` live here. Dataset, group, and + free-form metadata belong on the Experiment entity (created separately via + the platform Experiments SDK), not on the per-ingest context. + """ + return {"experiment_id": experiment_id, "test_case_id": trial.task_id} + + +def trial_to_atif_ingest( + trial: AgentEvalTrial, + *, + run_id: str, + experiment_id: str, + agent_name: str, + agent_version: str = DEFAULT_AGENT_VERSION, + model_name: str | None = None, + final_metrics: AtifFinalMetricsParam | None = None, +) -> AtifCreateParams: + """Build the ATIF ingest params for a single Trial. + + Until ATIF normalization of trace evidence lands (D2, AALGO-282), this emits + a minimal single-step trajectory carrying the trial's final output text, so + the session/score path works end to end. Real ``steps[]`` reconstructed from + ``trial.evidence`` arrive with D2. + """ + output_text = trial.output.output_text if trial.output is not None else None + agent: AtifAgentParam = {"name": agent_name, "version": agent_version} + if model_name is not None: + agent["model_name"] = model_name + step: AtifStepAgentParam = {"source": "agent", "step_id": 1, "message": output_text or ""} + + body: AtifCreateParams = { + "schema_version": ATIF_SCHEMA_VERSION, + "session_id": session_id_for(run_id, trial.id), + "agent": agent, + "steps": [step], + "experiment_context": run_task_to_experiment_context(trial, experiment_id=experiment_id), + } + if final_metrics is not None: + body["final_metrics"] = final_metrics + return body + + +def score_to_evaluator_results( + score: AgentEvalTaskScore, + *, + session_id: str, + span_id: str, +) -> list[EvaluatorResultCreateParams]: + """Turn one ``AgentEvalTaskScore`` into one evaluator-result param per output. + + ``name`` is ``"{metric_type}.{output}"`` (matching the SDK summary's + aggregate naming). The output's value is coerced into the matching + ``data_type``, populating exactly one of ``value`` / ``string_value``. + ``session_id`` and ``span_id`` are supplied by the caller: the trajectory + span id is resolved at publish time (the adapter's concern), not derivable + from the pure score. + """ + comment = score.diagnostics[0].message if score.diagnostics else None + rows: list[EvaluatorResultCreateParams] = [] + for output in score.outputs: + data_type, value, string_value = _coerce_metric_value(output.value) + row: EvaluatorResultCreateParams = { + "session_id": session_id, + "span_id": span_id, + "name": f"{score.metric_type}.{output.name}", + "data_type": data_type, + } + if value is not None: + row["value"] = value + if string_value is not None: + row["string_value"] = string_value + if comment is not None: + row["comment"] = comment + rows.append(row) + return rows + + +def _coerce_metric_value(value: object) -> tuple[EvaluatorResultDataType, float | None, str | None]: + """Classify a metric output value into ``(data_type, value, string_value)``. + + Unwraps a Pydantic ``RootModel`` (``.root``) first, then: + + * ``bool`` -> ``BOOLEAN`` with value 1.0/0.0 (checked before ``int``, since + ``bool`` is a subclass of ``int``); + * ``int``/``float`` -> ``NUMERIC``; + * anything else (strings, labels) -> ``TEXT`` via ``str()``. + + CATEGORICAL is intentionally not emitted: a category and free text are + indistinguishable at the value level today (both arrive as ``str``/``Label``), + so everything string-valued maps to TEXT until a real signal exists. + """ + unwrapped = getattr(value, "root", value) + if isinstance(unwrapped, bool): + return "BOOLEAN", (1.0 if unwrapped else 0.0), None + if isinstance(unwrapped, (int, float)): + return "NUMERIC", float(unwrapped), None + return "TEXT", None, str(unwrapped) diff --git a/plugins/nemo-evaluator/tests/intake/test_import_hygiene.py b/plugins/nemo-evaluator/tests/intake/test_import_hygiene.py new file mode 100644 index 0000000000..4ca9e3a26e --- /dev/null +++ b/plugins/nemo-evaluator/tests/intake/test_import_hygiene.py @@ -0,0 +1,39 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Guardrail: the intake mapping module must not import the Intake service. + +The mapping is pure boundary code: it reads SDK types and returns plain dicts +shaped for Intake's requests, but it must not depend on the Intake service +(``nmp.intake.*``), an HTTP client, or the platform client. This keeps the +translation isolated so D3/D4/D5 can build the wire calls on top of it without +the mapping itself pulling in the service. +""" + +from __future__ import annotations + +import re +from pathlib import Path + +import nemo_evaluator.intake as intake + +INTAKE_ROOT = Path(next(iter(intake.__path__))).resolve() + +# Imports that would couple the pure mapping to the Intake service or transport. +_FORBIDDEN = re.compile( + r"^\s*(?:from|import)\s+(nmp\.intake|nmp_intake|httpx)", + re.MULTILINE, +) + + +def test_intake_mapping_has_no_service_imports() -> None: + offenders: list[str] = [] + for path in sorted(INTAKE_ROOT.rglob("*.py")): + text = path.read_text(encoding="utf-8") + for match in _FORBIDDEN.finditer(text): + line_no = text.count("\n", 0, match.start()) + 1 + offenders.append(f"{path.relative_to(INTAKE_ROOT)}:{line_no}: {match.group(0).strip()}") + + assert not offenders, "nemo_evaluator.intake must not import the Intake service / transport:\n" + "\n".join( + offenders + ) diff --git a/plugins/nemo-evaluator/tests/intake/test_mapping.py b/plugins/nemo-evaluator/tests/intake/test_mapping.py new file mode 100644 index 0000000000..fb2ed02202 --- /dev/null +++ b/plugins/nemo-evaluator/tests/intake/test_mapping.py @@ -0,0 +1,187 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for the Evaluator -> Intake boundary mapping module.""" + +from __future__ import annotations + +import pytest +from nemo_evaluator.intake.mapping import ( + ATIF_SCHEMA_VERSION, + DEFAULT_AGENT_VERSION, + run_task_to_experiment_context, + score_to_evaluator_results, + session_id_for, + trial_to_atif_ingest, +) +from nemo_evaluator_sdk.agent_eval.scores import ( + AgentEvalDiagnostic, + AgentEvalDiagnosticSeverity, + AgentEvalScoreStatus, + AgentEvalTaskScore, +) +from nemo_evaluator_sdk.agent_eval.trials import AgentEvalTrial, AgentEvalTrialStatus, AgentOutput +from nemo_evaluator_sdk.metrics.protocol import ( + BooleanValue, + ContinuousScore, + DiscreteScore, + Label, + MetricOutput, +) + + +def _trial(*, trial_id: str = "trial-1", task_id: str = "task-1", output_text: str | None = "hello") -> AgentEvalTrial: + output = AgentOutput(output_text=output_text) if output_text is not None else None + status = AgentEvalTrialStatus.COMPLETED if output is not None else AgentEvalTrialStatus.FAILED + return AgentEvalTrial(id=trial_id, task_id=task_id, status=status, output=output) + + +def _score(*, outputs: list[MetricOutput], diagnostics: list[AgentEvalDiagnostic] | None = None) -> AgentEvalTaskScore: + return AgentEvalTaskScore( + id="score-1", + run_id="run-1", + task_id="task-1", + trial_id="trial-1", + metric_type="accuracy", + status=AgentEvalScoreStatus.COMPLETED, + outputs=outputs, + diagnostics=diagnostics or [], + ) + + +# --- session_id_for --------------------------------------------------------- + + +def test_session_id_is_stable_per_trial() -> None: + assert session_id_for("run-1", "trial-1") == "run-1:trial-1" + + +# --- run_task_to_experiment_context ----------------------------------------- + + +def test_experiment_context_is_lean() -> None: + context = run_task_to_experiment_context(_trial(task_id="task-42"), experiment_id="bench-x-variant") + assert context == {"experiment_id": "bench-x-variant", "test_case_id": "task-42"} + + +# --- trial_to_atif_ingest --------------------------------------------------- + + +def test_trial_to_atif_ingest_shape() -> None: + body = trial_to_atif_ingest( + _trial(trial_id="t-1", task_id="task-1", output_text="final answer"), + run_id="run-1", + experiment_id="exp-1", + agent_name="my-agent", + model_name="gpt-4o", + ) + assert body["schema_version"] == ATIF_SCHEMA_VERSION + assert body["session_id"] == "run-1:t-1" + assert body["agent"] == {"name": "my-agent", "version": DEFAULT_AGENT_VERSION, "model_name": "gpt-4o"} + assert body["steps"] == [{"source": "agent", "step_id": 1, "message": "final answer"}] + assert body["experiment_context"] == {"experiment_id": "exp-1", "test_case_id": "task-1"} + assert "final_metrics" not in body + + +def test_trial_to_atif_ingest_defaults_version_and_omits_model_name() -> None: + body = trial_to_atif_ingest(_trial(), run_id="run-1", experiment_id="exp-1", agent_name="a") + assert body["agent"] == {"name": "a", "version": "unknown"} + assert "model_name" not in body["agent"] + + +def test_trial_to_atif_ingest_handles_missing_output() -> None: + body = trial_to_atif_ingest(_trial(output_text=None), run_id="run-1", experiment_id="exp-1", agent_name="a") + assert body["steps"] == [{"source": "agent", "step_id": 1, "message": ""}] + + +def test_trial_to_atif_ingest_includes_final_metrics_when_given() -> None: + body = trial_to_atif_ingest( + _trial(), + run_id="run-1", + experiment_id="exp-1", + agent_name="a", + final_metrics={"total_prompt_tokens": 10}, + ) + assert body["final_metrics"] == {"total_prompt_tokens": 10} + + +# --- score_to_evaluator_results: data_type coercions ------------------------ + + +def test_score_row_naming_and_targeting() -> None: + rows = score_to_evaluator_results( + _score(outputs=[MetricOutput(name="score", value=0.5)]), + session_id="run-1:trial-1", + span_id="span-abc", + ) + assert len(rows) == 1 + assert rows[0]["name"] == "accuracy.score" + assert rows[0]["session_id"] == "run-1:trial-1" + assert rows[0]["span_id"] == "span-abc" + + +def test_one_row_per_output() -> None: + rows = score_to_evaluator_results( + _score(outputs=[MetricOutput(name="a", value=1.0), MetricOutput(name="b", value=2.0)]), + session_id="s", + span_id="span", + ) + assert [row["name"] for row in rows] == ["accuracy.a", "accuracy.b"] + + +@pytest.mark.parametrize("value", [True, BooleanValue(True)]) +def test_boolean_coercion_true(value: object) -> None: + row = score_to_evaluator_results( + _score(outputs=[MetricOutput(name="passed", value=value)]), session_id="s", span_id="sp" + )[0] + assert row["data_type"] == "BOOLEAN" + assert row["value"] == 1.0 + assert "string_value" not in row + + +@pytest.mark.parametrize("value", [False, BooleanValue(False)]) +def test_boolean_coercion_false(value: object) -> None: + row = score_to_evaluator_results( + _score(outputs=[MetricOutput(name="passed", value=value)]), session_id="s", span_id="sp" + )[0] + assert row["data_type"] == "BOOLEAN" + assert row["value"] == 0.0 + + +@pytest.mark.parametrize("value", [0.87, 3, ContinuousScore(0.87), DiscreteScore(3)]) +def test_numeric_coercion(value: object) -> None: + row = score_to_evaluator_results( + _score(outputs=[MetricOutput(name="m", value=value)]), session_id="s", span_id="sp" + )[0] + assert row["data_type"] == "NUMERIC" + assert isinstance(row["value"], float) + assert "string_value" not in row + + +@pytest.mark.parametrize("value", ["PASS", Label("PASS")]) +def test_text_coercion(value: object) -> None: + row = score_to_evaluator_results( + _score(outputs=[MetricOutput(name="verdict", value=value)]), session_id="s", span_id="sp" + )[0] + assert row["data_type"] == "TEXT" + assert row["string_value"] == "PASS" + assert "value" not in row + + +def test_comment_taken_from_first_diagnostic() -> None: + score = _score( + outputs=[MetricOutput(name="score", value=1.0)], + diagnostics=[ + AgentEvalDiagnostic(severity=AgentEvalDiagnosticSeverity.WARNING, message="first"), + AgentEvalDiagnostic(severity=AgentEvalDiagnosticSeverity.INFO, message="second"), + ], + ) + row = score_to_evaluator_results(score, session_id="s", span_id="sp")[0] + assert row["comment"] == "first" + + +def test_comment_absent_without_diagnostics() -> None: + row = score_to_evaluator_results( + _score(outputs=[MetricOutput(name="score", value=1.0)]), session_id="s", span_id="sp" + )[0] + assert "comment" not in row