Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions plugins/nemo-evaluator/src/nemo_evaluator/intake/mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Boundary mapping: Evaluator vocabulary -> Intake/Experiments wire shapes.

This is the single place where Evaluator domain objects (``AgentEvalTrial``,
``AgentEvalTaskScore``, ``MetricOutput``) become the request bodies Intake and
the Experiments API expect. The Intake write-adapter tickets (D3/D4/D5) obtain
their request shapes and field names *only* from here, so a later rename is a
one-file change.

Design constraints (see AALGO-289):

* **Pure.** Every function reads SDK types and returns request params. No HTTP,
no platform client, no imports from the Intake *service* (``nmp.intake.*``).
* **Typed at the boundary.** The returned values are the generated platform
SDK's ``TypedDict`` params (``AtifCreateParams`` / ``EvaluatorResultCreateParams``).
At runtime they are plain dicts the adapter splats into the client
(``client.intake.ingest.atif.create(**body)``); statically, ``ty`` checks our
field names, literals, and nested shapes against the real generated schema, so
an API change that regenerates the SDK surfaces here as a type error instead of
drifting silently. We depend on the client SDK (already a plugin dependency),
never on the Intake service package.
* The well-known evidence-key constants (``initial_state``/``trace``/``logs``/
``final_state``/``verifier_logs``) belong with the SDK evidence work (D1,
AALGO-281). Until D1 lands, this module references them as string literals so
it stays unblocked.
"""

from __future__ import annotations

from typing import Literal

from nemo_evaluator_sdk.agent_eval.scores import AgentEvalTaskScore
from nemo_evaluator_sdk.agent_eval.trials import AgentEvalTrial
from nemo_platform.types.intake.evaluator_result_create_params import EvaluatorResultCreateParams
from nemo_platform.types.intake.evaluator_result_data_type import EvaluatorResultDataType
from nemo_platform.types.intake.experiment_context_param import ExperimentContextParam
from nemo_platform.types.intake.ingest.atif_agent_param import AtifAgentParam
from nemo_platform.types.intake.ingest.atif_create_params import AtifCreateParams
from nemo_platform.types.intake.ingest.atif_final_metrics_param import AtifFinalMetricsParam
from nemo_platform.types.intake.ingest.atif_step_agent_param import AtifStepAgentParam

# --- Shared conventions -----------------------------------------------------

#: ATIF schema version the adapter emits.
ATIF_SCHEMA_VERSION: Literal["ATIF-v1.7"] = "ATIF-v1.7"

#: Default ``agent.version`` when the run target carries none. Neither Model nor
#: Agent has a version field today, and ATIF requires one (design doc §3.9 #6).
DEFAULT_AGENT_VERSION = "unknown"

# Evidence-descriptor keys. ATIF is carried as a ``format`` on ``kind="trace"``,
# *not* as a distinct ``kind``. These are string literals until D1 (AALGO-281)
# promotes them to shared descriptor-key constants on the SDK evidence types.
EVIDENCE_KIND_TRACE = "trace"
TRACE_FORMAT_ATIF = "atif"


def session_id_for(run_id: str, trial_id: str) -> str:
"""Return the stable, adapter-minted session id for a trial.

One session id per Trial keeps ATIF ingest idempotent and lets per-metric
scores be attached to the same trajectory afterward. This is the single
source of the convention; callers must not hand-roll it.
"""
return f"{run_id}:{trial_id}"


def run_task_to_experiment_context(trial: AgentEvalTrial, *, experiment_id: str) -> ExperimentContextParam:
"""Build the lean ingest ``experiment_context`` for a trial.

Only ``experiment_id`` and ``test_case_id`` live here. Dataset, group, and
free-form metadata belong on the Experiment entity (created separately via
the platform Experiments SDK), not on the per-ingest context.
"""
return {"experiment_id": experiment_id, "test_case_id": trial.task_id}


def trial_to_atif_ingest(
trial: AgentEvalTrial,
*,
run_id: str,
experiment_id: str,
agent_name: str,
agent_version: str = DEFAULT_AGENT_VERSION,
model_name: str | None = None,
final_metrics: AtifFinalMetricsParam | None = None,
) -> AtifCreateParams:
"""Build the ATIF ingest params for a single Trial.

Until ATIF normalization of trace evidence lands (D2, AALGO-282), this emits
a minimal single-step trajectory carrying the trial's final output text, so
the session/score path works end to end. Real ``steps[]`` reconstructed from
``trial.evidence`` arrive with D2.
"""
output_text = trial.output.output_text if trial.output is not None else None
agent: AtifAgentParam = {"name": agent_name, "version": agent_version}
if model_name is not None:
agent["model_name"] = model_name
step: AtifStepAgentParam = {"source": "agent", "step_id": 1, "message": output_text or ""}

body: AtifCreateParams = {
"schema_version": ATIF_SCHEMA_VERSION,
"session_id": session_id_for(run_id, trial.id),
"agent": agent,
"steps": [step],
"experiment_context": run_task_to_experiment_context(trial, experiment_id=experiment_id),
}
if final_metrics is not None:
body["final_metrics"] = final_metrics
return body


def score_to_evaluator_results(
score: AgentEvalTaskScore,
*,
session_id: str,
span_id: str,
) -> list[EvaluatorResultCreateParams]:
"""Turn one ``AgentEvalTaskScore`` into one evaluator-result param per output.

``name`` is ``"{metric_type}.{output}"`` (matching the SDK summary's
aggregate naming). The output's value is coerced into the matching
``data_type``, populating exactly one of ``value`` / ``string_value``.
``session_id`` and ``span_id`` are supplied by the caller: the trajectory
span id is resolved at publish time (the adapter's concern), not derivable
from the pure score.
"""
comment = score.diagnostics[0].message if score.diagnostics else None
rows: list[EvaluatorResultCreateParams] = []
for output in score.outputs:
data_type, value, string_value = _coerce_metric_value(output.value)
row: EvaluatorResultCreateParams = {
"session_id": session_id,
"span_id": span_id,
"name": f"{score.metric_type}.{output.name}",
"data_type": data_type,
}
if value is not None:
row["value"] = value
if string_value is not None:
row["string_value"] = string_value
if comment is not None:
row["comment"] = comment
rows.append(row)
return rows


def _coerce_metric_value(value: object) -> tuple[EvaluatorResultDataType, float | None, str | None]:
"""Classify a metric output value into ``(data_type, value, string_value)``.

Unwraps a Pydantic ``RootModel`` (``.root``) first, then:

* ``bool`` -> ``BOOLEAN`` with value 1.0/0.0 (checked before ``int``, since
``bool`` is a subclass of ``int``);
* ``int``/``float`` -> ``NUMERIC``;
* anything else (strings, labels) -> ``TEXT`` via ``str()``.

CATEGORICAL is intentionally not emitted: a category and free text are
indistinguishable at the value level today (both arrive as ``str``/``Label``),
so everything string-valued maps to TEXT until a real signal exists.
"""
unwrapped = getattr(value, "root", value)
if isinstance(unwrapped, bool):
return "BOOLEAN", (1.0 if unwrapped else 0.0), None
if isinstance(unwrapped, (int, float)):
return "NUMERIC", float(unwrapped), None
return "TEXT", None, str(unwrapped)
39 changes: 39 additions & 0 deletions plugins/nemo-evaluator/tests/intake/test_import_hygiene.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Guardrail: the intake mapping module must not import the Intake service.

The mapping is pure boundary code: it reads SDK types and returns plain dicts
shaped for Intake's requests, but it must not depend on the Intake service
(``nmp.intake.*``), an HTTP client, or the platform client. This keeps the
translation isolated so D3/D4/D5 can build the wire calls on top of it without
the mapping itself pulling in the service.
"""

from __future__ import annotations

import re
from pathlib import Path

import nemo_evaluator.intake as intake

INTAKE_ROOT = Path(next(iter(intake.__path__))).resolve()

# Imports that would couple the pure mapping to the Intake service or transport.
_FORBIDDEN = re.compile(
r"^\s*(?:from|import)\s+(nmp\.intake|nmp_intake|httpx)",
re.MULTILINE,
)


def test_intake_mapping_has_no_service_imports() -> None:
offenders: list[str] = []
for path in sorted(INTAKE_ROOT.rglob("*.py")):
text = path.read_text(encoding="utf-8")
for match in _FORBIDDEN.finditer(text):
line_no = text.count("\n", 0, match.start()) + 1
offenders.append(f"{path.relative_to(INTAKE_ROOT)}:{line_no}: {match.group(0).strip()}")

assert not offenders, "nemo_evaluator.intake must not import the Intake service / transport:\n" + "\n".join(
offenders
)
187 changes: 187 additions & 0 deletions plugins/nemo-evaluator/tests/intake/test_mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Unit tests for the Evaluator -> Intake boundary mapping module."""

from __future__ import annotations

import pytest
from nemo_evaluator.intake.mapping import (
ATIF_SCHEMA_VERSION,
DEFAULT_AGENT_VERSION,
run_task_to_experiment_context,
score_to_evaluator_results,
session_id_for,
trial_to_atif_ingest,
)
from nemo_evaluator_sdk.agent_eval.scores import (
AgentEvalDiagnostic,
AgentEvalDiagnosticSeverity,
AgentEvalScoreStatus,
AgentEvalTaskScore,
)
from nemo_evaluator_sdk.agent_eval.trials import AgentEvalTrial, AgentEvalTrialStatus, AgentOutput
from nemo_evaluator_sdk.metrics.protocol import (
BooleanValue,
ContinuousScore,
DiscreteScore,
Label,
MetricOutput,
)


def _trial(*, trial_id: str = "trial-1", task_id: str = "task-1", output_text: str | None = "hello") -> AgentEvalTrial:
output = AgentOutput(output_text=output_text) if output_text is not None else None
status = AgentEvalTrialStatus.COMPLETED if output is not None else AgentEvalTrialStatus.FAILED
return AgentEvalTrial(id=trial_id, task_id=task_id, status=status, output=output)


def _score(*, outputs: list[MetricOutput], diagnostics: list[AgentEvalDiagnostic] | None = None) -> AgentEvalTaskScore:
return AgentEvalTaskScore(
id="score-1",
run_id="run-1",
task_id="task-1",
trial_id="trial-1",
metric_type="accuracy",
status=AgentEvalScoreStatus.COMPLETED,
outputs=outputs,
diagnostics=diagnostics or [],
)


# --- session_id_for ---------------------------------------------------------


def test_session_id_is_stable_per_trial() -> None:
assert session_id_for("run-1", "trial-1") == "run-1:trial-1"


# --- run_task_to_experiment_context -----------------------------------------


def test_experiment_context_is_lean() -> None:
context = run_task_to_experiment_context(_trial(task_id="task-42"), experiment_id="bench-x-variant")
assert context == {"experiment_id": "bench-x-variant", "test_case_id": "task-42"}


# --- trial_to_atif_ingest ---------------------------------------------------


def test_trial_to_atif_ingest_shape() -> None:
body = trial_to_atif_ingest(
_trial(trial_id="t-1", task_id="task-1", output_text="final answer"),
run_id="run-1",
experiment_id="exp-1",
agent_name="my-agent",
model_name="gpt-4o",
)
assert body["schema_version"] == ATIF_SCHEMA_VERSION
assert body["session_id"] == "run-1:t-1"
assert body["agent"] == {"name": "my-agent", "version": DEFAULT_AGENT_VERSION, "model_name": "gpt-4o"}
assert body["steps"] == [{"source": "agent", "step_id": 1, "message": "final answer"}]
assert body["experiment_context"] == {"experiment_id": "exp-1", "test_case_id": "task-1"}
assert "final_metrics" not in body


def test_trial_to_atif_ingest_defaults_version_and_omits_model_name() -> None:
body = trial_to_atif_ingest(_trial(), run_id="run-1", experiment_id="exp-1", agent_name="a")
assert body["agent"] == {"name": "a", "version": "unknown"}
assert "model_name" not in body["agent"]


def test_trial_to_atif_ingest_handles_missing_output() -> None:
body = trial_to_atif_ingest(_trial(output_text=None), run_id="run-1", experiment_id="exp-1", agent_name="a")
assert body["steps"] == [{"source": "agent", "step_id": 1, "message": ""}]


def test_trial_to_atif_ingest_includes_final_metrics_when_given() -> None:
body = trial_to_atif_ingest(
_trial(),
run_id="run-1",
experiment_id="exp-1",
agent_name="a",
final_metrics={"total_prompt_tokens": 10},
)
assert body["final_metrics"] == {"total_prompt_tokens": 10}


# --- score_to_evaluator_results: data_type coercions ------------------------


def test_score_row_naming_and_targeting() -> None:
rows = score_to_evaluator_results(
_score(outputs=[MetricOutput(name="score", value=0.5)]),
session_id="run-1:trial-1",
span_id="span-abc",
)
assert len(rows) == 1
assert rows[0]["name"] == "accuracy.score"
assert rows[0]["session_id"] == "run-1:trial-1"
assert rows[0]["span_id"] == "span-abc"


def test_one_row_per_output() -> None:
rows = score_to_evaluator_results(
_score(outputs=[MetricOutput(name="a", value=1.0), MetricOutput(name="b", value=2.0)]),
session_id="s",
span_id="span",
)
assert [row["name"] for row in rows] == ["accuracy.a", "accuracy.b"]


@pytest.mark.parametrize("value", [True, BooleanValue(True)])
def test_boolean_coercion_true(value: object) -> None:
row = score_to_evaluator_results(
_score(outputs=[MetricOutput(name="passed", value=value)]), session_id="s", span_id="sp"
)[0]
assert row["data_type"] == "BOOLEAN"
assert row["value"] == 1.0
assert "string_value" not in row


@pytest.mark.parametrize("value", [False, BooleanValue(False)])
def test_boolean_coercion_false(value: object) -> None:
row = score_to_evaluator_results(
_score(outputs=[MetricOutput(name="passed", value=value)]), session_id="s", span_id="sp"
)[0]
assert row["data_type"] == "BOOLEAN"
assert row["value"] == 0.0


@pytest.mark.parametrize("value", [0.87, 3, ContinuousScore(0.87), DiscreteScore(3)])
def test_numeric_coercion(value: object) -> None:
row = score_to_evaluator_results(
_score(outputs=[MetricOutput(name="m", value=value)]), session_id="s", span_id="sp"
)[0]
assert row["data_type"] == "NUMERIC"
assert isinstance(row["value"], float)
assert "string_value" not in row


@pytest.mark.parametrize("value", ["PASS", Label("PASS")])
def test_text_coercion(value: object) -> None:
row = score_to_evaluator_results(
_score(outputs=[MetricOutput(name="verdict", value=value)]), session_id="s", span_id="sp"
)[0]
assert row["data_type"] == "TEXT"
assert row["string_value"] == "PASS"
assert "value" not in row


def test_comment_taken_from_first_diagnostic() -> None:
score = _score(
outputs=[MetricOutput(name="score", value=1.0)],
diagnostics=[
AgentEvalDiagnostic(severity=AgentEvalDiagnosticSeverity.WARNING, message="first"),
AgentEvalDiagnostic(severity=AgentEvalDiagnosticSeverity.INFO, message="second"),
],
)
row = score_to_evaluator_results(score, session_id="s", span_id="sp")[0]
assert row["comment"] == "first"


def test_comment_absent_without_diagnostics() -> None:
row = score_to_evaluator_results(
_score(outputs=[MetricOutput(name="score", value=1.0)]), session_id="s", span_id="sp"
)[0]
assert "comment" not in row