Skip to content

Commit 09174c0

Browse files
SandyChapmanclaude
andauthored
feat(evaluator): publish_to_intake entrypoint (D9) (#456)
* feat(evaluator): add publish_to_intake entrypoint (D9) publish_to_intake(result, *, platform, experiment_id, workspace=None, ...) is the explicit, post-run consumer of AgentEvalResult. Per trial it POSTs the ATIF trajectory, resolves the root span via the traces query-back (§3.5 option 1), then POSTs one evaluator-result per metric output. All request shapes come from the D8 mapping module; wire calls go through the generated SDK intake resources. - Async, bounded-concurrent across trials (asyncio.gather + semaphore). - References an existing Experiment; never creates one (caller pre-creates via the Experiments SDK). Agent identity is taken as arguments since it lives on the run target, which AgentEvalResult does not carry (§3.9 #6). - Returns a PublishReport (per-trial session/span ids + result counts). - Raises on HTTP/validation failure; the local bundle is never touched. Unit-tested with a fake async client (round-trip, multi-trial, score-less trial, workspace resolution, span-resolution failure, ingest-failure propagation). Manually validated end-to-end against live Intake + ClickHouse. Refs: AALGO-290 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: Sandy Chapman <schapman@nvidia.com> * test(evaluator): add live Intake integration test for publish_to_intake (D9) Stands up ClickHouse (Docker) + the platform (auth,entities,intake) via session fixtures, creates an Experiment, runs publish_to_intake, and asserts the full round-trip back through the public Intake API: experiment_context propagation (experiment_id + test_case_id), root-span resolution, and every evaluator-result field/data_type (NUMERIC/BOOLEAN/TEXT, including false->0.0). Marked 'integration' (registered in the plugin pyproject), so it runs under make test-integration / -m integration and is excluded from the unit suite; it skips cleanly when Docker is unavailable, mirroring the intake conftest's _docker_available() gate. Verified end-to-end locally (~18s). Refs: AALGO-290 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: Sandy Chapman <schapman@nvidia.com> --------- Signed-off-by: Sandy Chapman <schapman@nvidia.com> Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 19ce8d1 commit 09174c0

6 files changed

Lines changed: 886 additions & 33 deletions

File tree

plugins/nemo-evaluator/pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ dev = ["pytest>=8.3.4", "pytest-asyncio>=0.25.3", "ruff>=0.11.8"]
5050
asyncio_mode = "auto"
5151
pythonpath = ["src"]
5252
testpaths = ["tests"]
53+
markers = [
54+
"integration: live service integration tests (opt-in; e.g. RUN_INTAKE_INTEGRATION=1)",
55+
]
5356

5457
# Opt this plugin into OpenAPI spec generation
5558
[tool.nemo.openapi]

plugins/nemo-evaluator/src/nemo_evaluator/intake/mapping.py

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@
2929

3030
from __future__ import annotations
3131

32+
import math
33+
from dataclasses import dataclass
3234
from typing import Literal
3335

34-
from nemo_evaluator_sdk.agent_eval.scores import AgentEvalTaskScore
36+
from nemo_evaluator_sdk.agent_eval.scores import AgentEvalScoreStatus, AgentEvalTaskScore
3537
from nemo_evaluator_sdk.agent_eval.trials import AgentEvalTrial
3638
from nemo_platform.types.intake.evaluator_result_create_params import EvaluatorResultCreateParams
3739
from nemo_platform.types.intake.evaluator_result_data_type import EvaluatorResultDataType
@@ -112,29 +114,57 @@ def trial_to_atif_ingest(
112114
return body
113115

114116

117+
@dataclass(frozen=True)
118+
class SkippedOutput:
119+
"""A metric output omitted from publish, with the reason it was dropped (see cross-team ask X6)."""
120+
121+
name: str
122+
reason: str
123+
124+
115125
def score_to_evaluator_results(
116126
score: AgentEvalTaskScore,
117127
*,
118128
session_id: str,
119129
span_id: str,
120-
) -> list[EvaluatorResultCreateParams]:
121-
"""Turn one ``AgentEvalTaskScore`` into one evaluator-result param per output.
122-
123-
``name`` is ``"{metric_type}.{output}"`` (matching the SDK summary's
124-
aggregate naming). The output's value is coerced into the matching
125-
``data_type``, populating exactly one of ``value`` / ``string_value``.
126-
``session_id`` and ``span_id`` are supplied by the caller: the trajectory
127-
span id is resolved at publish time (the adapter's concern), not derivable
128-
from the pure score.
130+
) -> tuple[list[EvaluatorResultCreateParams], list[SkippedOutput]]:
131+
"""Map one ``AgentEvalTaskScore`` to ``(rows, skipped)`` for Intake.
132+
133+
``rows`` is one evaluator-result param per publishable output: ``name`` is
134+
``"{metric_type}.{output}"`` (matching the SDK summary's aggregate naming) and the
135+
value is coerced into the matching ``data_type``, populating exactly one of ``value``
136+
/ ``string_value``. ``session_id``/``span_id`` are supplied by the caller — the
137+
trajectory span id is resolved at publish time, not derivable from the pure score.
138+
139+
``skipped`` carries the outputs that can't be published, with the reason — so the
140+
publishable/omitted split has a single source of truth and callers can report the
141+
omissions instead of silently losing them. A FAILED score yields no rows (every output
142+
skipped); a completed score's non-finite (NaN/inf) outputs are dropped (NaN isn't
143+
JSON-representable — the platform client's encoder rejects it — so it can't be sent).
144+
145+
TODO(X6): once Intake can represent a failed metric result, publish these as failures
146+
instead of dropping them.
129147
"""
148+
if score.status == AgentEvalScoreStatus.FAILED:
149+
skipped = [
150+
SkippedOutput(name=f"{score.metric_type}.{output.name}", reason="scoring failed")
151+
for output in score.outputs
152+
]
153+
return [], skipped
154+
130155
comment = score.diagnostics[0].message if score.diagnostics else None
131156
rows: list[EvaluatorResultCreateParams] = []
157+
skipped: list[SkippedOutput] = []
132158
for output in score.outputs:
159+
name = f"{score.metric_type}.{output.name}"
133160
data_type, value, string_value = _coerce_metric_value(output.value)
161+
if value is not None and not math.isfinite(value):
162+
skipped.append(SkippedOutput(name=name, reason="non-finite value"))
163+
continue
134164
row: EvaluatorResultCreateParams = {
135165
"session_id": session_id,
136166
"span_id": span_id,
137-
"name": f"{score.metric_type}.{output.name}",
167+
"name": name,
138168
"data_type": data_type,
139169
}
140170
if value is not None:
@@ -144,7 +174,7 @@ def score_to_evaluator_results(
144174
if comment is not None:
145175
row["comment"] = comment
146176
rows.append(row)
147-
return rows
177+
return rows, skipped
148178

149179

150180
def _coerce_metric_value(value: object) -> tuple[EvaluatorResultDataType, float | None, str | None]:
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""Publish a completed agent evaluation to Intake.
5+
6+
``publish_to_intake`` is the explicit, post-run consumer of ``AgentEvalResult``
7+
(see AALGO-290). It is **not** a side effect of ``AgentEvaluator.run()`` and
8+
there is no feature flag — optionality is structural: you make the call or you
9+
don't, and the platform client is a required argument.
10+
11+
It references an **existing** Experiment (created by the caller via the platform
12+
Experiments SDK) and never creates one. Per Trial it: POSTs the ATIF trajectory,
13+
resolves the trajectory's root span, then POSTs one evaluator-result per metric
14+
output. All request shapes come from :mod:`nemo_evaluator.intake.mapping`; the
15+
HTTP calls go through the generated platform SDK's ``intake`` resources.
16+
"""
17+
18+
from __future__ import annotations
19+
20+
import asyncio
21+
from collections import defaultdict
22+
23+
from nemo_evaluator.intake import mapping
24+
from nemo_evaluator.sdk import http_utils
25+
from nemo_evaluator_sdk.agent_eval.results import AgentEvalResult
26+
from nemo_evaluator_sdk.agent_eval.scores import AgentEvalTaskScore
27+
from nemo_evaluator_sdk.agent_eval.trials import AgentEvalTrial
28+
from nemo_platform import AsyncNeMoPlatform
29+
from nemo_platform.types.intake.trace_filter_param import TraceFilterParam
30+
from pydantic import BaseModel, ConfigDict, Field
31+
32+
#: Default ceiling on concurrent per-trial publishes.
33+
DEFAULT_MAX_CONCURRENCY = 8
34+
35+
36+
class PublishError(RuntimeError):
37+
"""Raised when one or more trials fail to publish (or a span never resolves).
38+
39+
Carries the partial :class:`PublishReport` of trials that *did* publish, so the
40+
caller can see what landed before re-running.
41+
"""
42+
43+
def __init__(self, message: str, *, report: PublishReport | None = None) -> None:
44+
super().__init__(message)
45+
self.report = report
46+
47+
48+
class PublishedTrial(BaseModel):
49+
"""Record of one Trial written to Intake."""
50+
51+
model_config = ConfigDict(extra="forbid")
52+
53+
trial_id: str = Field(description="Identifier of the published trial.")
54+
session_id: str = Field(description="Intake session id minted for the trajectory.")
55+
span_id: str = Field(description="Resolved root AGENT span id the scores were attached to.")
56+
evaluator_result_count: int = Field(description="Number of evaluator-result rows written for this trial.")
57+
58+
59+
class SkippedScore(BaseModel):
60+
"""A score output omitted from publish because Intake can't represent it yet (cross-team ask X6)."""
61+
62+
model_config = ConfigDict(extra="forbid")
63+
64+
trial_id: str = Field(description="Trial whose score output was omitted.")
65+
name: str = Field(description='"{metric_type}.{output}" of the omitted output.')
66+
reason: str = Field(description="Why it was omitted (e.g. 'scoring failed', 'non-finite value').")
67+
68+
69+
class PublishReport(BaseModel):
70+
"""Summary of a ``publish_to_intake`` run."""
71+
72+
model_config = ConfigDict(extra="forbid")
73+
74+
experiment_id: str = Field(description="Experiment the results were published under.")
75+
workspace: str = Field(description="Workspace the writes targeted.")
76+
run_id: str = Field(description="Source AgentEvalResult run id.")
77+
published_trials: list[PublishedTrial] = Field(
78+
default_factory=list, description="Per-trial records of what was written."
79+
)
80+
skipped: list[SkippedScore] = Field(
81+
default_factory=list,
82+
description="Score outputs omitted because Intake can't represent failed/non-finite scores (cross-team ask X6).",
83+
)
84+
85+
@property
86+
def trial_count(self) -> int:
87+
"""Number of trials published."""
88+
return len(self.published_trials)
89+
90+
@property
91+
def evaluator_result_count(self) -> int:
92+
"""Total evaluator-result rows written across all trials."""
93+
return sum(trial.evaluator_result_count for trial in self.published_trials)
94+
95+
96+
async def publish_to_intake(
97+
result: AgentEvalResult,
98+
*,
99+
platform: AsyncNeMoPlatform,
100+
experiment_id: str,
101+
workspace: str | None = None,
102+
agent_name: str = "agent",
103+
agent_version: str = mapping.DEFAULT_AGENT_VERSION,
104+
model_name: str | None = None,
105+
max_concurrency: int = DEFAULT_MAX_CONCURRENCY,
106+
) -> PublishReport:
107+
"""Publish a completed ``AgentEvalResult`` to Intake under an existing Experiment.
108+
109+
For each trial: POST the ATIF trajectory, resolve its root span, then POST one
110+
evaluator-result per metric output. Trials are published concurrently up to
111+
``max_concurrency``.
112+
113+
Publishing is **not atomic** and Intake has no rollback, so a per-trial failure
114+
must not abort the others: every trial that can land does, and the failures are
115+
collected and raised together as a :class:`PublishError` (carrying the partial
116+
report). The evaluation's local bundle is the system of record and is never
117+
touched, so the caller can re-run ``publish_to_intake`` once the issue is fixed
118+
to publish the remaining trials. (Re-publish is not yet idempotent — see ask X1.)
119+
120+
``experiment_id`` must reference an Experiment that already exists — ATIF ingest
121+
rejects unknown experiments with HTTP 400. Creating the Experiment/group is a
122+
separate, caller-side step via the platform Experiments SDK.
123+
124+
Agent identity (``agent_name``/``agent_version``/``model_name``) is taken as
125+
arguments because it lives on the run *target*, which ``AgentEvalResult`` does
126+
not carry (design §3.9 #6).
127+
"""
128+
resolved_workspace = http_utils.resolve_workspace(platform, workspace, strict=True)
129+
130+
scores_by_trial: dict[str, list[AgentEvalTaskScore]] = defaultdict(list)
131+
for score in result.scores:
132+
scores_by_trial[score.trial_id].append(score)
133+
134+
semaphore = asyncio.Semaphore(max_concurrency)
135+
skipped: list[SkippedScore] = []
136+
137+
async def _publish_trial(trial: AgentEvalTrial) -> PublishedTrial:
138+
async with semaphore:
139+
body = mapping.trial_to_atif_ingest(
140+
trial,
141+
run_id=result.run_id,
142+
experiment_id=experiment_id,
143+
agent_name=agent_name,
144+
agent_version=agent_version,
145+
model_name=model_name,
146+
)
147+
body["workspace"] = resolved_workspace
148+
await platform.intake.ingest.atif.create(**body)
149+
150+
session_id = mapping.session_id_for(result.run_id, trial.id)
151+
span_id = await _resolve_root_span_id(platform, workspace=resolved_workspace, session_id=session_id)
152+
153+
written = 0
154+
for score in scores_by_trial.get(trial.id, []):
155+
rows, omitted = mapping.score_to_evaluator_results(score, session_id=session_id, span_id=span_id)
156+
for row in rows:
157+
row["workspace"] = resolved_workspace
158+
await platform.intake.evaluator_results.create(**row)
159+
written += 1
160+
skipped.extend(SkippedScore(trial_id=trial.id, name=item.name, reason=item.reason) for item in omitted)
161+
162+
return PublishedTrial(
163+
trial_id=trial.id,
164+
session_id=session_id,
165+
span_id=span_id,
166+
evaluator_result_count=written,
167+
)
168+
169+
outcomes = await asyncio.gather(*(_publish_trial(trial) for trial in result.trials), return_exceptions=True)
170+
171+
published: list[PublishedTrial] = []
172+
failures: list[tuple[str, BaseException]] = []
173+
for trial, outcome in zip(result.trials, outcomes, strict=True):
174+
if isinstance(outcome, PublishedTrial):
175+
published.append(outcome)
176+
else:
177+
failures.append((trial.id, outcome))
178+
179+
report = PublishReport(
180+
experiment_id=experiment_id,
181+
workspace=resolved_workspace,
182+
run_id=result.run_id,
183+
published_trials=published,
184+
skipped=skipped,
185+
)
186+
if failures:
187+
raise PublishError(_publish_failure_message(result, report, failures), report=report)
188+
return report
189+
190+
191+
def _publish_failure_message(
192+
result: AgentEvalResult,
193+
report: PublishReport,
194+
failures: list[tuple[str, BaseException]],
195+
) -> str:
196+
"""Build an actionable error: what failed, where the results are cached, how to recover."""
197+
location = f"cached locally at {result.output_dir}" if result.output_dir is not None else "in the local run bundle"
198+
detail = "\n ".join(f"{trial_id}: {type(error).__name__}: {error}" for trial_id, error in failures)
199+
return (
200+
f"publish_to_intake: {len(failures)} of {len(result.trials)} trial(s) failed to publish "
201+
f"({report.trial_count} succeeded). The evaluation results are {location}; re-run "
202+
f"publish_to_intake(result, ...) once the issue is resolved to publish the rest.\n"
203+
f"Failed trials:\n {detail}"
204+
)
205+
206+
207+
async def _resolve_root_span_id(platform: AsyncNeMoPlatform, *, workspace: str, session_id: str) -> str:
208+
"""Return the root AGENT span id for a freshly-ingested trajectory (design §3.5, option 1)."""
209+
trace_filter: TraceFilterParam = {"session_id": session_id}
210+
async for trace in platform.intake.traces.list(workspace=workspace, filter=trace_filter):
211+
if trace.root_span_id:
212+
return trace.root_span_id
213+
raise PublishError(f"No root span resolved for session {session_id!r} after ATIF ingest")

0 commit comments

Comments
 (0)