Skip to content

Commit 78b2a3d

Browse files
SandyChapmanclaude
andcommitted
test(evaluator): add live Intake integration test for publish_to_intake (D9)
Stands up ClickHouse (Docker) + the platform (auth,entities,intake) via session fixtures, creates an Experiment, runs publish_to_intake, and asserts the full round-trip back through the public Intake API: experiment_context propagation (experiment_id + test_case_id), root-span resolution, and every evaluator-result field/data_type (NUMERIC/BOOLEAN/TEXT, including false->0.0). Marked 'integration' (registered in the plugin pyproject), so it runs under make test-integration / -m integration and is excluded from the unit suite; it skips cleanly when Docker is unavailable, mirroring the intake conftest's _docker_available() gate. Verified end-to-end locally (~18s). Refs: AALGO-290 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: Sandy Chapman <schapman@nvidia.com>
1 parent 66fc545 commit 78b2a3d

2 files changed

Lines changed: 239 additions & 0 deletions

File tree

plugins/nemo-evaluator/pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ dev = ["pytest>=8.3.4", "pytest-asyncio>=0.25.3", "ruff>=0.11.8"]
5050
asyncio_mode = "auto"
5151
pythonpath = ["src"]
5252
testpaths = ["tests"]
53+
markers = [
54+
"integration: live service integration tests (opt-in; e.g. RUN_INTAKE_INTEGRATION=1)",
55+
]
5356

5457
# Opt this plugin into OpenAPI spec generation
5558
[tool.nemo.openapi]
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""Integration test: publish_to_intake against a live Intake + ClickHouse.
5+
6+
Marked ``integration`` via the module-level ``pytestmark`` (marker registered in the plugin pyproject), so it runs under
7+
``make test-integration`` / ``-m integration`` and is excluded from the unit suite.
8+
Session fixtures stand up ClickHouse (Docker) and the platform
9+
(``auth,entities,intake``); the test skips cleanly when Docker is unavailable.
10+
11+
Run directly::
12+
13+
uv run pytest plugins/nemo-evaluator/tests/integration/test_publish_to_intake.py -v
14+
15+
Requires Docker (Intake is ClickHouse-backed) and free local ports 8080 (platform default) and 8123 (ClickHouse).
16+
"""
17+
18+
from __future__ import annotations
19+
20+
import os
21+
import socket
22+
import subprocess
23+
import time
24+
import urllib.request
25+
from collections.abc import Iterator
26+
from importlib.util import find_spec
27+
from pathlib import Path
28+
29+
import pytest
30+
from nemo_evaluator.intake.publish import publish_to_intake
31+
from nemo_evaluator_sdk.agent_eval.results import AgentEvalResult, AgentEvalSummary
32+
from nemo_evaluator_sdk.agent_eval.scores import AgentEvalScoreStatus, AgentEvalTaskScore
33+
from nemo_evaluator_sdk.agent_eval.trials import AgentEvalTrial, AgentEvalTrialStatus, AgentOutput
34+
from nemo_evaluator_sdk.metrics.protocol import MetricOutput
35+
from nemo_platform import AsyncNeMoPlatform
36+
from nemo_platform.types.intake.trace_filter_param import TraceFilterParam
37+
38+
# Apply the ``integration`` marker to every test collected from this module.
pytestmark = pytest.mark.integration

# This file lives at plugins/nemo-evaluator/tests/integration/, so parents[4] is the repository root.
REPO_ROOT = Path(__file__).resolve().parents[4]
# Base URL used for the readiness probe, the platform process environment and the SDK client;
# override with the NMP_BASE_URL environment variable.
BASE_URL = os.environ.get("NMP_BASE_URL", "http://localhost:8080")
# Workspace, experiment-group, experiment and run identifiers shared by the fixtures and assertions.
WORKSPACE = "default"
GROUP_NAME = "intake-it-group"
EXPERIMENT_NAME = "intake-it-exp"
RUN_ID = "intake-it-run"
# Container force-removed at session teardown; presumably the name run_clickhouse.sh assigns — TODO confirm.
CLICKHOUSE_CONTAINER = "nmp-intake-clickhouse"
47+
48+
49+
def _docker_available() -> bool:
    """Report whether the Docker SDK is importable and a Docker daemon answers a ping.

    Returns:
        ``False`` when the ``docker`` package cannot be found, or when creating,
        pinging or closing the client raises ``DockerException`` or ``OSError``;
        ``True`` otherwise.
    """
    if find_spec("docker") is None:
        return False

    # Imported lazily so this module still imports when the Docker SDK is absent.
    import docker
    from docker.errors import DockerException

    reachable = False
    try:
        daemon = docker.from_env()
        try:
            daemon.ping()
        finally:
            # Always release the client, even when the ping fails.
            daemon.close()
        reachable = True
    except (DockerException, OSError):
        reachable = False
    return reachable
65+
66+
67+
def _wait_for_tcp(host: str, port: int, *, timeout: float) -> None:
    """Block until a TCP connection to ``host:port`` succeeds.

    The port is probed at least once, even when ``timeout`` is zero or negative,
    and the pause between probes never extends past the deadline.

    Args:
        host: Hostname or IPv4 address to connect to (the probe socket is ``AF_INET``).
        port: TCP port to probe.
        timeout: Overall budget in seconds. A single connect attempt is itself
            capped at 2 seconds.

    Raises:
        RuntimeError: If no connection attempt succeeds before the deadline.
    """
    deadline = time.monotonic() + timeout
    while True:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(2)
            # connect_ex returns 0 on success instead of raising on refusal.
            if sock.connect_ex((host, port)) == 0:
                return
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep longer than the time that is left.
        time.sleep(min(1.0, remaining))
    raise RuntimeError(f"{host}:{port} not reachable within {timeout}s")
76+
77+
78+
def _wait_for_ready(base_url: str, *, timeout: float) -> None:
    """Block until ``GET {base_url}/health/ready`` answers with HTTP 200.

    The endpoint is probed at least once, even when ``timeout`` is zero or
    negative, and the pause between probes never extends past the deadline.

    Args:
        base_url: Platform base URL without a trailing slash.
        timeout: Overall budget in seconds. Each request is itself capped at 2 seconds.

    Raises:
        RuntimeError: If the endpoint does not return 200 before the deadline.
    """
    ready_url = f"{base_url}/health/ready"
    deadline = time.monotonic() + timeout
    while True:
        try:
            with urllib.request.urlopen(ready_url, timeout=2) as response:  # noqa: S310
                if response.status == 200:
                    return
        except OSError:
            # Connection refused, timeouts and HTTP error statuses all surface as
            # OSError subclasses; treat them as "not ready yet" and retry.
            pass
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep longer than the time that is left.
        time.sleep(min(2.0, remaining))
    raise RuntimeError(f"platform at {base_url} not ready within {timeout}s")
89+
90+
91+
@pytest.fixture(scope="session")
def _clickhouse() -> Iterator[None]:
    """Session fixture that runs the ClickHouse launcher script and removes the container afterwards.

    Skips when ``_docker_available()`` is false. Otherwise it runs
    ``services/intake/scripts/spans/run_clickhouse.sh`` from the repository root,
    waits up to 60 seconds for ``localhost:8123`` to accept connections, and on
    teardown force-removes the ``CLICKHOUSE_CONTAINER`` container.
    """
    docker_ok = _docker_available()
    if not docker_ok:
        pytest.skip("Docker not available; required for ClickHouse-backed Intake")

    launcher = REPO_ROOT / "services/intake/scripts/spans/run_clickhouse.sh"
    start_cmd = ["bash", str(launcher)]
    # A failing launcher aborts the session setup (check=True).
    subprocess.run(start_cmd, cwd=REPO_ROOT, check=True)

    remove_cmd = ["docker", "rm", "-f", CLICKHOUSE_CONTAINER]
    try:
        _wait_for_tcp("localhost", 8123, timeout=60)
        yield
    finally:
        # Best-effort cleanup: a failed removal must not mask the test outcome.
        subprocess.run(remove_cmd, check=False)
105+
106+
107+
@pytest.fixture(scope="session")
def platform_base_url(_clickhouse: None) -> Iterator[str]:
    """Session fixture that runs the platform services and yields their base URL.

    Launches ``uv run nemo services run --services auth,entities,intake`` from the
    repository root with ``NMP_BASE_URL`` set to ``BASE_URL``, then waits up to 180
    seconds for the readiness endpoint before yielding.

    Args:
        _clickhouse: Requested only for ordering, so ClickHouse is set up first.

    Yields:
        The base URL the platform is reachable at (``BASE_URL``).

    Raises:
        RuntimeError: Propagated from ``_wait_for_ready`` when the platform does
            not become ready in time; the process is still shut down.
    """
    process = subprocess.Popen(
        ["uv", "run", "nemo", "services", "run", "--services", "auth,entities,intake"],
        cwd=REPO_ROOT,
        env={**os.environ, "NMP_BASE_URL": BASE_URL},
    )
    try:
        _wait_for_ready(BASE_URL, timeout=180)
        yield BASE_URL
    finally:
        # Ask for a graceful shutdown first and escalate only if it stalls.
        process.terminate()
        try:
            process.wait(timeout=20)
        except subprocess.TimeoutExpired:
            process.kill()
            # Reap the killed child so it does not linger as a zombie and
            # Popen does not emit a ResourceWarning when it is collected.
            process.wait()
123+
124+
125+
def _result() -> AgentEvalResult:
    """Build the fixed evaluation result that the round-trip test publishes.

    The result holds two completed trials and three completed scores.
    ``trial-1`` carries a numeric, a boolean (true) and a text output across two
    scores; ``trial-2`` carries a numeric and a boolean (false) output. That
    makes five metric outputs in total.
    """
    # (trial id, task id, agent output text)
    trial_specs = (
        ("trial-1", "task-1", "The capital of France is Paris."),
        ("trial-2", "task-2", "2 + 2 = 4."),
    )
    # (score id, task id, trial id, metric type, ((output name, output value), ...))
    score_specs = (
        ("score-1", "task-1", "trial-1", "accuracy", (("score", 1.0), ("passed", True))),
        ("score-2", "task-1", "trial-1", "judge", (("verdict", "correct"),)),
        ("score-3", "task-2", "trial-2", "accuracy", (("score", 0.0), ("passed", False))),
    )

    trials = [
        AgentEvalTrial(
            id=trial_id,
            task_id=task_id,
            status=AgentEvalTrialStatus.COMPLETED,
            output=AgentOutput(output_text=text),
        )
        for trial_id, task_id, text in trial_specs
    ]
    scores = [
        AgentEvalTaskScore(
            id=score_id,
            run_id=RUN_ID,
            task_id=task_id,
            trial_id=trial_id,
            metric_type=metric_type,
            status=AgentEvalScoreStatus.COMPLETED,
            outputs=[MetricOutput(name=name, value=value) for name, value in outputs],
        )
        for score_id, task_id, trial_id, metric_type, outputs in score_specs
    ]
    return AgentEvalResult(run_id=RUN_ID, tasks=[], trials=trials, scores=scores, summary=AgentEvalSummary())
170+
171+
172+
async def test_publish_to_intake_round_trip(platform_base_url: str) -> None:
    """Publish the fixed result and verify it reads back through the public Intake API.

    Checks the publish report counts, the single trace recorded for ``trial-1``
    (session, root span and experiment context), and the evaluator-result rows of
    both trials, including the NUMERIC / BOOLEAN / TEXT data types.
    """
    async with AsyncNeMoPlatform(base_url=platform_base_url, max_retries=2) as client:
        # Precondition: the Experiment (and its group) must exist before ingest.
        group = await client.experiment_groups.create(
            workspace=WORKSPACE,
            name=GROUP_NAME,
            description="Intake IT",
            exist_ok=True,
        )
        await client.experiments.create(
            workspace=WORKSPACE,
            name=EXPERIMENT_NAME,
            experiment_group_id=group.id,
            dataset_name="intake-it-dataset",
            dataset_version="v1",
            exist_ok=True,
        )

        report = await publish_to_intake(
            _result(),
            platform=client,
            experiment_id=EXPERIMENT_NAME,
            workspace=WORKSPACE,
            agent_name="intake-it-agent",
            model_name="intake-it-model",
        )

        # Two trials; 2 + 1 + 2 metric outputs across the three scores.
        assert report.trial_count == 2
        assert report.evaluator_result_count == 5
        trials_by_id = {entry.trial_id: entry for entry in report.published_trials}

        # --- trial-1: exactly one trace, carrying the experiment context.
        first = trials_by_id["trial-1"]
        session_filter: TraceFilterParam = {"session_id": first.session_id}
        matching_traces = []
        async for item in client.intake.traces.list(workspace=WORKSPACE, filter=session_filter):
            matching_traces.append(item)
        assert len(matching_traces) == 1
        only_trace = matching_traces[0]
        assert only_trace.session_id == first.session_id
        assert only_trace.root_span_id == first.span_id
        assert only_trace.experiment_context is not None
        assert only_trace.experiment_context.experiment_id == EXPERIMENT_NAME
        assert only_trace.experiment_context.test_case_id == "task-1"

        # --- trial-1 scores: identifying fields on every row, then each data type.
        first_rows = await client.intake.spans.evaluator_results.list(first.span_id, workspace=WORKSPACE)
        first_by_name = {result.name: result for result in first_rows}
        assert set(first_by_name) == {"accuracy.score", "accuracy.passed", "judge.verdict"}
        for result in first_rows:
            assert result.session_id == first.session_id
            assert result.span_id == first.span_id
            assert result.workspace == WORKSPACE
        numeric_row = first_by_name["accuracy.score"]
        assert numeric_row.data_type == "NUMERIC"
        assert numeric_row.value == 1.0
        boolean_row = first_by_name["accuracy.passed"]
        assert boolean_row.data_type == "BOOLEAN"
        assert boolean_row.value == 1.0
        text_row = first_by_name["judge.verdict"]
        assert text_row.data_type == "TEXT"
        assert text_row.string_value == "correct"

        # --- trial-2: its own session/span, and BOOLEAN false stored as 0.0.
        second = trials_by_id["trial-2"]
        assert second.session_id != first.session_id
        assert second.span_id != first.span_id
        second_rows = await client.intake.spans.evaluator_results.list(second.span_id, workspace=WORKSPACE)
        second_by_name = {result.name: result for result in second_rows}
        assert set(second_by_name) == {"accuracy.score", "accuracy.passed"}
        failed_row = second_by_name["accuracy.passed"]
        assert failed_row.data_type == "BOOLEAN"
        assert failed_row.value == 0.0
        assert second_by_name["accuracy.score"].value == 0.0

0 commit comments

Comments
 (0)