From 462d44cd77ac1685693bd7efbdf5386b23eaaa44 Mon Sep 17 00:00:00 2001
From: CodeForgeNet
Date: Mon, 23 Mar 2026 02:24:32 +0530
Subject: [PATCH] fix: support non-UTF-8 encodings in eval data loading

Fixes #3670

pd.read_json defaulted to UTF-8 only. Files encoded with utf-8-sig (BOM)
raised ValueError: Expected object or value.

- Added _detect_encoding() BOM detection in load_data.py, _evaluate.py, _utils.py
- Added fallback encoding chain: utf-8, utf-8-sig, latin-1, cp1252
- Improved error messages to show which encodings were attempted
- Added test case and utf-8-sig encoded test data file
---
 .../promptflow/_utils/load_data.py            |  14 +-
 .../promptflow/evals/evaluate/_evaluate.py    |  39 +-
 .../promptflow/evals/evaluate/_utils.py       | 485 +++++++++---------
 .../unittests/data/utf8sig_test_data.jsonl    |   2 +
 .../tests/evals/unittests/test_evaluate.py    |  10 +
 5 files changed, 313 insertions(+), 237 deletions(-)
 create mode 100644 src/promptflow-evals/tests/evals/unittests/data/utf8sig_test_data.jsonl

diff --git a/src/promptflow-core/promptflow/_utils/load_data.py b/src/promptflow-core/promptflow/_utils/load_data.py
index 806c7d699fa..2ee68a576db 100644
--- a/src/promptflow-core/promptflow/_utils/load_data.py
+++ b/src/promptflow-core/promptflow/_utils/load_data.py
@@ -12,6 +12,17 @@
 module_logger = logging.getLogger(__name__)
 
 
+def _detect_encoding(file_path: str) -> str:
+    """Detect BOM markers to identify encoding, defaulting to utf-8."""
+    with open(file_path, "rb") as f:
+        raw = f.read(4)
+    if raw.startswith(b"\xef\xbb\xbf"):
+        return "utf-8-sig"
+    if raw.startswith(b"\xff\xfe") or raw.startswith(b"\xfe\xff"):
+        return "utf-16"
+    return "utf-8"
+
+
 def _pd_read_file(local_path: str, logger: logging.Logger = None, max_rows_count: int = None) -> "DataFrame":
     import pandas as pd
 
@@ -34,7 +45,8 @@ def _pd_read_file(local_path: str, logger: logging.Logger = None, max_rows_count
     elif local_path.endswith(".json"):
         df = pd.read_json(local_path, dtype=dtype)
     elif local_path.endswith(".jsonl"):
-        df = pd.read_json(local_path, dtype=dtype, lines=True, nrows=max_rows_count)
+        encoding = _detect_encoding(local_path)
+        df = pd.read_json(local_path, dtype=dtype, lines=True, nrows=max_rows_count, encoding=encoding)
     elif local_path.endswith(".tsv"):
         df = pd.read_table(local_path, dtype=dtype, keep_default_na=False, nrows=max_rows_count)
     elif local_path.endswith(".parquet"):
diff --git a/src/promptflow-evals/promptflow/evals/evaluate/_evaluate.py b/src/promptflow-evals/promptflow/evals/evaluate/_evaluate.py
index 578236c527f..b8766b89b78 100644
--- a/src/promptflow-evals/promptflow/evals/evaluate/_evaluate.py
+++ b/src/promptflow-evals/promptflow/evals/evaluate/_evaluate.py
@@ -191,12 +191,39 @@ def _validate_and_load_data(target, data, evaluators, output_path, azure_ai_proj
     if not isinstance(evaluation_name, str):
         raise ValueError("evaluation_name must be a string.")
 
-    try:
-        initial_data_df = pd.read_json(data, lines=True)
-    except Exception as e:
-        raise ValueError(
-            f"Failed to load data from {data}. Please validate it is a valid jsonl data. Error: {str(e)}."
-        ) from e
+    def _detect_encoding(file_path: str) -> str:
+        """Detect BOM markers to identify encoding, defaulting to utf-8."""
+        with open(file_path, "rb") as f:
+            raw = f.read(4)
+        if raw.startswith(b"\xef\xbb\xbf"):
+            return "utf-8-sig"
+        if raw.startswith(b"\xff\xfe") or raw.startswith(b"\xfe\xff"):
+            return "utf-16"
+        return "utf-8"
+
+    _FALLBACK_ENCODINGS = ["utf-8", "utf-8-sig", "latin-1", "cp1252"]
+
+    try:
+        detected = _detect_encoding(data) if isinstance(data, str) else "utf-8"
+        encodings = [detected] + [e for e in _FALLBACK_ENCODINGS if e.lower() != detected.lower()]
+        last_error = None
+        for encoding in encodings:
+            try:
+                initial_data_df = pd.read_json(data, lines=True, encoding=encoding)
+                break
+            except Exception as e:
+                last_error = e
+        else:
+            raise ValueError(
+                f"Failed to load data from {data}. Tried encodings: {encodings}. "
+                f"Please validate it is a valid jsonl data. Error: {str(last_error)}."
+            ) from last_error
+    except ValueError:
+        raise
+    except Exception as e:
+        raise ValueError(
+            f"Failed to load data from {data}. Please validate it is a valid jsonl data. Error: {str(e)}."
+        ) from e
 
     return initial_data_df
 
diff --git a/src/promptflow-evals/promptflow/evals/evaluate/_utils.py b/src/promptflow-evals/promptflow/evals/evaluate/_utils.py
index fbd0ab27104..d766902d47a 100644
--- a/src/promptflow-evals/promptflow/evals/evaluate/_utils.py
+++ b/src/promptflow-evals/promptflow/evals/evaluate/_utils.py
@@ -1,230 +1,255 @@
-# ---------------------------------------------------------
-# Copyright (c) Microsoft Corporation. All rights reserved.
-# ---------------------------------------------------------
-import json
-import logging
-import os
-import re
-import tempfile
-from collections import namedtuple
-from pathlib import Path
-
-import pandas as pd
-
-from promptflow.evals._constants import DEFAULT_EVALUATION_RESULTS_FILE_NAME, Prefixes
-from promptflow.evals.evaluate._eval_run import EvalRun
-
-LOGGER = logging.getLogger(__name__)
-
-AZURE_WORKSPACE_REGEX_FORMAT = (
-    "^azureml:[/]{1,2}subscriptions/([^/]+)/resource(groups|Groups)/([^/]+)"
-    "(/providers/Microsoft.MachineLearningServices)?/workspaces/([^/]+)$"
-)
-
-AzureMLWorkspaceTriad = namedtuple("AzureMLWorkspace", ["subscription_id", "resource_group_name", "workspace_name"])
-
-
-def is_none(value):
-    return value is None or str(value).lower() == "none"
-
-
-def extract_workspace_triad_from_trace_provider(trace_provider: str):  # pylint: disable=name-too-long
-    match = re.match(AZURE_WORKSPACE_REGEX_FORMAT, trace_provider)
-    if not match or len(match.groups()) != 5:
-        raise ValueError(
-            "Malformed trace provider string, expected azureml://subscriptions/<subscription_id>/"
-            "resourceGroups/<resource_group>/providers/Microsoft.MachineLearningServices/"
-            f"workspaces/<workspace_name>, got {trace_provider}"
-        )
-    subscription_id = match.group(1)
-    resource_group_name = match.group(3)
-    workspace_name = match.group(5)
-    return AzureMLWorkspaceTriad(subscription_id, resource_group_name, workspace_name)
-
-
-def load_jsonl(path):
-    with open(path, "r", encoding="utf-8") as f:
-        return [json.loads(line) for line in f.readlines()]
-
-
-def _azure_pf_client_and_triad(trace_destination):
-    from promptflow.azure._cli._utils import _get_azure_pf_client
-
-    ws_triad = extract_workspace_triad_from_trace_provider(trace_destination)
-    azure_pf_client = _get_azure_pf_client(
-        subscription_id=ws_triad.subscription_id,
-        resource_group=ws_triad.resource_group_name,
-        workspace_name=ws_triad.workspace_name,
-    )
-
-    return azure_pf_client, ws_triad
-
-
-def _log_metrics_and_instance_results(
-    metrics,
-    instance_results,
-    trace_destination,
-    run,
-    evaluation_name,
-) -> str:
-    if trace_destination is None:
-        LOGGER.error("Unable to log traces as trace destination was not defined.")
-        return None
-
-    azure_pf_client, ws_triad = _azure_pf_client_and_triad(trace_destination)
-    tracking_uri = azure_pf_client.ml_client.workspaces.get(ws_triad.workspace_name).mlflow_tracking_uri
-
-    # Adding line_number as index column this is needed by UI to form link to individual instance run
-    instance_results["line_number"] = instance_results.index.values
-
-    with EvalRun(
-        run_name=run.name if run is not None else evaluation_name,
-        tracking_uri=tracking_uri,
-        subscription_id=ws_triad.subscription_id,
-        group_name=ws_triad.resource_group_name,
-        workspace_name=ws_triad.workspace_name,
-        ml_client=azure_pf_client.ml_client,
-        promptflow_run=run,
-    ) as ev_run:
-
-        artifact_name = EvalRun.EVALUATION_ARTIFACT if run else EvalRun.EVALUATION_ARTIFACT_DUMMY_RUN
-
-        with tempfile.TemporaryDirectory() as tmpdir:
-            tmp_path = os.path.join(tmpdir, artifact_name)
-
-            with open(tmp_path, "w", encoding="utf-8") as f:
-                f.write(instance_results.to_json(orient="records", lines=True))
-
-            ev_run.log_artifact(tmpdir, artifact_name)
-
-            # Using mlflow to create a dummy run since once created via PF show traces of dummy run in UI.
-            # Those traces can be confusing.
-            # adding these properties to avoid showing traces if a dummy run is created.
-            # We are doing that only for the pure evaluation runs.
-            if run is None:
-                ev_run.write_properties_to_run_history(
-                    properties={
-                        "_azureml.evaluation_run": "azure-ai-generative-parent",
-                        "_azureml.evaluate_artifacts": json.dumps([{"path": artifact_name, "type": "table"}]),
-                        "isEvaluatorRun": "true",
-                    }
-                )
-
-        for metric_name, metric_value in metrics.items():
-            ev_run.log_metric(metric_name, metric_value)
-
-    evaluation_id = ev_run.info.run_name if run is not None else ev_run.info.run_id
-    return _get_ai_studio_url(trace_destination=trace_destination, evaluation_id=evaluation_id)
-
-
-def _get_ai_studio_url(trace_destination: str, evaluation_id: str) -> str:
-    ws_triad = extract_workspace_triad_from_trace_provider(trace_destination)
-    studio_base_url = os.getenv("AI_STUDIO_BASE_URL", "https://ai.azure.com")
-
-    studio_url = (
-        f"{studio_base_url}/build/evaluation/{evaluation_id}?wsid=/subscriptions/{ws_triad.subscription_id}"
-        f"/resourceGroups/{ws_triad.resource_group_name}/providers/Microsoft.MachineLearningServices/"
-        f"workspaces/{ws_triad.workspace_name}"
-    )
-
-    return studio_url
-
-
-def _trace_destination_from_project_scope(project_scope: dict) -> str:
-    subscription_id = project_scope["subscription_id"]
-    resource_group_name = project_scope["resource_group_name"]
-    workspace_name = project_scope["project_name"]
-
-    trace_destination = (
-        f"azureml://subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/"
-        f"providers/Microsoft.MachineLearningServices/workspaces/{workspace_name}"
-    )
-
-    return trace_destination
-
-
-def _write_output(path, data_dict):
-    p = Path(path)
-    if os.path.isdir(path):
-        p = p / DEFAULT_EVALUATION_RESULTS_FILE_NAME
-
-    with open(p, "w") as f:
-        json.dump(data_dict, f)
-
-
-def _apply_column_mapping(source_df: pd.DataFrame, mapping_config: dict, inplace: bool = False) -> pd.DataFrame:
-    """
-    Apply column mapping to source_df based on mapping_config.
-
-    This function is used for pre-validation of input data for evaluators
-    :param source_df: the data frame to be changed.
-    :type source_df: pd.DataFrame
-    :param mapping_config: The configuration, containing column mapping.
-    :type mapping_config: dict.
-    :param inplace: If true, the source_df will be changed inplace.
-    :type inplace: bool
-    :return: The modified data frame.
-    """
-    result_df = source_df
-
-    if mapping_config:
-        column_mapping = {}
-        columns_to_drop = set()
-        pattern_prefix = "data."
-        run_outputs_prefix = "run.outputs."
-
-        for map_to_key, map_value in mapping_config.items():
-            match = re.search(r"^\${([^{}]+)}$", map_value)
-            if match is not None:
-                pattern = match.group(1)
-                if pattern.startswith(pattern_prefix):
-                    map_from_key = pattern[len(pattern_prefix) :]
-                elif pattern.startswith(run_outputs_prefix):
-                    # Target-generated columns always starts from .outputs.
-                    map_from_key = f"{Prefixes.TSG_OUTPUTS}{pattern[len(run_outputs_prefix) :]}"
-                # if we are not renaming anything, skip.
-                if map_from_key == map_to_key:
-                    continue
-                # If column needs to be mapped to already existing column, we will add it
-                # to the drop list.
-                if map_to_key in source_df.columns:
-                    columns_to_drop.add(map_to_key)
-                column_mapping[map_from_key] = map_to_key
-        # If we map column to another one, which is already present in the data
-        # set and the letter also needs to be mapped, we will not drop it, but map
-        # instead.
-        columns_to_drop = columns_to_drop - set(column_mapping.keys())
-        result_df = source_df.drop(columns=columns_to_drop, inplace=inplace)
-        result_df.rename(columns=column_mapping, inplace=True)
-
-    return result_df
-
-
-def _has_aggregator(evaluator):
-    return hasattr(evaluator, "__aggregate__")
-
-
-def get_int_env_var(env_var_name, default_value=None):
-    """
-    The function `get_int_env_var` retrieves an integer environment variable value, with an optional
-    default value if the variable is not set or cannot be converted to an integer.
-
-    :param env_var_name: The name of the environment variable you want to retrieve the value of
-    :param default_value: The default value is the value that will be returned if the environment
-    variable is not found or if it cannot be converted to an integer
-    :return: an integer value.
-    """
-    try:
-        return int(os.environ.get(env_var_name, default_value))
-    except Exception:
-        return default_value
-
-
-def set_event_loop_policy():
-    import asyncio
-    import platform
-
-    if platform.system().lower() == "windows":
-        # Reference: https://stackoverflow.com/questions/45600579/asyncio-event-loop-is-closed-when-getting-loop
-        # On Windows seems to be a problem with EventLoopPolicy, use this snippet to work around it
-        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+import json
+import logging
+import os
+import re
+import tempfile
+from collections import namedtuple
+from pathlib import Path
+
+import pandas as pd
+
+from promptflow.evals._constants import DEFAULT_EVALUATION_RESULTS_FILE_NAME, Prefixes
+from promptflow.evals.evaluate._eval_run import EvalRun
+
+LOGGER = logging.getLogger(__name__)
+
+AZURE_WORKSPACE_REGEX_FORMAT = (
+    "^azureml:[/]{1,2}subscriptions/([^/]+)/resource(groups|Groups)/([^/]+)"
+    "(/providers/Microsoft.MachineLearningServices)?/workspaces/([^/]+)$"
+)
+
+AzureMLWorkspaceTriad = namedtuple("AzureMLWorkspace", ["subscription_id", "resource_group_name", "workspace_name"])
+
+
+def is_none(value):
+    return value is None or str(value).lower() == "none"
+
+
+def extract_workspace_triad_from_trace_provider(trace_provider: str):  # pylint: disable=name-too-long
+    match = re.match(AZURE_WORKSPACE_REGEX_FORMAT, trace_provider)
+    if not match or len(match.groups()) != 5:
+        raise ValueError(
+            "Malformed trace provider string, expected azureml://subscriptions/<subscription_id>/"
+            "resourceGroups/<resource_group>/providers/Microsoft.MachineLearningServices/"
+            f"workspaces/<workspace_name>, got {trace_provider}"
+        )
+    subscription_id = match.group(1)
+    resource_group_name = match.group(3)
+    workspace_name = match.group(5)
+    return AzureMLWorkspaceTriad(subscription_id, resource_group_name, workspace_name)
+
+
+def load_jsonl(path):
+    """Load jsonl file with BOM detection and fallback encodings."""
+    _FALLBACK_ENCODINGS = ["utf-8", "utf-8-sig", "latin-1", "cp1252"]
+
+    def _detect_encoding(file_path: str) -> str:
+        """Detect BOM markers to identify encoding, defaulting to utf-8."""
+        with open(file_path, "rb") as f:
+            raw = f.read(4)
+        if raw.startswith(b"\xef\xbb\xbf"):
+            return "utf-8-sig"
+        if raw.startswith(b"\xff\xfe") or raw.startswith(b"\xfe\xff"):
+            return "utf-16"
+        return "utf-8"
+
+    last_error = None
+    detected = _detect_encoding(path)
+    encodings = [detected] + [e for e in _FALLBACK_ENCODINGS if e.lower() != detected.lower()]
+    for encoding in encodings:
+        try:
+            with open(path, "r", encoding=encoding) as f:
+                return [json.loads(line) for line in f.readlines()]
+        except (UnicodeDecodeError, json.JSONDecodeError) as e:
+            last_error = e
+            continue
+
+    raise ValueError(
+        f"Failed to load data from {path}. Tried encodings: {encodings}. Error: {str(last_error)}."
+    ) from last_error
+
+
+def _azure_pf_client_and_triad(trace_destination):
+    from promptflow.azure._cli._utils import _get_azure_pf_client
+
+    ws_triad = extract_workspace_triad_from_trace_provider(trace_destination)
+    azure_pf_client = _get_azure_pf_client(
+        subscription_id=ws_triad.subscription_id,
+        resource_group=ws_triad.resource_group_name,
+        workspace_name=ws_triad.workspace_name,
+    )
+
+    return azure_pf_client, ws_triad
+
+
+def _log_metrics_and_instance_results(
+    metrics,
+    instance_results,
+    trace_destination,
+    run,
+    evaluation_name,
+) -> str:
+    if trace_destination is None:
+        LOGGER.error("Unable to log traces as trace destination was not defined.")
+        return None
+
+    azure_pf_client, ws_triad = _azure_pf_client_and_triad(trace_destination)
+    tracking_uri = azure_pf_client.ml_client.workspaces.get(ws_triad.workspace_name).mlflow_tracking_uri
+
+    # Add line_number as an index column; the UI needs it to form links to individual instance runs.
+    instance_results["line_number"] = instance_results.index.values
+
+    with EvalRun(
+        run_name=run.name if run is not None else evaluation_name,
+        tracking_uri=tracking_uri,
+        subscription_id=ws_triad.subscription_id,
+        group_name=ws_triad.resource_group_name,
+        workspace_name=ws_triad.workspace_name,
+        ml_client=azure_pf_client.ml_client,
+        promptflow_run=run,
+    ) as ev_run:
+
+        artifact_name = EvalRun.EVALUATION_ARTIFACT if run else EvalRun.EVALUATION_ARTIFACT_DUMMY_RUN
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            tmp_path = os.path.join(tmpdir, artifact_name)
+
+            with open(tmp_path, "w", encoding="utf-8") as f:
+                f.write(instance_results.to_json(orient="records", lines=True))
+
+            ev_run.log_artifact(tmpdir, artifact_name)
+
+            # Using mlflow to create the dummy run, since runs created via PF show their traces in the UI.
+            # Traces of a dummy run can be confusing.
+            # Adding these properties avoids showing traces when a dummy run is created.
+            # We do that only for the pure evaluation runs.
+            if run is None:
+                ev_run.write_properties_to_run_history(
+                    properties={
+                        "_azureml.evaluation_run": "azure-ai-generative-parent",
+                        "_azureml.evaluate_artifacts": json.dumps([{"path": artifact_name, "type": "table"}]),
+                        "isEvaluatorRun": "true",
+                    }
+                )
+
+        for metric_name, metric_value in metrics.items():
+            ev_run.log_metric(metric_name, metric_value)
+
+    evaluation_id = ev_run.info.run_name if run is not None else ev_run.info.run_id
+    return _get_ai_studio_url(trace_destination=trace_destination, evaluation_id=evaluation_id)
+
+
+def _get_ai_studio_url(trace_destination: str, evaluation_id: str) -> str:
+    ws_triad = extract_workspace_triad_from_trace_provider(trace_destination)
+    studio_base_url = os.getenv("AI_STUDIO_BASE_URL", "https://ai.azure.com")
+
+    studio_url = (
+        f"{studio_base_url}/build/evaluation/{evaluation_id}?wsid=/subscriptions/{ws_triad.subscription_id}"
+        f"/resourceGroups/{ws_triad.resource_group_name}/providers/Microsoft.MachineLearningServices/"
+        f"workspaces/{ws_triad.workspace_name}"
+    )
+
+    return studio_url
+
+
+def _trace_destination_from_project_scope(project_scope: dict) -> str:
+    subscription_id = project_scope["subscription_id"]
+    resource_group_name = project_scope["resource_group_name"]
+    workspace_name = project_scope["project_name"]
+
+    trace_destination = (
+        f"azureml://subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/"
+        f"providers/Microsoft.MachineLearningServices/workspaces/{workspace_name}"
+    )
+
+    return trace_destination
+
+
+def _write_output(path, data_dict):
+    p = Path(path)
+    if os.path.isdir(path):
+        p = p / DEFAULT_EVALUATION_RESULTS_FILE_NAME
+
+    with open(p, "w") as f:
+        json.dump(data_dict, f)
+
+
+def _apply_column_mapping(source_df: pd.DataFrame, mapping_config: dict, inplace: bool = False) -> pd.DataFrame:
+    """
+    Apply column mapping to source_df based on mapping_config.
+
+    This function is used for pre-validation of input data for evaluators.
+    :param source_df: The data frame to be changed.
+    :type source_df: pd.DataFrame
+    :param mapping_config: The configuration, containing the column mapping.
+    :type mapping_config: dict
+    :param inplace: If True, source_df will be changed in place.
+    :type inplace: bool
+    :return: The modified data frame.
+    """
+    result_df = source_df
+
+    if mapping_config:
+        column_mapping = {}
+        columns_to_drop = set()
+        pattern_prefix = "data."
+        run_outputs_prefix = "run.outputs."
+
+        for map_to_key, map_value in mapping_config.items():
+            match = re.search(r"^\${([^{}]+)}$", map_value)
+            if match is not None:
+                pattern = match.group(1)
+                if pattern.startswith(pattern_prefix):
+                    map_from_key = pattern[len(pattern_prefix) :]
+                elif pattern.startswith(run_outputs_prefix):
+                    # Target-generated columns always start from .outputs.
+                    map_from_key = f"{Prefixes.TSG_OUTPUTS}{pattern[len(run_outputs_prefix) :]}"
+                # If we are not renaming anything, skip.
+                if map_from_key == map_to_key:
+                    continue
+                # If a column needs to be mapped to an already existing column, we will add it
+                # to the drop list.
+                if map_to_key in source_df.columns:
+                    columns_to_drop.add(map_to_key)
+                column_mapping[map_from_key] = map_to_key
+        # If we map a column to another one, which is already present in the data
+        # set, and the latter also needs to be mapped, we will not drop it, but map
+        # it instead.
+        columns_to_drop = columns_to_drop - set(column_mapping.keys())
+        result_df = source_df.drop(columns=columns_to_drop, inplace=inplace)
+        result_df.rename(columns=column_mapping, inplace=True)
+
+    return result_df
+
+
+def _has_aggregator(evaluator):
+    return hasattr(evaluator, "__aggregate__")
+
+
+def get_int_env_var(env_var_name, default_value=None):
+    """
+    Retrieve the value of an integer environment variable, with an optional
+    default value if the variable is not set or cannot be converted to an integer.
+
+    :param env_var_name: The name of the environment variable to retrieve.
+    :param default_value: The value that will be returned if the environment
+    variable is not found or cannot be converted to an integer.
+    :return: An integer value.
+    """
+    try:
+        return int(os.environ.get(env_var_name, default_value))
+    except Exception:
+        return default_value
+
+
+def set_event_loop_policy():
+    import asyncio
+    import platform
+
+    if platform.system().lower() == "windows":
+        # Reference: https://stackoverflow.com/questions/45600579/asyncio-event-loop-is-closed-when-getting-loop
+        # On Windows there seems to be a problem with the default EventLoopPolicy; this works around it.
+        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
diff --git a/src/promptflow-evals/tests/evals/unittests/data/utf8sig_test_data.jsonl b/src/promptflow-evals/tests/evals/unittests/data/utf8sig_test_data.jsonl
new file mode 100644
index 00000000000..a606a194967
--- /dev/null
+++ b/src/promptflow-evals/tests/evals/unittests/data/utf8sig_test_data.jsonl
@@ -0,0 +1,2 @@
+{"question": "\u3053\u3093\u306b\u3061\u306f", "answer": "hello", "ground_truth": "hello"}
+{"question": "caf\u00e9", "answer": "coffee", "ground_truth": "coffee"}
diff --git a/src/promptflow-evals/tests/evals/unittests/test_evaluate.py b/src/promptflow-evals/tests/evals/unittests/test_evaluate.py
index 8f5d38db1d3..ca16ff9a9c5 100644
--- a/src/promptflow-evals/tests/evals/unittests/test_evaluate.py
+++ b/src/promptflow-evals/tests/evals/unittests/test_evaluate.py
@@ -123,6 +123,16 @@ def test_evaluate_invalid_jsonl_data(self, mock_model_config, invalid_jsonl_file
         assert "Failed to load data from " in exc_info.value.args[0]
         assert "Please validate it is a valid jsonl data" in exc_info.value.args[0]
 
+    def test_evaluate_utf8_sig_encoding(self):
+        """Test that utf-8-sig (BOM) encoded jsonl files load correctly. Fixes #3670."""
+        utf8sig_file = _get_file("utf8sig_test_data.jsonl")
+        # Should NOT raise; this previously crashed with "ValueError: Expected object or value".
+        result = evaluate(
+            data=utf8sig_file,
+            evaluators={"f1": F1ScoreEvaluator()},
+        )
+        assert result is not None
+
     def test_evaluate_missing_required_inputs(self, missing_columns_jsonl_file):
         with pytest.raises(ValueError) as exc_info:
             evaluate(data=missing_columns_jsonl_file, evaluators={"g": F1ScoreEvaluator()})
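
Reviewer note: the snippet below is a standalone reproduction of #3670, not
part of the patch; the temporary file it writes and the detect_encoding helper
name are illustrative, not from the promptflow codebase. It shows why pandas'
default utf-8 codec trips over a BOM and why the BOM sniffing above recovers,
and it runs with any recent pandas.

import json
import tempfile

import pandas as pd


def detect_encoding(file_path: str) -> str:
    # Same BOM sniffing as the patch: inspect the first bytes of the file.
    with open(file_path, "rb") as f:
        raw = f.read(4)
    if raw.startswith(b"\xef\xbb\xbf"):
        return "utf-8-sig"
    if raw.startswith(b"\xff\xfe") or raw.startswith(b"\xfe\xff"):
        return "utf-16"
    return "utf-8"


# Write a jsonl file with a UTF-8 BOM, as Windows editors and Excel exports often do.
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", encoding="utf-8-sig", delete=False) as f:
    f.write(json.dumps({"question": "café", "answer": "coffee"}) + "\n")
    path = f.name

# With the default utf-8 codec the BOM bytes precede the first '{', so the
# JSON parser fails with "ValueError: Expected object or value".
try:
    pd.read_json(path, lines=True)
except ValueError as e:
    print(f"default utf-8 read failed: {e}")

# With BOM detection the utf-8-sig codec consumes the marker and parsing succeeds.
df = pd.read_json(path, lines=True, encoding=detect_encoding(path))
print(df)

One design note on the fallback chain used in the patch: latin-1 can decode
any byte sequence, so it never raises UnicodeDecodeError and effectively acts
as the last-resort codec; it is the subsequent JSON parse that actually
rejects a wrong guess.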