Skip to content

Commit 4dd018a

Browse files
GovindhKishoresjrl
andauthored
feat(evaluators): add native run_async support to LLMEvaluator (#11581)
Co-authored-by: Sebastian Husch Lee <10526848+sjrl@users.noreply.github.com>
1 parent 6c153cc commit 4dd018a

7 files changed

Lines changed: 434 additions & 1 deletion

File tree

haystack/components/evaluators/context_relevance.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,39 @@ def run(self, **inputs: Any) -> dict[str, Any]:
174174
- `results`: A list of dictionaries with `relevant_statements` and `score` for each input context.
175175
"""
176176
result = super(ContextRelevanceEvaluator, self).run(**inputs) # noqa: UP008
177+
# Post-process the raw results to calculate relevance metrics and scores
178+
return self._postprocess_results(result)
177179

180+
@component.output_types(score=float, results=list[dict[str, Any]])
181+
async def run_async(self, **inputs: Any) -> dict[str, Any]:
182+
"""
183+
Run the LLM evaluator asynchronously.
184+
185+
:param questions:
186+
A list of questions.
187+
:param contexts:
188+
A list of lists of contexts. Each list of contexts corresponds to one question.
189+
:returns:
190+
A dictionary with the following outputs:
191+
- `score`: Mean context relevance score over all the provided input questions.
192+
- `results`: A list of dictionaries with `relevant_statements` and `score` for each input context.
193+
"""
194+
result = await super(ContextRelevanceEvaluator, self).run_async(**inputs) # noqa: UP008
195+
# Post-process the raw results to calculate relevance metrics and scores
196+
return self._postprocess_results(result)
197+
198+
def _postprocess_results(self, result: dict[str, Any]) -> dict[str, Any]:
199+
"""
200+
Post-processes raw LLM evaluator outputs to compute context relevance scores.
201+
202+
Calculates binary scores based on whether relevant statements were found,
203+
averages the scores across all successful queries, and updates the result payload.
204+
205+
:param result:
206+
The raw evaluation dictionary from the base LLM evaluator.
207+
:returns:
208+
The updated dictionary containing final scores and tracking metrics.
209+
"""
178210
for idx, res in enumerate(result["results"]):
179211
if res is None:
180212
result["results"][idx] = {"relevant_statements": [], "score": float("nan")}

haystack/components/evaluators/faithfulness.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ def __init__(
149149
progress_bar=progress_bar,
150150
)
151151

152-
@component.output_types(individual_scores=list[int], score=float, results=list[dict[str, Any]])
152+
@component.output_types(individual_scores=list[float], score=float, results=list[dict[str, Any]])
153153
def run(self, **inputs: Any) -> dict[str, Any]:
154154
"""
155155
Run the LLM evaluator.
@@ -167,6 +167,42 @@ def run(self, **inputs: Any) -> dict[str, Any]:
167167
- `results`: A list of dictionaries with `statements` and `statement_scores` for each input answer.
168168
"""
169169
result = super(FaithfulnessEvaluator, self).run(**inputs) # noqa: UP008
170+
# Post-process the raw results to calculate relevance metrics and scores
171+
return self._postprocess_results(result)
172+
173+
@component.output_types(individual_scores=list[float], score=float, results=list[dict[str, Any]])
174+
async def run_async(self, **inputs: Any) -> dict[str, Any]:
175+
"""
176+
Run the LLM evaluator asynchronously.
177+
178+
:param questions:
179+
A list of questions.
180+
:param contexts:
181+
A nested list of contexts that correspond to the questions.
182+
:param predicted_answers:
183+
A list of predicted answers.
184+
:returns:
185+
A dictionary with the following outputs:
186+
- `score`: Mean faithfulness score over all the provided input answers.
187+
- `individual_scores`: A list of faithfulness scores for each input answer.
188+
- `results`: A list of dictionaries with `statements` and `statement_scores` for each input answer.
189+
"""
190+
result = await super(FaithfulnessEvaluator, self).run_async(**inputs) # noqa: UP008
191+
# Post-process the raw results to calculate relevance metrics and scores
192+
return self._postprocess_results(result)
193+
194+
def _postprocess_results(self, result: dict[str, Any]) -> dict[str, Any]:
195+
"""
196+
Post-processes raw LLM evaluator outputs to compute faithfulness scores.
197+
198+
Calculates statement-level score averages, computes the overall mean faithfulness
199+
score across successful queries, and updates the result payload.
200+
201+
:param result:
202+
The raw evaluation dictionary from the base LLM evaluator.
203+
:returns:
204+
The updated dictionary containing final scores and tracking metrics.
205+
"""
170206

171207
# calculate average statement faithfulness score per query
172208
for idx, res in enumerate(result["results"]):

haystack/components/evaluators/llm_evaluator.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
#
33
# SPDX-License-Identifier: Apache-2.0
44

5+
import asyncio
56
import json
67
from typing import Any
78

89
from tqdm import tqdm
10+
from tqdm.asyncio import tqdm as async_tqdm
911

1012
from haystack import component, default_from_dict, default_to_dict, logging
1113
from haystack.components.builders import PromptBuilder
@@ -240,6 +242,84 @@ def run(self, **inputs: Any) -> dict[str, Any]:
240242

241243
return {"results": results, "meta": metadata or None}
242244

245+
@component.output_types(results=list[dict[str, Any]])
246+
async def run_async(self, **inputs: Any) -> dict[str, Any]:
247+
"""
248+
Run the LLM evaluator asynchronously
249+
250+
:param inputs:
251+
The input values to evaluate. The keys are the input names and the values are lists of input values.
252+
:returns:
253+
A dictionary with a `results` entry that contains a list of results.
254+
Each result is a dictionary containing the keys as defined in the `outputs` parameter of the LLMEvaluator
255+
and the evaluation results as the values. If an exception occurs for a particular input value, the result
256+
will be `None` for that entry.
257+
If the API is "openai" and the response contains a "meta" key, the metadata from OpenAI will be included
258+
in the output dictionary, under the key "meta".
259+
:raises TypeError:
260+
If the chat generator does not support async execution.
261+
:raises ValueError:
262+
Only in the case that `raise_on_failure` is set to True and the received inputs are not lists or have
263+
different lengths, or if the output is not a valid JSON or doesn't contain the expected keys.
264+
"""
265+
266+
if not self._is_warmed_up:
267+
self.warm_up()
268+
269+
self.validate_input_parameters(dict(self.inputs), inputs)
270+
271+
# inputs is a dictionary with keys being input names and values being a list of input values
272+
# We need to iterate through the lists in parallel for all keys of the dictionary
273+
input_names, values = inputs.keys(), list(zip(*inputs.values(), strict=True))
274+
list_of_input_names_to_values = [dict(zip(input_names, v, strict=True)) for v in values]
275+
276+
results: list[dict[str, Any] | None] = []
277+
metadata = []
278+
errors = 0
279+
280+
generator_has_async = hasattr(self._chat_generator, "run_async")
281+
for input_names_to_values in async_tqdm(list_of_input_names_to_values, disable=not self.progress_bar):
282+
prompt = self.builder.run(**input_names_to_values)
283+
messages = [ChatMessage.from_user(prompt["prompt"])]
284+
try:
285+
if generator_has_async:
286+
result = await self._chat_generator.run_async(messages=messages) # type: ignore[attr-defined]
287+
else:
288+
logger.debug(
289+
"{generator_type} does not implement 'run_async'."
290+
" Running the synchronous 'run' method in a thread to avoid blocking the event loop.",
291+
generator_type=type(self._chat_generator).__name__,
292+
)
293+
result = await asyncio.to_thread(self._chat_generator.run, messages=messages)
294+
except Exception as e:
295+
if self.raise_on_failure:
296+
raise ValueError(f"Error while generating response for prompt: {prompt}. Error: {e}") from e
297+
logger.warning("Error while generating response for prompt: {prompt}. Error: {e}", prompt=prompt, e=e)
298+
results.append(None)
299+
errors += 1
300+
continue
301+
302+
parsed_result = _parse_dict_from_json(
303+
result["replies"][0].text, expected_keys=self.outputs, raise_on_failure=self.raise_on_failure
304+
)
305+
if parsed_result is None:
306+
results.append(None)
307+
errors += 1
308+
else:
309+
results.append(parsed_result)
310+
311+
if result["replies"][0].meta:
312+
metadata.append(result["replies"][0].meta)
313+
314+
if errors > 0:
315+
logger.warning(
316+
"LLM evaluator failed for {errors} out of {len(list_of_input_names_to_values)} inputs.",
317+
errors=errors,
318+
len=len(list_of_input_names_to_values),
319+
)
320+
321+
return {"results": results, "meta": metadata or None}
322+
243323
def prepare_template(self) -> str:
244324
"""
245325
Prepare the prompt template.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
features:
3+
- |
4+
Added native asynchronous support (``run_async``) to ``LLMEvaluator``, ``FaithfulnessEvaluator``, and ``ContextRelevanceEvaluator``. This allows concurrent evaluation loops inside async applications like FastMCP or FastAPI without blocking the main event loop, while automatically falling back to thread workers for synchronous chat generators.

test/components/evaluators/test_context_relevance_evaluator.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,3 +262,84 @@ def test_live_run(self):
262262
assert "prompt_tokens" in result["meta"][0]["usage"]
263263
assert "completion_tokens" in result["meta"][0]["usage"]
264264
assert "total_tokens" in result["meta"][0]["usage"]
265+
266+
267+
class TestContextRelevanceEvaluatorAsync:
268+
@pytest.mark.asyncio
269+
async def test_run_async_calculates_mean_score(self, monkeypatch):
270+
monkeypatch.setenv("OPENAI_API_KEY", "test-api-key")
271+
component = ContextRelevanceEvaluator()
272+
273+
async def chat_generator_run_async(self, *args, **kwargs):
274+
if "Football" in kwargs["messages"][0].text:
275+
return {"replies": [ChatMessage.from_assistant('{"relevant_statements": ["a", "b"], "score": 1}')]}
276+
return {"replies": [ChatMessage.from_assistant('{"relevant_statements": [], "score": 0}')]}
277+
278+
monkeypatch.setattr(
279+
"haystack.components.evaluators.llm_evaluator.OpenAIChatGenerator.run_async", chat_generator_run_async
280+
)
281+
282+
questions = ["Which is the most popular global sport?", "Who created the Python language?"]
283+
contexts = [
284+
["Football is the world's most popular sport."],
285+
["Python is a cross-platform programming language."],
286+
]
287+
288+
results = await component.run_async(questions=questions, contexts=contexts)
289+
290+
assert results == {
291+
"results": [{"score": 1, "relevant_statements": ["a", "b"]}, {"score": 0, "relevant_statements": []}],
292+
"score": 0.5,
293+
"meta": None,
294+
"individual_scores": [1, 0],
295+
}
296+
297+
@pytest.mark.asyncio
298+
async def test_run_async_returns_nan_raise_on_failure_false(self, monkeypatch, caplog):
299+
monkeypatch.setenv("OPENAI_API_KEY", "test-api-key")
300+
component = ContextRelevanceEvaluator(raise_on_failure=False)
301+
302+
async def chat_generator_run_async(self, *args, **kwargs):
303+
if "Python" in kwargs["messages"][0].text:
304+
raise Exception("OpenAI API request failed.")
305+
return {"replies": [ChatMessage.from_assistant('{"relevant_statements": ["c", "d"], "score": 1}')]}
306+
307+
monkeypatch.setattr(
308+
"haystack.components.evaluators.llm_evaluator.OpenAIChatGenerator.run_async", chat_generator_run_async
309+
)
310+
311+
questions = ["Which is the most popular global sport?", "Who created the Python language?"]
312+
contexts = [["Football is popular."], ["Python was created by Guido van Rossum."]]
313+
314+
with caplog.at_level("WARNING", logger="haystack.components.evaluators.context_relevance"):
315+
results = await component.run_async(questions=questions, contexts=contexts)
316+
317+
assert results["score"] == 1
318+
assert results["results"][0] == {"relevant_statements": ["c", "d"], "score": 1}
319+
assert results["results"][1]["relevant_statements"] == []
320+
assert math.isnan(results["results"][1]["score"])
321+
322+
assert "1 query(s) failed and were excluded from the score." in caplog.text
323+
324+
@pytest.mark.asyncio
325+
@pytest.mark.skipif(
326+
not os.environ.get("OPENAI_API_KEY", None),
327+
reason="Export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.",
328+
)
329+
@pytest.mark.integration
330+
async def test_live_run_async(self):
331+
questions = ["Who created the Python language?"]
332+
contexts = [["Python, created by Guido van Rossum, is a high-level general-purpose programming language."]]
333+
334+
evaluator = ContextRelevanceEvaluator(chat_generator=OpenAIChatGenerator(model="gpt-4.1-nano"))
335+
result = await evaluator.run_async(questions=questions, contexts=contexts)
336+
337+
required_fields = {"results"}
338+
assert all(field in result for field in required_fields)
339+
nested_required_fields = {"score", "relevant_statements"}
340+
assert all(field in result["results"][0] for field in nested_required_fields)
341+
342+
assert "meta" in result
343+
assert "prompt_tokens" in result["meta"][0]["usage"]
344+
assert "completion_tokens" in result["meta"][0]["usage"]
345+
assert "total_tokens" in result["meta"][0]["usage"]

test/components/evaluators/test_faithfulness_evaluator.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,3 +320,87 @@ def test_live_run(self):
320320
assert "prompt_tokens" in result["meta"][0]["usage"]
321321
assert "completion_tokens" in result["meta"][0]["usage"]
322322
assert "total_tokens" in result["meta"][0]["usage"]
323+
324+
325+
class TestFaithfulnessEvaluatorAsync:
326+
@pytest.mark.asyncio
327+
async def test_run_async_calculates_mean_score(self, monkeypatch):
328+
monkeypatch.setenv("OPENAI_API_KEY", "test-api-key")
329+
component = FaithfulnessEvaluator()
330+
331+
async def chat_generator_run_async(self, *args, **kwargs):
332+
if "Football" in kwargs["messages"][0].text:
333+
return {
334+
"replies": [ChatMessage.from_assistant('{"statements": ["a", "b"], "statement_scores": [1, 0]}')]
335+
}
336+
return {"replies": [ChatMessage.from_assistant('{"statements": ["c", "d"], "statement_scores": [1, 1]}')]}
337+
338+
monkeypatch.setattr(
339+
"haystack.components.evaluators.llm_evaluator.OpenAIChatGenerator.run_async", chat_generator_run_async
340+
)
341+
342+
questions = ["Which is the most popular global sport?", "Who created the Python language?"]
343+
contexts = [["Football is the world's most popular sport."], ["Python was created by Guido van Rossum."]]
344+
predicted_answers = ["Football is the most popular sport.", "Python is a language created by George Lucas."]
345+
results = await component.run_async(questions=questions, contexts=contexts, predicted_answers=predicted_answers)
346+
assert results == {
347+
"individual_scores": [0.5, 1.0],
348+
"results": [
349+
{"score": 0.5, "statement_scores": [1, 0], "statements": ["a", "b"]},
350+
{"score": 1.0, "statement_scores": [1, 1], "statements": ["c", "d"]},
351+
],
352+
"score": 0.75,
353+
"meta": None,
354+
}
355+
356+
@pytest.mark.asyncio
357+
async def test_run_async_returns_nan_raise_on_failure_false(self, monkeypatch, caplog):
358+
monkeypatch.setenv("OPENAI_API_KEY", "test-api-key")
359+
component = FaithfulnessEvaluator(raise_on_failure=False)
360+
361+
async def chat_generator_run_async(self, *args, **kwargs):
362+
if "Python" in kwargs["messages"][0].text:
363+
raise Exception("OpenAI API request failed.")
364+
return {"replies": [ChatMessage.from_assistant('{"statements": ["c", "d"], "statement_scores": [1, 1]}')]}
365+
366+
monkeypatch.setattr(
367+
"haystack.components.evaluators.llm_evaluator.OpenAIChatGenerator.run_async", chat_generator_run_async
368+
)
369+
370+
questions = ["Which is the most popular global sport?", "Who created the Python language?"]
371+
contexts = [["Football is popular."], ["Python was created by Guido."]]
372+
predicted_answers = ["Football is popular.", "Guido van Rossum."]
373+
374+
with caplog.at_level("WARNING", logger="haystack.components.evaluators.faithfulness"):
375+
results = await component.run_async(
376+
questions=questions, contexts=contexts, predicted_answers=predicted_answers
377+
)
378+
379+
assert results["score"] == 1.0
380+
assert results["individual_scores"][0] == 1.0
381+
assert math.isnan(results["individual_scores"][1])
382+
assert "1 query(s) failed and were excluded from the score." in caplog.text
383+
384+
@pytest.mark.asyncio
385+
@pytest.mark.skipif(
386+
not os.environ.get("OPENAI_API_KEY", None),
387+
reason="Export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.",
388+
)
389+
@pytest.mark.integration
390+
async def test_live_run_async(self):
391+
questions = ["What is Python and who created it?"]
392+
contexts = [["Python is a programming language created by Guido van Rossum."]]
393+
predicted_answers = ["Python is a programming language created by George Lucas."]
394+
evaluator = FaithfulnessEvaluator(chat_generator=OpenAIChatGenerator(model="gpt-4.1-nano"))
395+
result = await evaluator.run_async(questions=questions, contexts=contexts, predicted_answers=predicted_answers)
396+
397+
required_fields = {"individual_scores", "results", "score"}
398+
assert all(field in result for field in required_fields)
399+
nested_required_fields = {"score", "statement_scores", "statements"}
400+
assert all(field in result["results"][0] for field in nested_required_fields)
401+
402+
# assert that metadata is present in the result
403+
assert "meta" in result
404+
assert "prompt_tokens" in result["meta"][0]["usage"]
405+
assert "completion_tokens" in result["meta"][0]["usage"]
406+
assert "total_tokens" in result["meta"][0]["usage"]

0 commit comments

Comments
 (0)