diff --git a/plugins/nemo-evaluator/pyproject.toml b/plugins/nemo-evaluator/pyproject.toml index 6979af8627..25920643c1 100644 --- a/plugins/nemo-evaluator/pyproject.toml +++ b/plugins/nemo-evaluator/pyproject.toml @@ -50,6 +50,9 @@ dev = ["pytest>=8.3.4", "pytest-asyncio>=0.25.3", "ruff>=0.11.8"] asyncio_mode = "auto" pythonpath = ["src"] testpaths = ["tests"] +markers = [ + "integration: live service integration tests (opt-in; e.g. RUN_INTAKE_INTEGRATION=1)", +] # Opt this plugin into OpenAPI spec generation [tool.nemo.openapi] diff --git a/plugins/nemo-evaluator/src/nemo_evaluator/intake/mapping.py b/plugins/nemo-evaluator/src/nemo_evaluator/intake/mapping.py index 755dd713dc..e87e7cc1b4 100644 --- a/plugins/nemo-evaluator/src/nemo_evaluator/intake/mapping.py +++ b/plugins/nemo-evaluator/src/nemo_evaluator/intake/mapping.py @@ -29,9 +29,11 @@ from __future__ import annotations +import math +from dataclasses import dataclass from typing import Literal -from nemo_evaluator_sdk.agent_eval.scores import AgentEvalTaskScore +from nemo_evaluator_sdk.agent_eval.scores import AgentEvalScoreStatus, AgentEvalTaskScore from nemo_evaluator_sdk.agent_eval.trials import AgentEvalTrial from nemo_platform.types.intake.evaluator_result_create_params import EvaluatorResultCreateParams from nemo_platform.types.intake.evaluator_result_data_type import EvaluatorResultDataType @@ -112,29 +114,57 @@ def trial_to_atif_ingest( return body +@dataclass(frozen=True) +class SkippedOutput: + """A metric output omitted from publish, with the reason it was dropped (see cross-team ask X6).""" + + name: str + reason: str + + def score_to_evaluator_results( score: AgentEvalTaskScore, *, session_id: str, span_id: str, -) -> list[EvaluatorResultCreateParams]: - """Turn one ``AgentEvalTaskScore`` into one evaluator-result param per output. - - ``name`` is ``"{metric_type}.{output}"`` (matching the SDK summary's - aggregate naming). 
The output's value is coerced into the matching - ``data_type``, populating exactly one of ``value`` / ``string_value``. - ``session_id`` and ``span_id`` are supplied by the caller: the trajectory - span id is resolved at publish time (the adapter's concern), not derivable - from the pure score. +) -> tuple[list[EvaluatorResultCreateParams], list[SkippedOutput]]: + """Map one ``AgentEvalTaskScore`` to ``(rows, skipped)`` for Intake. + + ``rows`` is one evaluator-result param per publishable output: ``name`` is + ``"{metric_type}.{output}"`` (matching the SDK summary's aggregate naming) and the + value is coerced into the matching ``data_type``, populating exactly one of ``value`` + / ``string_value``. ``session_id``/``span_id`` are supplied by the caller — the + trajectory span id is resolved at publish time, not derivable from the pure score. + + ``skipped`` carries the outputs that can't be published, with the reason — so the + publishable/omitted split has a single source of truth and callers can report the + omissions instead of silently losing them. A FAILED score yields no rows (every output + skipped); a completed score's non-finite (NaN/inf) outputs are dropped (NaN isn't + JSON-representable — the platform client's encoder rejects it — so it can't be sent). + + TODO(X6): once Intake can represent a failed metric result, publish these as failures + instead of dropping them. 
""" + if score.status == AgentEvalScoreStatus.FAILED: + skipped = [ + SkippedOutput(name=f"{score.metric_type}.{output.name}", reason="scoring failed") + for output in score.outputs + ] + return [], skipped + comment = score.diagnostics[0].message if score.diagnostics else None rows: list[EvaluatorResultCreateParams] = [] + skipped: list[SkippedOutput] = [] for output in score.outputs: + name = f"{score.metric_type}.{output.name}" data_type, value, string_value = _coerce_metric_value(output.value) + if value is not None and not math.isfinite(value): + skipped.append(SkippedOutput(name=name, reason="non-finite value")) + continue row: EvaluatorResultCreateParams = { "session_id": session_id, "span_id": span_id, - "name": f"{score.metric_type}.{output.name}", + "name": name, "data_type": data_type, } if value is not None: @@ -144,7 +174,7 @@ def score_to_evaluator_results( if comment is not None: row["comment"] = comment rows.append(row) - return rows + return rows, skipped def _coerce_metric_value(value: object) -> tuple[EvaluatorResultDataType, float | None, str | None]: diff --git a/plugins/nemo-evaluator/src/nemo_evaluator/intake/publish.py b/plugins/nemo-evaluator/src/nemo_evaluator/intake/publish.py new file mode 100644 index 0000000000..11a2acf05b --- /dev/null +++ b/plugins/nemo-evaluator/src/nemo_evaluator/intake/publish.py @@ -0,0 +1,213 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Publish a completed agent evaluation to Intake. + +``publish_to_intake`` is the explicit, post-run consumer of ``AgentEvalResult`` +(see AALGO-290). It is **not** a side effect of ``AgentEvaluator.run()`` and +there is no feature flag — optionality is structural: you make the call or you +don't, and the platform client is a required argument. + +It references an **existing** Experiment (created by the caller via the platform +Experiments SDK) and never creates one. 
Per Trial it: POSTs the ATIF trajectory, +resolves the trajectory's root span, then POSTs one evaluator-result per metric +output. All request shapes come from :mod:`nemo_evaluator.intake.mapping`; the +HTTP calls go through the generated platform SDK's ``intake`` resources. +""" + +from __future__ import annotations + +import asyncio +from collections import defaultdict + +from nemo_evaluator.intake import mapping +from nemo_evaluator.sdk import http_utils +from nemo_evaluator_sdk.agent_eval.results import AgentEvalResult +from nemo_evaluator_sdk.agent_eval.scores import AgentEvalTaskScore +from nemo_evaluator_sdk.agent_eval.trials import AgentEvalTrial +from nemo_platform import AsyncNeMoPlatform +from nemo_platform.types.intake.trace_filter_param import TraceFilterParam +from pydantic import BaseModel, ConfigDict, Field + +#: Default ceiling on concurrent per-trial publishes. +DEFAULT_MAX_CONCURRENCY = 8 + + +class PublishError(RuntimeError): + """Raised when one or more trials fail to publish (or a span never resolves). + + Carries the partial :class:`PublishReport` of trials that *did* publish, so the + caller can see what landed before re-running. 
+ """ + + def __init__(self, message: str, *, report: PublishReport | None = None) -> None: + super().__init__(message) + self.report = report + + +class PublishedTrial(BaseModel): + """Record of one Trial written to Intake.""" + + model_config = ConfigDict(extra="forbid") + + trial_id: str = Field(description="Identifier of the published trial.") + session_id: str = Field(description="Intake session id minted for the trajectory.") + span_id: str = Field(description="Resolved root AGENT span id the scores were attached to.") + evaluator_result_count: int = Field(description="Number of evaluator-result rows written for this trial.") + + +class SkippedScore(BaseModel): + """A score output omitted from publish because Intake can't represent it yet (cross-team ask X6).""" + + model_config = ConfigDict(extra="forbid") + + trial_id: str = Field(description="Trial whose score output was omitted.") + name: str = Field(description='"{metric_type}.{output}" of the omitted output.') + reason: str = Field(description="Why it was omitted (e.g. 'scoring failed', 'non-finite value').") + + +class PublishReport(BaseModel): + """Summary of a ``publish_to_intake`` run.""" + + model_config = ConfigDict(extra="forbid") + + experiment_id: str = Field(description="Experiment the results were published under.") + workspace: str = Field(description="Workspace the writes targeted.") + run_id: str = Field(description="Source AgentEvalResult run id.") + published_trials: list[PublishedTrial] = Field( + default_factory=list, description="Per-trial records of what was written." 
+ ) + skipped: list[SkippedScore] = Field( + default_factory=list, + description="Score outputs omitted because Intake can't represent failed/non-finite scores (cross-team ask X6).", + ) + + @property + def trial_count(self) -> int: + """Number of trials published.""" + return len(self.published_trials) + + @property + def evaluator_result_count(self) -> int: + """Total evaluator-result rows written across all trials.""" + return sum(trial.evaluator_result_count for trial in self.published_trials) + + +async def publish_to_intake( + result: AgentEvalResult, + *, + platform: AsyncNeMoPlatform, + experiment_id: str, + workspace: str | None = None, + agent_name: str = "agent", + agent_version: str = mapping.DEFAULT_AGENT_VERSION, + model_name: str | None = None, + max_concurrency: int = DEFAULT_MAX_CONCURRENCY, +) -> PublishReport: + """Publish a completed ``AgentEvalResult`` to Intake under an existing Experiment. + + For each trial: POST the ATIF trajectory, resolve its root span, then POST one + evaluator-result per metric output. Trials are published concurrently up to + ``max_concurrency``. + + Publishing is **not atomic** and Intake has no rollback, so a per-trial failure + must not abort the others: every trial that can land does, and the failures are + collected and raised together as a :class:`PublishError` (carrying the partial + report). The evaluation's local bundle is the system of record and is never + touched, so the caller can re-run ``publish_to_intake`` once the issue is fixed + to publish the remaining trials. (Re-publish is not yet idempotent — see ask X1.) + + ``experiment_id`` must reference an Experiment that already exists — ATIF ingest + rejects unknown experiments with HTTP 400. Creating the Experiment/group is a + separate, caller-side step via the platform Experiments SDK. 
+ + Agent identity (``agent_name``/``agent_version``/``model_name``) is taken as + arguments because it lives on the run *target*, which ``AgentEvalResult`` does + not carry (design §3.9 #6). + """ + resolved_workspace = http_utils.resolve_workspace(platform, workspace, strict=True) + + scores_by_trial: dict[str, list[AgentEvalTaskScore]] = defaultdict(list) + for score in result.scores: + scores_by_trial[score.trial_id].append(score) + + semaphore = asyncio.Semaphore(max_concurrency) + skipped: list[SkippedScore] = [] + + async def _publish_trial(trial: AgentEvalTrial) -> PublishedTrial: + async with semaphore: + body = mapping.trial_to_atif_ingest( + trial, + run_id=result.run_id, + experiment_id=experiment_id, + agent_name=agent_name, + agent_version=agent_version, + model_name=model_name, + ) + body["workspace"] = resolved_workspace + await platform.intake.ingest.atif.create(**body) + + session_id = mapping.session_id_for(result.run_id, trial.id) + span_id = await _resolve_root_span_id(platform, workspace=resolved_workspace, session_id=session_id) + + written = 0 + for score in scores_by_trial.get(trial.id, []): + rows, omitted = mapping.score_to_evaluator_results(score, session_id=session_id, span_id=span_id) + for row in rows: + row["workspace"] = resolved_workspace + await platform.intake.evaluator_results.create(**row) + written += 1 + skipped.extend(SkippedScore(trial_id=trial.id, name=item.name, reason=item.reason) for item in omitted) + + return PublishedTrial( + trial_id=trial.id, + session_id=session_id, + span_id=span_id, + evaluator_result_count=written, + ) + + outcomes = await asyncio.gather(*(_publish_trial(trial) for trial in result.trials), return_exceptions=True) + + published: list[PublishedTrial] = [] + failures: list[tuple[str, BaseException]] = [] + for trial, outcome in zip(result.trials, outcomes, strict=True): + if isinstance(outcome, PublishedTrial): + published.append(outcome) + else: + failures.append((trial.id, outcome)) + + report = 
PublishReport( + experiment_id=experiment_id, + workspace=resolved_workspace, + run_id=result.run_id, + published_trials=published, + skipped=skipped, + ) + if failures: + raise PublishError(_publish_failure_message(result, report, failures), report=report) + return report + + +def _publish_failure_message( + result: AgentEvalResult, + report: PublishReport, + failures: list[tuple[str, BaseException]], +) -> str: + """Build an actionable error: what failed, where the results are cached, how to recover.""" + location = f"cached locally at {result.output_dir}" if result.output_dir is not None else "in the local run bundle" + detail = "\n ".join(f"{trial_id}: {type(error).__name__}: {error}" for trial_id, error in failures) + return ( + f"publish_to_intake: {len(failures)} of {len(result.trials)} trial(s) failed to publish " + f"({report.trial_count} succeeded). The evaluation results are {location}; re-run " + f"publish_to_intake(result, ...) once the issue is resolved to publish the rest.\n" + f"Failed trials:\n {detail}" + ) + + +async def _resolve_root_span_id(platform: AsyncNeMoPlatform, *, workspace: str, session_id: str) -> str: + """Return the root AGENT span id for a freshly-ingested trajectory (design §3.5, option 1).""" + trace_filter: TraceFilterParam = {"session_id": session_id} + async for trace in platform.intake.traces.list(workspace=workspace, filter=trace_filter): + if trace.root_span_id: + return trace.root_span_id + raise PublishError(f"No root span resolved for session {session_id!r} after ATIF ingest") diff --git a/plugins/nemo-evaluator/tests/intake/test_mapping.py b/plugins/nemo-evaluator/tests/intake/test_mapping.py index fb2ed02202..85586f8e3d 100644 --- a/plugins/nemo-evaluator/tests/intake/test_mapping.py +++ b/plugins/nemo-evaluator/tests/intake/test_mapping.py @@ -5,6 +5,8 @@ from __future__ import annotations +import math + import pytest from nemo_evaluator.intake.mapping import ( ATIF_SCHEMA_VERSION, @@ -28,6 +30,7 @@ Label, 
MetricOutput, ) +from nemo_platform.types.intake.evaluator_result_create_params import EvaluatorResultCreateParams def _trial(*, trial_id: str = "trial-1", task_id: str = "task-1", output_text: str | None = "hello") -> AgentEvalTrial: @@ -36,19 +39,32 @@ def _trial(*, trial_id: str = "trial-1", task_id: str = "task-1", output_text: s return AgentEvalTrial(id=trial_id, task_id=task_id, status=status, output=output) -def _score(*, outputs: list[MetricOutput], diagnostics: list[AgentEvalDiagnostic] | None = None) -> AgentEvalTaskScore: +def _score( + *, + outputs: list[MetricOutput], + diagnostics: list[AgentEvalDiagnostic] | None = None, + status: AgentEvalScoreStatus = AgentEvalScoreStatus.COMPLETED, +) -> AgentEvalTaskScore: return AgentEvalTaskScore( id="score-1", run_id="run-1", task_id="task-1", trial_id="trial-1", metric_type="accuracy", - status=AgentEvalScoreStatus.COMPLETED, + status=status, outputs=outputs, diagnostics=diagnostics or [], ) +def _rows( + score: AgentEvalTaskScore, *, session_id: str = "s", span_id: str = "sp" +) -> list[EvaluatorResultCreateParams]: + """The publishable rows from a score, dropping the skipped list (for row-shape assertions).""" + rows, _ = score_to_evaluator_results(score, session_id=session_id, span_id=span_id) + return rows + + # --- session_id_for --------------------------------------------------------- @@ -109,7 +125,7 @@ def test_trial_to_atif_ingest_includes_final_metrics_when_given() -> None: def test_score_row_naming_and_targeting() -> None: - rows = score_to_evaluator_results( + rows = _rows( _score(outputs=[MetricOutput(name="score", value=0.5)]), session_id="run-1:trial-1", span_id="span-abc", @@ -121,9 +137,8 @@ def test_score_row_naming_and_targeting() -> None: def test_one_row_per_output() -> None: - rows = score_to_evaluator_results( + rows = _rows( _score(outputs=[MetricOutput(name="a", value=1.0), MetricOutput(name="b", value=2.0)]), - session_id="s", span_id="span", ) assert [row["name"] for row in rows] 
== ["accuracy.a", "accuracy.b"] @@ -131,9 +146,7 @@ def test_one_row_per_output() -> None: @pytest.mark.parametrize("value", [True, BooleanValue(True)]) def test_boolean_coercion_true(value: object) -> None: - row = score_to_evaluator_results( - _score(outputs=[MetricOutput(name="passed", value=value)]), session_id="s", span_id="sp" - )[0] + row = _rows(_score(outputs=[MetricOutput(name="passed", value=value)]))[0] assert row["data_type"] == "BOOLEAN" assert row["value"] == 1.0 assert "string_value" not in row @@ -141,18 +154,14 @@ def test_boolean_coercion_true(value: object) -> None: @pytest.mark.parametrize("value", [False, BooleanValue(False)]) def test_boolean_coercion_false(value: object) -> None: - row = score_to_evaluator_results( - _score(outputs=[MetricOutput(name="passed", value=value)]), session_id="s", span_id="sp" - )[0] + row = _rows(_score(outputs=[MetricOutput(name="passed", value=value)]))[0] assert row["data_type"] == "BOOLEAN" assert row["value"] == 0.0 @pytest.mark.parametrize("value", [0.87, 3, ContinuousScore(0.87), DiscreteScore(3)]) def test_numeric_coercion(value: object) -> None: - row = score_to_evaluator_results( - _score(outputs=[MetricOutput(name="m", value=value)]), session_id="s", span_id="sp" - )[0] + row = _rows(_score(outputs=[MetricOutput(name="m", value=value)]))[0] assert row["data_type"] == "NUMERIC" assert isinstance(row["value"], float) assert "string_value" not in row @@ -160,9 +169,7 @@ def test_numeric_coercion(value: object) -> None: @pytest.mark.parametrize("value", ["PASS", Label("PASS")]) def test_text_coercion(value: object) -> None: - row = score_to_evaluator_results( - _score(outputs=[MetricOutput(name="verdict", value=value)]), session_id="s", span_id="sp" - )[0] + row = _rows(_score(outputs=[MetricOutput(name="verdict", value=value)]))[0] assert row["data_type"] == "TEXT" assert row["string_value"] == "PASS" assert "value" not in row @@ -176,12 +183,39 @@ def test_comment_taken_from_first_diagnostic() -> None: 
AgentEvalDiagnostic(severity=AgentEvalDiagnosticSeverity.INFO, message="second"), ], ) - row = score_to_evaluator_results(score, session_id="s", span_id="sp")[0] + row = _rows(score)[0] assert row["comment"] == "first" def test_comment_absent_without_diagnostics() -> None: - row = score_to_evaluator_results( - _score(outputs=[MetricOutput(name="score", value=1.0)]), session_id="s", span_id="sp" - )[0] + row = _rows(_score(outputs=[MetricOutput(name="score", value=1.0)]))[0] assert "comment" not in row + + +# --- score_to_evaluator_results: skipped outputs ---------------------------- + + +def test_non_finite_outputs_are_skipped_not_dropped_silently() -> None: + rows, skipped = score_to_evaluator_results( + _score(outputs=[MetricOutput(name="score", value=1.0), MetricOutput(name="broken", value=math.nan)]), + session_id="s", + span_id="sp", + ) + assert [row["name"] for row in rows] == ["accuracy.score"] + assert [(item.name, item.reason) for item in skipped] == [("accuracy.broken", "non-finite value")] + + +def test_failed_score_yields_no_rows_and_skips_every_output() -> None: + rows, skipped = score_to_evaluator_results( + _score( + outputs=[MetricOutput(name="score", value=1.0), MetricOutput(name="passed", value=True)], + status=AgentEvalScoreStatus.FAILED, + ), + session_id="s", + span_id="sp", + ) + assert rows == [] + assert [(item.name, item.reason) for item in skipped] == [ + ("accuracy.score", "scoring failed"), + ("accuracy.passed", "scoring failed"), + ] diff --git a/plugins/nemo-evaluator/tests/intake/test_publish.py b/plugins/nemo-evaluator/tests/intake/test_publish.py new file mode 100644 index 0000000000..3d075bd068 --- /dev/null +++ b/plugins/nemo-evaluator/tests/intake/test_publish.py @@ -0,0 +1,269 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for publish_to_intake — the explicit Evaluator -> Intake publish step.""" + +from __future__ import annotations + +import math +from collections.abc import AsyncIterator +from types import SimpleNamespace +from typing import Any, cast + +import pytest +from nemo_evaluator.intake.publish import PublishError, publish_to_intake +from nemo_evaluator_sdk.agent_eval.results import AgentEvalResult, AgentEvalSummary +from nemo_evaluator_sdk.agent_eval.scores import AgentEvalScoreStatus, AgentEvalTaskScore +from nemo_evaluator_sdk.agent_eval.trials import AgentEvalTrial, AgentEvalTrialStatus, AgentOutput +from nemo_evaluator_sdk.metrics.protocol import MetricOutput +from nemo_platform import AsyncNeMoPlatform + +# --- fakes ------------------------------------------------------------------ + + +class _FakeAtif: + def __init__(self, calls: list[dict[str, Any]], *, fail: bool = False) -> None: + self._calls = calls + self._fail = fail + + async def create(self, **kwargs: Any) -> None: + if self._fail: + raise RuntimeError("atif ingest 400") + self._calls.append(kwargs) + + +class _FakeEvaluatorResults: + def __init__(self, calls: list[dict[str, Any]], *, fail_session: str | None = None) -> None: + self._calls = calls + self._fail_session = fail_session + + async def create(self, **kwargs: Any) -> object: + if self._fail_session is not None and kwargs.get("session_id") == self._fail_session: + raise RuntimeError(f"evaluator-results 500 for {kwargs['session_id']}") + self._calls.append(kwargs) + return SimpleNamespace(evaluator_result_id="eval-1") + + +class _FakeTraces: + """Returns one root-span trace per requested session id (or none, to test resolution failure).""" + + def __init__(self, *, root_span_id: str | None) -> None: + self._root_span_id = root_span_id + + def list(self, *, workspace: str, filter: dict[str, Any]) -> AsyncIterator[object]: # noqa: A002 + root_span_id = self._root_span_id + session_id = 
filter["session_id"] + + async def _gen() -> AsyncIterator[object]: + if root_span_id is not None: + yield SimpleNamespace(session_id=session_id, root_span_id=f"{root_span_id}:{session_id}") + + return _gen() + + +class _FakeClient: + def __init__( + self, + *, + workspace: str | None = "default", + root_span_id: str | None = "span", + atif_fail: bool = False, + fail_eval_session: str | None = None, + ) -> None: + self.workspace = workspace + self.atif_calls: list[dict[str, Any]] = [] + self.eval_calls: list[dict[str, Any]] = [] + self.intake = SimpleNamespace( + ingest=SimpleNamespace(atif=_FakeAtif(self.atif_calls, fail=atif_fail)), + evaluator_results=_FakeEvaluatorResults(self.eval_calls, fail_session=fail_eval_session), + traces=_FakeTraces(root_span_id=root_span_id), + ) + + +def _client(**kwargs: Any) -> AsyncNeMoPlatform: + return cast(AsyncNeMoPlatform, _FakeClient(**kwargs)) + + +# --- fixtures --------------------------------------------------------------- + + +def _trial(trial_id: str, task_id: str = "task-1") -> AgentEvalTrial: + return AgentEvalTrial( + id=trial_id, + task_id=task_id, + status=AgentEvalTrialStatus.COMPLETED, + output=AgentOutput(output_text="answer"), + ) + + +def _score( + trial_id: str, + metric_type: str, + outputs: list[MetricOutput], + status: AgentEvalScoreStatus = AgentEvalScoreStatus.COMPLETED, +) -> AgentEvalTaskScore: + return AgentEvalTaskScore( + id=f"score-{trial_id}-{metric_type}", + run_id="run-1", + task_id="task-1", + trial_id=trial_id, + metric_type=metric_type, + status=status, + outputs=outputs, + ) + + +def _result(trials: list[AgentEvalTrial], scores: list[AgentEvalTaskScore]) -> AgentEvalResult: + return AgentEvalResult( + run_id="run-1", + tasks=[], + trials=trials, + scores=scores, + summary=AgentEvalSummary(), + ) + + +# --- tests ------------------------------------------------------------------ + + +async def test_publishes_trajectory_and_scores() -> None: + result = _result( + trials=[_trial("t-1")], + 
scores=[ + _score("t-1", "accuracy", [MetricOutput(name="score", value=0.5), MetricOutput(name="passed", value=True)]), + _score("t-1", "latency", [MetricOutput(name="p50", value=1.2)]), + ], + ) + client = _FakeClient() + report = await publish_to_intake(result, platform=cast(AsyncNeMoPlatform, client), experiment_id="exp-1") + + assert len(client.atif_calls) == 1 + assert client.atif_calls[0]["session_id"] == "run-1:t-1" + assert client.atif_calls[0]["experiment_context"] == {"experiment_id": "exp-1", "test_case_id": "task-1"} + # 3 metric outputs across the two score records -> 3 evaluator-result rows. + assert len(client.eval_calls) == 3 + assert {call["name"] for call in client.eval_calls} == {"accuracy.score", "accuracy.passed", "latency.p50"} + # span_id resolved from the trace and threaded into every row. + assert {call["span_id"] for call in client.eval_calls} == {"span:run-1:t-1"} + + assert report.trial_count == 1 + assert report.evaluator_result_count == 3 + published = report.published_trials[0] + assert (published.trial_id, published.session_id, published.span_id, published.evaluator_result_count) == ( + "t-1", + "run-1:t-1", + "span:run-1:t-1", + 3, + ) + + +async def test_multiple_trials_each_get_their_own_session_and_span() -> None: + result = _result( + trials=[_trial("t-1"), _trial("t-2")], + scores=[ + _score("t-1", "accuracy", [MetricOutput(name="score", value=1.0)]), + _score("t-2", "accuracy", [MetricOutput(name="score", value=0.0)]), + ], + ) + client = _FakeClient() + report = await publish_to_intake(result, platform=cast(AsyncNeMoPlatform, client), experiment_id="exp-1") + + assert len(client.atif_calls) == 2 + assert report.trial_count == 2 + by_session = {call["session_id"]: call["span_id"] for call in client.eval_calls} + assert by_session == {"run-1:t-1": "span:run-1:t-1", "run-1:t-2": "span:run-1:t-2"} + + +async def test_trial_without_scores_still_ingests_trajectory() -> None: + result = _result(trials=[_trial("t-1")], scores=[]) + 
client = _FakeClient() + report = await publish_to_intake(result, platform=cast(AsyncNeMoPlatform, client), experiment_id="exp-1") + + assert len(client.atif_calls) == 1 + assert len(client.eval_calls) == 0 + assert report.published_trials[0].evaluator_result_count == 0 + + +async def test_explicit_workspace_overrides_client_default() -> None: + result = _result(trials=[_trial("t-1")], scores=[]) + client = _FakeClient(workspace="default") + report = await publish_to_intake( + result, platform=cast(AsyncNeMoPlatform, client), experiment_id="exp-1", workspace="ws-2" + ) + assert report.workspace == "ws-2" + assert client.atif_calls[0]["workspace"] == "ws-2" + + +async def test_missing_workspace_raises() -> None: + result = _result(trials=[_trial("t-1")], scores=[]) + client = _FakeClient(workspace=None) + with pytest.raises(ValueError, match="workspace"): + await publish_to_intake(result, platform=cast(AsyncNeMoPlatform, client), experiment_id="exp-1") + + +async def test_unresolvable_span_raises_publish_error() -> None: + result = _result( + trials=[_trial("t-1")], + scores=[_score("t-1", "accuracy", [MetricOutput(name="score", value=1.0)])], + ) + client = _FakeClient(root_span_id=None) + with pytest.raises(PublishError, match="No root span"): + await publish_to_intake(result, platform=cast(AsyncNeMoPlatform, client), experiment_id="exp-1") + + +async def test_ingest_failure_propagates() -> None: + result = _result(trials=[_trial("t-1")], scores=[]) + client = _FakeClient(atif_fail=True) + with pytest.raises(RuntimeError, match="atif ingest 400"): + await publish_to_intake(result, platform=cast(AsyncNeMoPlatform, client), experiment_id="exp-1") + + +async def test_failed_and_non_finite_scores_are_skipped_and_reported() -> None: + # NaN can't be sent (not JSON-serializable) and a FAILED score is not a real measurement; both + # are omitted but surfaced in the report so the omission is explicit, not silent (X6). 
+ result = _result( + trials=[_trial("t-1")], + scores=[ + _score( + "t-1", "accuracy", [MetricOutput(name="score", value=1.0), MetricOutput(name="broken", value=math.nan)] + ), + _score("t-1", "judge", [MetricOutput(name="verdict", value=math.nan)], status=AgentEvalScoreStatus.FAILED), + ], + ) + client = _FakeClient() + report = await publish_to_intake(result, platform=cast(AsyncNeMoPlatform, client), experiment_id="exp-1") + + # Only the finite, completed output is sent to Intake. + assert {call["name"] for call in client.eval_calls} == {"accuracy.score"} + # The omissions are reported, with reasons. + assert {(skip.name, skip.reason) for skip in report.skipped} == { + ("accuracy.broken", "non-finite value"), + ("judge.verdict", "scoring failed"), + } + + +async def test_one_trial_failure_does_not_block_others_and_is_reported() -> None: + # Partial uploads are acceptable (intake has no rollback), so a single trial's failure must NOT + # abort the others — every trial that can land should land, leaving less for an idempotent retry. + result = _result( + trials=[_trial("t-1"), _trial("t-2")], + scores=[ + _score("t-1", "accuracy", [MetricOutput(name="score", value=1.0)]), + _score("t-2", "accuracy", [MetricOutput(name="score", value=0.0)]), + ], + ) + client = _FakeClient(fail_eval_session="run-1:t-2") + + with pytest.raises(PublishError) as excinfo: + await publish_to_intake( + result, platform=cast(AsyncNeMoPlatform, client), experiment_id="exp-1", max_concurrency=1 + ) + + # The healthy trial still published despite the other failing. + assert any(call["session_id"] == "run-1:t-1" for call in client.eval_calls) + assert all(call["session_id"] != "run-1:t-2" for call in client.eval_calls) + + # The failure surfaces the affected trial and points the user at recovery. 
+ message = str(excinfo.value).lower() + assert "t-2" in message + assert "re-run" in message or "cached" in message or "publish" in message diff --git a/plugins/nemo-evaluator/tests/integration/test_publish_to_intake.py b/plugins/nemo-evaluator/tests/integration/test_publish_to_intake.py new file mode 100644 index 0000000000..c156b29737 --- /dev/null +++ b/plugins/nemo-evaluator/tests/integration/test_publish_to_intake.py @@ -0,0 +1,304 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Integration test: publish_to_intake against a live Intake + ClickHouse. + +Marked ``integration`` (auto-applied to ``/integration/`` paths), so it runs under +``make test-integration`` / ``-m integration`` and is excluded from the unit suite. +Session fixtures stand up ClickHouse (Docker) and the platform +(``auth,entities,intake``); the test skips cleanly when Docker is unavailable. + +Run directly:: + + uv run pytest plugins/nemo-evaluator/tests/integration/test_publish_to_intake.py -v + +Requires Docker (Intake is ClickHouse-backed) and a free :8080 / :8123. 
+"""
+
+from __future__ import annotations
+
+import math
+import os
+import socket
+import subprocess
+import time
+import urllib.request
+from collections.abc import Iterator
+from importlib.util import find_spec
+from pathlib import Path
+
+import pytest
+from nemo_evaluator.intake.publish import publish_to_intake
+from nemo_evaluator_sdk.agent_eval.results import AgentEvalResult, AgentEvalSummary
+from nemo_evaluator_sdk.agent_eval.scores import AgentEvalScoreStatus, AgentEvalTaskScore
+from nemo_evaluator_sdk.agent_eval.trials import AgentEvalTrial, AgentEvalTrialStatus, AgentOutput
+from nemo_evaluator_sdk.metrics.protocol import MetricOutput
+from nemo_platform import AsyncNeMoPlatform
+from nemo_platform.types.intake.trace_filter_param import TraceFilterParam
+
+pytestmark = pytest.mark.integration
+
+# This file lives at plugins/nemo-evaluator/tests/integration/, so four levels above its
+# directory is the repository root.
+REPO_ROOT = Path(__file__).resolve().parents[4]
+BASE_URL = os.environ.get("NMP_BASE_URL", "http://localhost:8080")
+WORKSPACE = "default"
+GROUP_NAME = "intake-it-group"
+EXPERIMENT_NAME = "intake-it-exp"
+RUN_ID = "intake-it-run"
+# The NaN/FAILED scenario uses its own experiment and run so it does not share state with the
+# round-trip test.
+NAN_EXPERIMENT_NAME = "intake-it-nan-exp"
+NAN_RUN_ID = "intake-it-nan-run"
+# Container force-removed at session teardown; presumably the name run_clickhouse.sh assigns -
+# confirm against that script.
+CLICKHOUSE_CONTAINER = "nmp-intake-clickhouse"
+
+
+def _docker_available() -> bool:
+    """Return True when the ``docker`` SDK is importable and a Docker daemon answers a ping."""
+    if find_spec("docker") is None:
+        return False
+    # Imported lazily so collecting this module does not require the docker SDK.
+    from docker.errors import DockerException
+
+    import docker
+
+    try:
+        client = docker.from_env()
+        try:
+            client.ping()
+        finally:
+            client.close()
+        return True
+    except (DockerException, OSError):
+        return False
+
+
+def _wait_for_tcp(host: str, port: int, *, timeout: float) -> None:
+    """Block until ``host:port`` accepts a TCP connection.
+
+    Retries about once per second, with a 2 s connect timeout per attempt.
+
+    Raises:
+        RuntimeError: if no connection succeeds within ``timeout`` seconds.
+    """
+    deadline = time.monotonic() + timeout
+    while time.monotonic() < deadline:
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+            sock.settimeout(2)
+            # connect_ex returns an errno instead of raising; 0 means the port is accepting.
+            if sock.connect_ex((host, port)) == 0:
+                return
+        time.sleep(1)
+    raise RuntimeError(f"{host}:{port} not reachable within {timeout}s")
+
+
+def _wait_for_ready(
+    base_url: str,
+    *,
+    timeout: float,
+    process: subprocess.Popen[bytes] | None = None,
+) -> None:
+    """Poll ``{base_url}/health/ready`` until it answers HTTP 200.
+
+    Args:
+        base_url: Root URL of the platform under test.
+        timeout: Overall budget in seconds.
+        process: Optional handle of the process expected to serve ``base_url``. When given and
+            the process has already exited, fail immediately instead of polling a server that
+            can never come up.
+
+    Raises:
+        RuntimeError: if ``process`` exited before readiness, or if the endpoint did not return
+            200 within ``timeout`` seconds.
+    """
+    deadline = time.monotonic() + timeout
+    while time.monotonic() < deadline:
+        try:
+            with urllib.request.urlopen(f"{base_url}/health/ready", timeout=2) as response:  # noqa: S310
+                if response.status == 200:
+                    return
+        except OSError:
+            # URLError/HTTPError and socket timeouts are OSError subclasses: not ready yet, retry.
+            pass
+        # Checked only after a failed probe so a successful probe always wins.
+        if process is not None and process.poll() is not None:
+            raise RuntimeError(
+                f"platform process exited with code {process.returncode} before {base_url} became ready"
+            )
+        time.sleep(2)
+    raise RuntimeError(f"platform at {base_url} not ready within {timeout}s")
+
+
+@pytest.fixture(scope="session")
+def _clickhouse() -> Iterator[None]:
+    """Start ClickHouse via the repo's helper script for the session and remove it afterwards.
+
+    Skips the dependent tests when Docker is unavailable.
+    """
+    if not _docker_available():
+        pytest.skip("Docker not available; required for ClickHouse-backed Intake")
+    subprocess.run(
+        ["bash", str(REPO_ROOT / "services/intake/scripts/spans/run_clickhouse.sh")],
+        check=True,
+        cwd=REPO_ROOT,
+    )
+    try:
+        _wait_for_tcp("localhost", 8123, timeout=60)
+        yield
+    finally:
+        # Best-effort cleanup: check=False so a missing container does not mask the test outcome.
+        subprocess.run(["docker", "rm", "-f", CLICKHOUSE_CONTAINER], check=False)
+
+
+@pytest.fixture(scope="session")
+def platform_base_url(_clickhouse: None) -> Iterator[str]:
+    """Run the platform (``auth,entities,intake``) for the session and yield its base URL."""
+    process = subprocess.Popen(
+        ["uv", "run", "nemo", "services", "run", "--services", "auth,entities,intake"],
+        cwd=REPO_ROOT,
+        env={**os.environ, "NMP_BASE_URL": BASE_URL},
+    )
+    try:
+        # Pass the handle so a crashed server fails the fixture at once rather than after 180 s.
+        _wait_for_ready(BASE_URL, timeout=180, process=process)
+        yield BASE_URL
+    finally:
+        process.terminate()
+        try:
+            process.wait(timeout=20)
+        except subprocess.TimeoutExpired:
+            process.kill()
+            # Reap the killed child so it is not left behind as a zombie.
+            process.wait()
+
+
+def _result() -> AgentEvalResult:
+    """Build a two-trial result: five outputs across three completed scores."""
+    trials = [
+        AgentEvalTrial(
+            id="trial-1",
+            task_id="task-1",
+            status=AgentEvalTrialStatus.COMPLETED,
+            output=AgentOutput(output_text="The capital of France is Paris."),
+        ),
+        AgentEvalTrial(
+            id="trial-2",
+            task_id="task-2",
+            status=AgentEvalTrialStatus.COMPLETED,
+            output=AgentOutput(output_text="2 + 2 = 4."),
+        ),
+    ]
+    scores = [
+        AgentEvalTaskScore(
+            id="score-1",
+            run_id=RUN_ID,
+            task_id="task-1",
+            trial_id="trial-1",
+            metric_type="accuracy",
+            status=AgentEvalScoreStatus.COMPLETED,
+            outputs=[MetricOutput(name="score", value=1.0), MetricOutput(name="passed", value=True)],
+        ),
+        AgentEvalTaskScore(
+            id="score-2",
+            run_id=RUN_ID,
+            task_id="task-1",
+            trial_id="trial-1",
+            metric_type="judge",
+            status=AgentEvalScoreStatus.COMPLETED,
+            outputs=[MetricOutput(name="verdict", value="correct")],
+        ),
+        AgentEvalTaskScore(
+            id="score-3",
+            run_id=RUN_ID,
+            task_id="task-2",
+            trial_id="trial-2",
+            metric_type="accuracy",
+            status=AgentEvalScoreStatus.COMPLETED,
+            outputs=[MetricOutput(name="score", value=0.0), MetricOutput(name="passed", value=False)],
+        ),
+    ]
+    return AgentEvalResult(run_id=RUN_ID, tasks=[], trials=trials, scores=scores, summary=AgentEvalSummary())
+
+
+async def test_publish_to_intake_round_trip(platform_base_url: str) -> None:
+    """Publish ``_result()`` and read traces and evaluator results back through the Intake API."""
+    async with AsyncNeMoPlatform(base_url=platform_base_url, max_retries=2) as client:
+        # Precondition: the Experiment must exist before ingest.
+        group = await client.experiment_groups.create(
+            workspace=WORKSPACE, name=GROUP_NAME, description="Intake IT", exist_ok=True
+        )
+        await client.experiments.create(
+            workspace=WORKSPACE,
+            name=EXPERIMENT_NAME,
+            experiment_group_id=group.id,
+            dataset_name="intake-it-dataset",
+            dataset_version="v1",
+            exist_ok=True,
+        )
+
+        report = await publish_to_intake(
+            _result(),
+            platform=client,
+            experiment_id=EXPERIMENT_NAME,
+            workspace=WORKSPACE,
+            agent_name="intake-it-agent",
+            model_name="intake-it-model",
+        )
+
+        assert report.trial_count == 2
+        assert report.evaluator_result_count == 5
+        published = {trial.trial_id: trial for trial in report.published_trials}
+
+        # --- trial-1: trajectory + experiment-context propagation, read back via the Intake API.
+        t1 = published["trial-1"]
+        trace_filter: TraceFilterParam = {"session_id": t1.session_id}
+        traces = [trace async for trace in client.intake.traces.list(workspace=WORKSPACE, filter=trace_filter)]
+        assert len(traces) == 1
+        trace = traces[0]
+        assert trace.session_id == t1.session_id
+        assert trace.root_span_id == t1.span_id
+        assert trace.experiment_context is not None
+        assert trace.experiment_context.experiment_id == EXPERIMENT_NAME
+        assert trace.experiment_context.test_case_id == "task-1"
+
+        # --- trial-1 scores: every field, every data_type coercion.
+        rows = await client.intake.spans.evaluator_results.list(t1.span_id, workspace=WORKSPACE)
+        by_name = {row.name: row for row in rows}
+        assert set(by_name) == {"accuracy.score", "accuracy.passed", "judge.verdict"}
+        for row in rows:
+            assert row.session_id == t1.session_id
+            assert row.span_id == t1.span_id
+            assert row.workspace == WORKSPACE
+        assert by_name["accuracy.score"].data_type == "NUMERIC"
+        assert by_name["accuracy.score"].value == 1.0
+        assert by_name["accuracy.passed"].data_type == "BOOLEAN"
+        assert by_name["accuracy.passed"].value == 1.0
+        assert by_name["judge.verdict"].data_type == "TEXT"
+        assert by_name["judge.verdict"].string_value == "correct"
+
+        # --- trial-2: distinct session/span; BOOLEAN false coerces to 0.0.
+        t2 = published["trial-2"]
+        assert t2.session_id != t1.session_id
+        assert t2.span_id != t1.span_id
+        rows2 = await client.intake.spans.evaluator_results.list(t2.span_id, workspace=WORKSPACE)
+        by_name2 = {row.name: row for row in rows2}
+        assert set(by_name2) == {"accuracy.score", "accuracy.passed"}
+        assert by_name2["accuracy.passed"].data_type == "BOOLEAN"
+        assert by_name2["accuracy.passed"].value == 0.0
+        assert by_name2["accuracy.score"].value == 0.0
+
+
+def _nan_result() -> AgentEvalResult:
+    """A result with a NaN-valued output and a FAILED score alongside one valid score."""
+    trial = AgentEvalTrial(
+        id="trial-1",
+        task_id="task-1",
+        status=AgentEvalTrialStatus.COMPLETED,
+        output=AgentOutput(output_text="answer"),
+    )
+    scores = [
+        AgentEvalTaskScore(
+            id="score-ok",
+            run_id=NAN_RUN_ID,
+            task_id="task-1",
+            trial_id="trial-1",
+            metric_type="accuracy",
+            status=AgentEvalScoreStatus.COMPLETED,
+            outputs=[MetricOutput(name="score", value=0.5), MetricOutput(name="broken", value=math.nan)],
+        ),
+        AgentEvalTaskScore(
+            id="score-failed",
+            run_id=NAN_RUN_ID,
+            task_id="task-1",
+            trial_id="trial-1",
+            metric_type="judge",
+            status=AgentEvalScoreStatus.FAILED,
+            outputs=[MetricOutput(name="verdict", value=math.nan)],
+        ),
+    ]
+    return AgentEvalResult(run_id=NAN_RUN_ID, tasks=[], trials=[trial], scores=scores, summary=AgentEvalSummary())
+
+
+async def test_publish_skips_nan_and_failed_scores(platform_base_url: str) -> None:
+    """Only the finite output of the completed score is stored; the rest is reported as skipped."""
+    # A NaN value is not representable in JSON and a FAILED score is not a real measurement; neither
+    # should reach Intake. Only the finite, completed output should be stored.
+    async with AsyncNeMoPlatform(base_url=platform_base_url, max_retries=2) as client:
+        group = await client.experiment_groups.create(workspace=WORKSPACE, name=GROUP_NAME, exist_ok=True)
+        await client.experiments.create(
+            workspace=WORKSPACE,
+            name=NAN_EXPERIMENT_NAME,
+            experiment_group_id=group.id,
+            dataset_name="intake-it-nan-dataset",
+            dataset_version="v1",
+            exist_ok=True,
+        )
+
+        report = await publish_to_intake(
+            _nan_result(),
+            platform=client,
+            experiment_id=NAN_EXPERIMENT_NAME,
+            workspace=WORKSPACE,
+            agent_name="intake-it-agent",
+        )
+
+        # Guard the index below so a regression reads as an assertion failure, not an IndexError.
+        assert len(report.published_trials) == 1
+        published = report.published_trials[0]
+        rows = await client.intake.spans.evaluator_results.list(published.span_id, workspace=WORKSPACE)
+        assert {row.name for row in rows} == {"accuracy.score"}
+        assert report.evaluator_result_count == 1
+
+        # The dropped outputs are surfaced (not silently lost) until Intake can model failure.
+        assert {(skip.name, skip.reason) for skip in report.skipped} == {
+            ("accuracy.broken", "non-finite value"),
+            ("judge.verdict", "scoring failed"),
+        }