From c9ca227d51535d0ada509130df7e64812c0ef279 Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Wed, 21 May 2025 09:55:03 +0000 Subject: [PATCH 1/8] Sync Fabric till 2cd1c3da --- .coveragerc | 2 + flaml/automl/automl.py | 58 ++- flaml/automl/data.py | 337 +++++++++++++- flaml/automl/logger.py | 34 +- flaml/fabric/mlflow.py | 499 +++++++++++++++++---- flaml/tune/logger.py | 37 ++ flaml/tune/spark/utils.py | 22 + flaml/tune/tune.py | 16 +- pytest.ini | 3 + test/automl/test_extra_models.py | 2 + test/automl/test_forecast.py | 5 +- test/automl/test_mlflow.py | 12 + test/nlp/test_autohf_classificationhead.py | 2 + test/nlp/test_autohf_cv.py | 2 + test/nlp/test_default.py | 4 + test/spark/test_0sparkml.py | 81 +++- test/spark/test_automl.py | 2 +- test/spark/test_ensemble.py | 3 + test/spark/test_exceptions.py | 2 +- test/spark/test_mlflow.py | 1 + test/spark/test_multiclass.py | 2 + test/spark/test_notebook.py | 2 +- test/spark/test_overtime.py | 2 +- test/spark/test_performance.py | 2 +- test/spark/test_tune.py | 2 +- test/spark/test_utils.py | 2 +- 26 files changed, 1012 insertions(+), 124 deletions(-) create mode 100644 flaml/tune/logger.py create mode 100644 pytest.ini diff --git a/.coveragerc b/.coveragerc index 91c9b36bc3..05ab59fb37 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,5 +1,7 @@ [run] branch = True source = flaml +concurrency = multiprocessing +parallel = true omit = *test* diff --git a/flaml/automl/automl.py b/flaml/automl/automl.py index 3233a1cffa..0f85046d55 100644 --- a/flaml/automl/automl.py +++ b/flaml/automl/automl.py @@ -10,6 +10,7 @@ import random import sys import time +from concurrent.futures import as_completed from functools import partial from typing import Callable, List, Optional, Union @@ -187,7 +188,8 @@ def custom_metric( mem_thres: A float of the memory size constraint in bytes. pred_time_limit: A float of the prediction latency constraint in seconds. It refers to the average prediction time per row in validation data. - train_time_limit: A float of the training time constraint in seconds. + train_time_limit: None or a float of the training time constraint in seconds for each trial. + Only valid for sequential search. verbose: int, default=3 | Controls the verbosity, higher means more messages. retrain_full: bool or str, default=True | whether to retrain the @@ -1334,7 +1336,8 @@ def custom_metric( mem_thres: A float of the memory size constraint in bytes. pred_time_limit: A float of the prediction latency constraint in seconds. It refers to the average prediction time per row in validation data. - train_time_limit: None or a float of the training time constraint in seconds. + train_time_limit: None or a float of the training time constraint in seconds for each trial. + Only valid for sequential search. X_val: None or a numpy array or a pandas dataframe of validation data. y_val: None or a numpy array or a pandas series of validation labels. sample_weight_val: None or a numpy array of the sample weight of @@ -1625,6 +1628,13 @@ def cv_score_agg_func(val_loss_folds, log_metrics_folds): _ch.setFormatter(logger_formatter) logger.addHandler(_ch) + if model_history: + logger.warning( + "With `model_history` set to `True` by default, all intermediate models are retained in memory, " + "which may significantly increase memory usage and slow down training. " + "Consider setting `model_history=False` to optimize memory and accelerate the training process." + ) + if not use_ray and not use_spark and n_concurrent_trials > 1: if ray_available: logger.warning( @@ -2717,16 +2727,42 @@ def _search(self): ): if mlflow.active_run() is None: mlflow.start_run(run_id=self.mlflow_integration.parent_run_id) - self.mlflow_integration.log_model( - self._trained_estimator.model, - self.best_estimator, - signature=self.estimator_signature, - ) - self.mlflow_integration.pickle_and_log_automl_artifacts( - self, self.model, self.best_estimator, signature=self.pipeline_signature - ) + if self.best_estimator.endswith("_spark"): + self.mlflow_integration.log_model( + self._trained_estimator.model, + self.best_estimator, + signature=self.estimator_signature, + run_id=self.mlflow_integration.parent_run_id, + ) + else: + self.mlflow_integration.pickle_and_log_automl_artifacts( + self, + self.model, + self.best_estimator, + signature=self.pipeline_signature, + run_id=self.mlflow_integration.parent_run_id, + ) else: - logger.info("not retraining because the time budget is too small.") + logger.warning("not retraining because the time budget is too small.") + if self.mlflow_integration is not None: + logger.debug("Collecting results from submitted record_state tasks") + t1 = time.perf_counter() + for future in as_completed(self.mlflow_integration.futures): + _task = self.mlflow_integration.futures[future] + try: + result = future.result() + logger.debug(f"Result for record_state task {_task}: {result}") + except Exception as e: + logger.warning(f"Exception for record_state task {_task}: {e}") + for future in as_completed(self.mlflow_integration.futures_log_model): + _task = self.mlflow_integration.futures_log_model[future] + try: + result = future.result() + logger.debug(f"Result for log_model task {_task}: {result}") + except Exception as e: + logger.warning(f"Exception for log_model task {_task}: {e}") + t2 = time.perf_counter() + logger.debug(f"Collecting results from tasks submitted to executors costs {t2-t1} seconds.") def __del__(self): if ( diff --git a/flaml/automl/data.py b/flaml/automl/data.py index 747236dad5..b7c04bd345 100644 --- a/flaml/automl/data.py +++ b/flaml/automl/data.py @@ -2,13 +2,17 @@ # * Copyright (c) Microsoft Corporation. All rights reserved. # * Licensed under the MIT License. See LICENSE file in the # * project root for license information. +import json import os -from datetime import datetime +import random +import uuid +from datetime import datetime, timedelta +from decimal import ROUND_HALF_UP, Decimal from typing import TYPE_CHECKING, Union import numpy as np -from flaml.automl.spark import DataFrame, Series, pd, ps, psDataFrame, psSeries +from flaml.automl.spark import DataFrame, F, Series, T, pd, ps, psDataFrame, psSeries from flaml.automl.training_log import training_log_reader try: @@ -19,6 +23,7 @@ if TYPE_CHECKING: from flaml.automl.task import Task + TS_TIMESTAMP_COL = "ds" TS_VALUE_COL = "y" @@ -445,3 +450,331 @@ def transform(self, X: Union[DataFrame, np.array]): def group_counts(groups): _, i, c = np.unique(groups, return_counts=True, return_index=True) return c[np.argsort(i)] + + +def get_random_dataframe(n_rows: int = 200, ratio_none: float = 0.1, seed: int = 42) -> pd.DataFrame: + """Generate a random pandas DataFrame with various data types for testing. + This function creates a DataFrame with multiple column types including: + - Timestamps + - Integers + - Floats + - Categorical values + - Booleans + - Lists (tags) + - Decimal strings + - UUIDs + - Binary data (as hex strings) + - JSON blobs + - Nullable text fields + Parameters + ---------- + n_rows : int, default=200 + Number of rows in the generated DataFrame + ratio_none : float, default=0.1 + Probability of generating None values in applicable columns + seed : int, default=42 + Random seed for reproducibility + Returns + ------- + pd.DataFrame + A DataFrame with 14 columns of various data types + Examples + -------- + >>> df = get_random_dataframe(100, 0.05, 123) + >>> df.shape + (100, 14) + >>> df.dtypes + timestamp datetime64[ns] + id int64 + score float64 + status object + flag object + count object + value object + tags object + rating object + uuid object + binary object + json_blob object + category category + nullable_text object + dtype: object + """ + + np.random.seed(seed) + random.seed(seed) + + def random_tags(): + tags = ["AI", "ML", "data", "robotics", "vision"] + return random.sample(tags, k=random.randint(1, 3)) if random.random() > ratio_none else None + + def random_decimal(): + return ( + str(Decimal(random.uniform(1, 5)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)) + if random.random() > ratio_none + else None + ) + + def random_json_blob(): + blob = {"a": random.randint(1, 10), "b": random.random()} + return json.dumps(blob) if random.random() > ratio_none else None + + def random_binary(): + return bytes(random.randint(0, 255) for _ in range(4)).hex() if random.random() > ratio_none else None + + data = { + "timestamp": [ + datetime(2020, 1, 1) + timedelta(days=np.random.randint(0, 1000)) if np.random.rand() > ratio_none else None + for _ in range(n_rows) + ], + "id": range(1, n_rows + 1), + "score": np.random.uniform(0, 100, n_rows), + "status": np.random.choice( + ["active", "inactive", "pending", None], + size=n_rows, + p=[(1 - ratio_none) / 3, (1 - ratio_none) / 3, (1 - ratio_none) / 3, ratio_none], + ), + "flag": np.random.choice( + [True, False, None], size=n_rows, p=[(1 - ratio_none) / 2, (1 - ratio_none) / 2, ratio_none] + ), + "count": [np.random.randint(0, 100) if np.random.rand() > ratio_none else None for _ in range(n_rows)], + "value": [round(np.random.normal(50, 15), 2) if np.random.rand() > ratio_none else None for _ in range(n_rows)], + "tags": [random_tags() for _ in range(n_rows)], + "rating": [random_decimal() for _ in range(n_rows)], + "uuid": [str(uuid.uuid4()) if np.random.rand() > ratio_none else None for _ in range(n_rows)], + "binary": [random_binary() for _ in range(n_rows)], + "json_blob": [random_json_blob() for _ in range(n_rows)], + "category": pd.Categorical( + np.random.choice( + ["A", "B", "C", None], + size=n_rows, + p=[(1 - ratio_none) / 3, (1 - ratio_none) / 3, (1 - ratio_none) / 3, ratio_none], + ) + ), + "nullable_text": [random.choice(["Good", "Bad", "Average", None]) for _ in range(n_rows)], + } + + return pd.DataFrame(data) + + +def auto_convert_dtypes_spark( + df: psDataFrame, + na_values: list = None, + category_threshold: float = 0.3, + convert_threshold: float = 0.6, + sample_ratio: float = 0.1, +) -> tuple[psDataFrame, dict]: + """Automatically convert data types in a PySpark DataFrame using heuristics. + + This function analyzes a sample of the DataFrame to infer appropriate data types + and applies the conversions. It handles timestamps, numeric values, booleans, + and categorical fields. + + Args: + df: A PySpark DataFrame to convert. + na_values: List of strings to be considered as NA/NaN. Defaults to + ['NA', 'na', 'NULL', 'null', '']. + category_threshold: Maximum ratio of unique values to total values + to consider a column categorical. Defaults to 0.3. + convert_threshold: Minimum ratio of successfully converted values required + to apply a type conversion. Defaults to 0.6. + sample_ratio: Fraction of data to sample for type inference. Defaults to 0.1. + + Returns: + tuple: (The DataFrame with converted types, A dictionary mapping column names to + their inferred types as strings) + + Note: + - 'category' in the schema dict is conceptual as PySpark doesn't have a true + category type like pandas + - The function uses sampling for efficiency with large datasets + """ + n_rows = df.count() + if na_values is None: + na_values = ["NA", "na", "NULL", "null", ""] + + # Normalize NA-like values + for colname, coltype in df.dtypes: + if coltype == "string": + df = df.withColumn( + colname, + F.when(F.trim(F.lower(F.col(colname))).isin([v.lower() for v in na_values]), None).otherwise( + F.col(colname) + ), + ) + + schema = {} + for colname in df.columns: + # Sample once at an appropriate ratio + sample_ratio_to_use = min(1.0, sample_ratio if n_rows * sample_ratio > 100 else 100 / n_rows) + col_sample = df.select(colname).sample(withReplacement=False, fraction=sample_ratio_to_use).dropna() + sample_count = col_sample.count() + + inferred_type = "string" # Default + + if col_sample.dtypes[0][1] != "string": + schema[colname] = col_sample.dtypes[0][1] + continue + + if sample_count == 0: + schema[colname] = "string" + continue + + # Check if timestamp + ts_col = col_sample.withColumn("parsed", F.to_timestamp(F.col(colname))) + + # Check numeric + if ( + col_sample.withColumn("n", F.col(colname).cast("double")).filter("n is not null").count() + >= sample_count * convert_threshold + ): + # All whole numbers? + all_whole = ( + col_sample.withColumn("n", F.col(colname).cast("double")) + .filter("n is not null") + .withColumn("frac", F.abs(F.col("n") % 1)) + .filter("frac > 0.000001") + .count() + == 0 + ) + inferred_type = "int" if all_whole else "double" + + # Check low-cardinality (category-like) + elif ( + sample_count > 0 + and col_sample.select(F.countDistinct(F.col(colname))).collect()[0][0] / sample_count <= category_threshold + ): + inferred_type = "category" # Will just be string, but marked as such + + # Check if timestamp + elif ts_col.filter(F.col("parsed").isNotNull()).count() >= sample_count * convert_threshold: + inferred_type = "timestamp" + + schema[colname] = inferred_type + + # Apply inferred schema + for colname, inferred_type in schema.items(): + if inferred_type == "int": + df = df.withColumn(colname, F.col(colname).cast(T.IntegerType())) + elif inferred_type == "double": + df = df.withColumn(colname, F.col(colname).cast(T.DoubleType())) + elif inferred_type == "boolean": + df = df.withColumn( + colname, + F.when(F.lower(F.col(colname)).isin("true", "yes", "1"), True) + .when(F.lower(F.col(colname)).isin("false", "no", "0"), False) + .otherwise(None), + ) + elif inferred_type == "timestamp": + df = df.withColumn(colname, F.to_timestamp(F.col(colname))) + elif inferred_type == "category": + df = df.withColumn(colname, F.col(colname).cast(T.StringType())) # Marked conceptually + + # otherwise keep as string (or original type) + + return df, schema + + +def auto_convert_dtypes_pandas( + df: pd.DataFrame, + na_values: list = None, + category_threshold: float = 0.3, + convert_threshold: float = 0.6, + sample_ratio: float = 1.0, +) -> tuple[pd.DataFrame, dict]: + """Automatically convert data types in a pandas DataFrame using heuristics. + + This function analyzes the DataFrame to infer appropriate data types + and applies the conversions. It handles timestamps, timedeltas, numeric values, + and categorical fields. + + Args: + df: A pandas DataFrame to convert. + na_values: List of strings to be considered as NA/NaN. Defaults to + ['NA', 'na', 'NULL', 'null', '']. + category_threshold: Maximum ratio of unique values to total values + to consider a column categorical. Defaults to 0.3. + convert_threshold: Minimum ratio of successfully converted values required + to apply a type conversion. Defaults to 0.6. + sample_ratio: Fraction of data to sample for type inference. Not used in pandas version + but included for API compatibility. Defaults to 1.0. + + Returns: + tuple: (The DataFrame with converted types, A dictionary mapping column names to + their inferred types as strings) + """ + if na_values is None: + na_values = {"NA", "na", "NULL", "null", ""} + + df_converted = df.convert_dtypes() + schema = {} + + # Sample if needed (for API compatibility) + if sample_ratio < 1.0: + df = df.sample(frac=sample_ratio) + + n_rows = len(df) + + for col in df.columns: + series = df[col] + # Replace NA-like values if string + series_cleaned = series.map(lambda x: np.nan if isinstance(x, str) and x.strip() in na_values else x) + + # Skip conversion if already non-object data type, except bool which can potentially be categorical + if ( + not isinstance(series_cleaned.dtype, pd.BooleanDtype) + and not isinstance(series_cleaned.dtype, pd.StringDtype) + and series_cleaned.dtype != "object" + ): + # Keep the original data type for non-object dtypes + df_converted[col] = series + schema[col] = str(series_cleaned.dtype) + continue + + # print(f"type: {series_cleaned.dtype}, column: {series_cleaned.name}") + + if not isinstance(series_cleaned.dtype, pd.BooleanDtype): + # Try numeric (int or float) + numeric = pd.to_numeric(series_cleaned, errors="coerce") + if numeric.notna().sum() >= n_rows * convert_threshold: + if (numeric.dropna() % 1 == 0).all(): + try: + df_converted[col] = numeric.astype("int") # Nullable integer + schema[col] = "int" + continue + except Exception: + pass + df_converted[col] = numeric.astype("double") + schema[col] = "double" + continue + + # Try datetime + datetime_converted = pd.to_datetime(series_cleaned, errors="coerce") + if datetime_converted.notna().sum() >= n_rows * convert_threshold: + df_converted[col] = datetime_converted + schema[col] = "timestamp" + continue + + # Try timedelta + try: + timedelta_converted = pd.to_timedelta(series_cleaned, errors="coerce") + if timedelta_converted.notna().sum() >= n_rows * convert_threshold: + df_converted[col] = timedelta_converted + schema[col] = "timedelta" + continue + except TypeError: + pass + + # Try category + try: + unique_ratio = series_cleaned.nunique(dropna=True) / n_rows if n_rows > 0 else 1.0 + if unique_ratio <= category_threshold: + df_converted[col] = series_cleaned.astype("category") + schema[col] = "category" + continue + except Exception: + pass + df_converted[col] = series_cleaned.astype("string") + schema[col] = "string" + + return df_converted, schema diff --git a/flaml/automl/logger.py b/flaml/automl/logger.py index 1085b5aae1..959337b257 100644 --- a/flaml/automl/logger.py +++ b/flaml/automl/logger.py @@ -1,7 +1,37 @@ import logging +import os + + +class ColoredFormatter(logging.Formatter): + # ANSI escape codes for colors + COLORS = { + # logging.DEBUG: "\033[36m", # Cyan + # logging.INFO: "\033[32m", # Green + logging.WARNING: "\033[33m", # Yellow + logging.ERROR: "\033[31m", # Red + logging.CRITICAL: "\033[1;31m", # Bright Red + } + RESET = "\033[0m" # Reset to default + + def __init__(self, fmt, datefmt, use_color=True): + super().__init__(fmt, datefmt) + self.use_color = use_color + + def format(self, record): + formatted = super().format(record) + if self.use_color: + color = self.COLORS.get(record.levelno, "") + if color: + return f"{color}{formatted}{self.RESET}" + return formatted + logger = logging.getLogger(__name__) -logger_formatter = logging.Formatter( - "[%(name)s: %(asctime)s] {%(lineno)d} %(levelname)s - %(message)s", "%m-%d %H:%M:%S" +use_color = True +if os.getenv("FLAML_LOG_NO_COLOR"): + use_color = False + +logger_formatter = ColoredFormatter( + "[%(name)s: %(asctime)s] {%(lineno)d} %(levelname)s - %(message)s", "%m-%d %H:%M:%S", use_color ) logger.propagate = False diff --git a/flaml/fabric/mlflow.py b/flaml/fabric/mlflow.py index 442c3709b8..b5655b98a2 100644 --- a/flaml/fabric/mlflow.py +++ b/flaml/fabric/mlflow.py @@ -1,10 +1,14 @@ +import atexit +import functools import json +import logging import os import pickle import random -import sys import tempfile import time +import warnings +from concurrent.futures import ThreadPoolExecutor, wait from typing import MutableMapping import mlflow @@ -12,14 +16,15 @@ from mlflow.entities import Metric, Param, RunTag from mlflow.exceptions import MlflowException from mlflow.utils.autologging_utils import AUTOLOGGING_INTEGRATIONS, autologging_is_disabled +from packaging.requirements import Requirement from scipy.sparse import issparse from sklearn import tree try: - from pyspark.ml import Pipeline as SparkPipeline + from pyspark.ml import PipelineModel as SparkPipelineModel except ImportError: - class SparkPipeline: + class SparkPipelineModel: pass @@ -30,8 +35,88 @@ class SparkPipeline: from flaml.automl.spark import DataFrame, Series, psDataFrame, psSeries from flaml.version import __version__ +from .lowcode import AUTOML_DISPLAY_CONFIGURATIONS + SEARCH_MAX_RESULTS = 5000 # Each train should not have more than 5000 trials IS_RENAME_CHILD_RUN = os.environ.get("FLAML_IS_RENAME_CHILD_RUN", "false").lower() == "true" +REMOVE_REQUIREMENT_LIST = [ + "synapseml-cognitive", + "synapseml-core", + "synapseml-deep-learning", + "synapseml-internal", + "synapseml-mlflow", + "synapseml-opencv", + "synapseml-vw", + "synapseml-lightgbm", + "synapseml-utils", + "nni", + "optuna", +] +OPTIONAL_REMOVE_REQUIREMENT_LIST = ["pytorch-lightning", "transformers"] + +os.environ["MLFLOW_ENABLE_ARTIFACTS_PROGRESS_BAR"] = os.environ.get("MLFLOW_ENABLE_ARTIFACTS_PROGRESS_BAR", "false") + +MLFLOW_NUM_WORKERS = int(os.environ.get("FLAML_MLFLOW_NUM_WORKERS", os.cpu_count() * 4 if os.cpu_count() else 2)) +executor = ThreadPoolExecutor(max_workers=MLFLOW_NUM_WORKERS) +atexit.register(lambda: executor.shutdown(wait=True)) + +IS_CLEAN_LOGS = os.environ.get("FLAML_IS_CLEAN_LOGS", "1") +if IS_CLEAN_LOGS == "1": + logging.getLogger("synapse.ml").setLevel(logging.CRITICAL) + logging.getLogger("mlflow.utils").setLevel(logging.CRITICAL) + logging.getLogger("mlflow.utils.environment").setLevel(logging.CRITICAL) + logging.getLogger("mlflow.models.model").setLevel(logging.CRITICAL) + warnings.simplefilter("ignore", category=FutureWarning) + warnings.simplefilter("ignore", category=UserWarning) + + +def convert_requirement(requirement_list: list[str]): + ret = ( + [Requirement(s.strip().lower()) for s in requirement_list] + if mlflow.__version__ <= "2.17.0" + else requirement_list + ) + return ret + + +def time_it(func_or_code=None): + """ + Decorator or function that measures execution time. + + Can be used in three ways: + 1. As a decorator with no arguments: @time_it + 2. As a decorator with arguments: @time_it() + 3. As a function call with a string of code to execute and time: time_it("some_code()") + + Args: + func_or_code (callable or str, optional): Either a function to decorate or + a string of code to execute and time. + + Returns: + callable or None: Returns a decorated function if used as a decorator, + or None if used to execute a string of code. + """ + + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + start_time = time.time() + result = func(*args, **kwargs) + end_time = time.time() + logger.debug(f"Execution of {func.__name__} took {end_time - start_time:.4f} seconds") + return result + + return wrapper + + if callable(func_or_code): + return decorator(func_or_code) + elif func_or_code is None: + return decorator + else: + start_time = time.time() + exec(func_or_code) + end_time = time.time() + logger.debug(f"Execution\n```\n{func_or_code}\n```\ntook {end_time - start_time:.4f} seconds") def flatten_dict(d: MutableMapping, sep: str = ".") -> MutableMapping: @@ -49,23 +134,28 @@ def is_autolog_enabled(): return not all(autologging_is_disabled(k) for k in AUTOLOGGING_INTEGRATIONS.keys()) -def get_mlflow_log_latency(model_history=False): +def get_mlflow_log_latency(model_history=False, delete_run=True): + try: + FLAML_MLFLOW_LOG_LATENCY = float(os.getenv("FLAML_MLFLOW_LOG_LATENCY", 0)) + except ValueError: + FLAML_MLFLOW_LOG_LATENCY = 0 + if FLAML_MLFLOW_LOG_LATENCY >= 0.1: + return FLAML_MLFLOW_LOG_LATENCY st = time.time() with mlflow.start_run(nested=True, run_name="get_mlflow_log_latency") as run: if model_history: sk_model = tree.DecisionTreeClassifier() - mlflow.sklearn.log_model(sk_model, "sk_models") - mlflow.sklearn.log_model(Pipeline([("estimator", sk_model)]), "sk_pipeline") + mlflow.sklearn.log_model(sk_model, "model") with tempfile.TemporaryDirectory() as tmpdir: - pickle_fpath = os.path.join(tmpdir, f"tmp_{int(time.time()*1000)}") + pickle_fpath = os.path.join(tmpdir, f"tmp_{int(time.time() * 1000)}") with open(pickle_fpath, "wb") as f: pickle.dump(sk_model, f) - mlflow.log_artifact(pickle_fpath, "sk_model1") - mlflow.log_artifact(pickle_fpath, "sk_model2") + mlflow.log_artifact(pickle_fpath, "sk_model") mlflow.set_tag("synapseml.ui.visible", "false") # not shown inline in fabric - mlflow.delete_run(run.info.run_id) + if delete_run: + mlflow.delete_run(run.info.run_id) et = time.time() - return et - st + return 3 * (et - st) def infer_signature(X_train=None, y_train=None, dataframe=None, label=None): @@ -98,12 +188,76 @@ def infer_signature(X_train=None, y_train=None, dataframe=None, label=None): ) +def update_and_install_requirements( + run_id=None, + model_name=None, + model_version=None, + remove_list=None, + artifact_path="model", + dst_path=None, + install_with_ipython=False, +): + if not (run_id or (model_name and model_version)): + raise ValueError( + "Please provide `run_id` or both `model_name` and `model_version`. If all three are provided, `run_id` will be used." + ) + + if install_with_ipython: + from IPython import get_ipython + + if not remove_list: + remove_list = [ + "synapseml-cognitive", + "synapseml-core", + "synapseml-deep-learning", + "synapseml-internal", + "synapseml-mlflow", + "synapseml-opencv", + "synapseml-vw", + "synapseml-lightgbm", + "synapseml-utils", + "flaml", # flaml is needed for AutoML models, should be pre-installed in the runtime + "pyspark", # fabric internal pyspark should be pre-installed in the runtime + ] + + # Download model artifacts + client = mlflow.MlflowClient() + if not run_id: + run_id = client.get_model_version(model_name, model_version).run_id + if not dst_path: + dst_path = os.path.join(tempfile.gettempdir(), "model_artifacts") + os.makedirs(dst_path, exist_ok=True) + client.download_artifacts(run_id, artifact_path, dst_path) + requirements_path = os.path.join(dst_path, artifact_path, "requirements.txt") + with open(requirements_path) as f: + reqs = f.read().splitlines() + old_reqs = [Requirement(req) for req in reqs if req] + old_reqs_dict = {req.name: str(req) for req in old_reqs} + for req in remove_list: + req = Requirement(req) + if req.name in old_reqs_dict: + old_reqs_dict.pop(req.name, None) + new_reqs_list = list(old_reqs_dict.values()) + + with open(requirements_path, "w") as f: + f.write("\n".join(new_reqs_list)) + + if install_with_ipython: + get_ipython().run_line_magic("pip", f"install -r {requirements_path} -q") + else: + logger.info(f"You can run `pip install -r {requirements_path}` to install dependencies.") + return requirements_path + + def _mlflow_wrapper(evaluation_func, mlflow_exp_id, mlflow_config=None, extra_tags=None, autolog=False): def wrapped(*args, **kwargs): if mlflow_config is not None: - from synapse.ml.mlflow import set_mlflow_env_config + try: + from synapse.ml.mlflow import set_mlflow_env_config - set_mlflow_env_config(mlflow_config) + set_mlflow_env_config(mlflow_config) + except Exception: + pass import mlflow if mlflow_exp_id is not None: @@ -124,7 +278,20 @@ def wrapped(*args, **kwargs): def _get_notebook_name(): - return None + try: + import re + + from synapse.ml.mlflow import get_mlflow_env_config + from synapse.ml.mlflow.shared_platform_utils import get_artifact + + notebook_id = get_mlflow_env_config(False).artifact_id + current_notebook = get_artifact(notebook_id) + notebook_name = re.sub("\\W+", "-", current_notebook.displayName).strip() + + return notebook_name + except Exception as e: + logger.debug(f"Failed to get notebook name: {e}") + return None def safe_json_dumps(obj): @@ -163,6 +330,8 @@ def __init__(self, experiment_type="automl", mlflow_exp_name=None, extra_tag=Non self.has_model = False self.only_history = False self._do_log_model = True + self.futures = {} + self.futures_log_model = {} self.extra_tag = ( extra_tag @@ -170,6 +339,9 @@ def __init__(self, experiment_type="automl", mlflow_exp_name=None, extra_tag=Non else {"extra_tag.sid": f"flaml_{__version__}_{int(time.time())}_{random.randint(1001, 9999)}"} ) self.start_time = time.time() + self.experiment_type = experiment_type + self.update_autolog_state() + self.mlflow_client = mlflow.tracking.MlflowClient() parent_run_info = mlflow.active_run().info if mlflow.active_run() is not None else None if parent_run_info: @@ -188,8 +360,6 @@ def __init__(self, experiment_type="automl", mlflow_exp_name=None, extra_tag=Non mlflow.set_experiment(experiment_name=mlflow_exp_name) self.experiment_id = mlflow.tracking.fluent._active_experiment_id self.experiment_name = mlflow.get_experiment(self.experiment_id).name - self.experiment_type = experiment_type - self.update_autolog_state() if self.autolog: # only end user created parent run in autolog scenario @@ -197,9 +367,12 @@ def __init__(self, experiment_type="automl", mlflow_exp_name=None, extra_tag=Non def set_mlflow_config(self): if self.driver_mlflow_env_config is not None: - from synapse.ml.mlflow import set_mlflow_env_config + try: + from synapse.ml.mlflow import set_mlflow_env_config - set_mlflow_env_config(self.driver_mlflow_env_config) + set_mlflow_env_config(self.driver_mlflow_env_config) + except Exception: + pass def wrap_evaluation_function(self, evaluation_function): wrapped_evaluation_function = _mlflow_wrapper( @@ -267,6 +440,7 @@ def copy_mlflow_run(self, src_id, target_id, components=["param", "metric", "tag else: _tags = [] self.mlflow_client.log_batch(run_id=target_id, metrics=_metrics, params=[], tags=_tags) + return f"Successfully copy_mlflow_run run_id {src_id} to run_id {target_id}" def record_trial(self, result, trial, metric): if isinstance(result, dict): @@ -285,12 +459,12 @@ def record_trial(self, result, trial, metric): "metrics": metrics, "params": params, "tags": { - "flaml.best_run": False, - "flaml.iteration_number": self.child_counter, - "flaml.version": __version__, - "flaml.meric": metric_name, - "flaml.run_source": "flaml-tune", - "flaml.log_type": self.log_type, + "synapseml.flaml.best_run": False, + "synapseml.flaml.iteration_number": self.child_counter, + "synapseml.flaml.version": __version__, + "synapseml.flaml.meric": metric_name, + "synapseml.flaml.run_source": "flaml-tune", + "synapseml.flaml.log_type": self.log_type, }, "submetrics": { "values": [], @@ -328,18 +502,32 @@ def log_tune(self, analysis, metric): best_mlflow_run_name = self.mlflow_client.get_run(best_mlflow_run_id).info.run_name analysis.best_run_id = best_mlflow_run_id analysis.best_run_name = best_mlflow_run_name - self.mlflow_client.set_tag(best_mlflow_run_id, "flaml.best_run", True) + self.mlflow_client.set_tag(best_mlflow_run_id, "synapseml.flaml.best_run", True) self.best_run_id = best_mlflow_run_id if not self.has_summary: self.copy_mlflow_run(best_mlflow_run_id, self.parent_run_id) self.has_summary = True - def log_model(self, model, estimator, signature=None): + def log_model(self, model, estimator, signature=None, run_id=None): if not self._do_log_model: return logger.debug(f"logging model {estimator}") + ret_message = f"Successfully log_model {estimator} to run_id {run_id}" + optional_remove_list = ( + [] if estimator in ["transformer", "transformer_ms", "tcn", "tft"] else OPTIONAL_REMOVE_REQUIREMENT_LIST + ) + run = mlflow.active_run() + if run and run.info.run_id == self.parent_run_id: + mlflow.start_run(run_id=run_id, nested=True) + elif run and run.info.run_id != run_id: + ret_message = ( + f"Error: Should log_model {estimator} to run_id {run_id}, but logged to run_id {run.info.run_id}" + ) + logger.error(ret_message) + else: + mlflow.start_run(run_id=run_id) if estimator.endswith("_spark"): - mlflow.spark.log_model(model, estimator, signature=signature) + # mlflow.spark.log_model(model, estimator, signature=signature) mlflow.spark.log_model(model, "model", signature=signature) elif estimator in ["lgbm"]: mlflow.lightgbm.log_model(model, estimator, signature=signature) @@ -352,67 +540,115 @@ def log_model(self, model, estimator, signature=None): elif estimator in ["prophet"]: mlflow.prophet.log_model(model, estimator, signature=signature) elif estimator in ["orbit"]: - pass + logger.warning(f"Unsupported model: {estimator}. No model logged.") else: mlflow.sklearn.log_model(model, estimator, signature=signature) + future = executor.submit( + lambda: mlflow.models.model.update_model_requirements( + model_uri=f"runs:/{run_id}/{'model' if estimator.endswith('_spark') else estimator}", + operation="remove", + requirement_list=convert_requirement(REMOVE_REQUIREMENT_LIST + optional_remove_list), + ) + ) + self.futures[future] = f"run_{run_id}_requirements_updated" + if not run or run.info.run_id == self.parent_run_id: + mlflow.end_run() + return ret_message - def _pickle_and_log_artifact(self, obj, artifact_name, pickle_fname="temp_.pkl"): + def _pickle_and_log_artifact(self, obj, artifact_name, pickle_fname="temp_.pkl", run_id=None): if not self._do_log_model: - return + return True with tempfile.TemporaryDirectory() as tmpdir: pickle_fpath = os.path.join(tmpdir, pickle_fname) try: with open(pickle_fpath, "wb") as f: pickle.dump(obj, f) - mlflow.log_artifact(pickle_fpath, artifact_name) + mlflow.log_artifact(pickle_fpath, artifact_name, run_id) + return True except Exception as e: - logger.debug(f"Failed to pickle and log artifact {artifact_name}, error: {e}") + logger.debug(f"Failed to pickle and log {artifact_name}, error: {e}") + return False + + def _log_pipeline(self, pipeline, flavor_name, pipeline_name, signature, run_id, estimator=None): + logger.debug(f"logging pipeline {flavor_name}:{pipeline_name}:{estimator}") + ret_message = f"Successfully _log_pipeline {flavor_name}:{pipeline_name}:{estimator} to run_id {run_id}" + optional_remove_list = ( + [] if estimator in ["transformer", "transformer_ms", "tcn", "tft"] else OPTIONAL_REMOVE_REQUIREMENT_LIST + ) + run = mlflow.active_run() + if run and run.info.run_id == self.parent_run_id: + mlflow.start_run(run_id=run_id, nested=True) + elif run and run.info.run_id != run_id: + ret_message = f"Error: Should _log_pipeline {flavor_name}:{pipeline_name}:{estimator} model to run_id {run_id}, but logged to run_id {run.info.run_id}" + logger.error(ret_message) + else: + mlflow.start_run(run_id=run_id) + if flavor_name == "sklearn": + mlflow.sklearn.log_model(pipeline, pipeline_name, signature=signature) + elif flavor_name == "spark": + mlflow.spark.log_model(pipeline, pipeline_name, signature=signature) + else: + logger.warning(f"Unsupported pipeline flavor: {flavor_name}. No model logged.") + future = executor.submit( + lambda: mlflow.models.model.update_model_requirements( + model_uri=f"runs:/{run_id}/{pipeline_name}", + operation="remove", + requirement_list=convert_requirement(REMOVE_REQUIREMENT_LIST + optional_remove_list), + ) + ) + self.futures[future] = f"run_{run_id}_requirements_updated" + if not run or run.info.run_id == self.parent_run_id: + mlflow.end_run() + return ret_message - def pickle_and_log_automl_artifacts(self, automl, model, estimator, signature=None): + def pickle_and_log_automl_artifacts(self, automl, model, estimator, signature=None, run_id=None): """log automl artifacts to mlflow load back with `automl = mlflow.pyfunc.load_model(model_run_id_or_uri)`, then do prediction with `automl.predict(X)` """ - logger.debug(f"logging automl artifacts {estimator}") - self._pickle_and_log_artifact(automl.feature_transformer, "feature_transformer", "feature_transformer.pkl") - self._pickle_and_log_artifact(automl.label_transformer, "label_transformer", "label_transformer.pkl") - # Test test_mlflow 1 and 4 will get error: TypeError: cannot pickle '_io.TextIOWrapper' object - # try: - # self._pickle_and_log_artifact(automl, "automl", "automl.pkl") - # except TypeError: - # pass + logger.debug(f"logging automl estimator {estimator}") + # self._pickle_and_log_artifact( + # automl.feature_transformer, "feature_transformer", "feature_transformer.pkl", run_id + # ) + # self._pickle_and_log_artifact(automl.label_transformer, "label_transformer", "label_transformer.pkl", run_id) if estimator.endswith("_spark"): # spark pipeline is not supported yet return feature_transformer = automl.feature_transformer - if isinstance(feature_transformer, Pipeline): + if isinstance(feature_transformer, Pipeline) and not estimator.endswith("_spark"): pipeline = feature_transformer pipeline.steps.append(("estimator", model)) - elif isinstance(feature_transformer, SparkPipeline): + elif isinstance(feature_transformer, SparkPipelineModel) and estimator.endswith("_spark"): pipeline = feature_transformer pipeline.stages.append(model) elif not estimator.endswith("_spark"): steps = [("feature_transformer", feature_transformer)] + if model.autofe is not None: + steps.append(("autofe", model.autofe)) steps.append(("estimator", model)) pipeline = Pipeline(steps) else: - stages = [feature_transformer] + stages = [] + if feature_transformer is not None: + stages.append(feature_transformer) + if model.autofe is not None: + stages.append(model.autofe) stages.append(model) - pipeline = SparkPipeline(stages=stages) - if isinstance(pipeline, SparkPipeline): + pipeline = SparkPipelineModel(stages=stages) + if isinstance(pipeline, SparkPipelineModel): logger.debug(f"logging spark pipeline {estimator}") - mlflow.spark.log_model(pipeline, "automl_pipeline", signature=signature) + self._log_pipeline(pipeline, "spark", "model", signature, run_id, estimator) else: # Add a log named "model" to fit default settings logger.debug(f"logging sklearn pipeline {estimator}") - mlflow.sklearn.log_model(pipeline, "automl_pipeline", signature=signature) - mlflow.sklearn.log_model(pipeline, "model", signature=signature) + self._log_pipeline(pipeline, "sklearn", "model", signature, run_id, estimator) + return f"Successfully pickle_and_log_automl_artifacts {estimator} to run_id {run_id}" + @time_it def record_state(self, automl, search_state, estimator): _st = time.time() automl_metric_name = ( automl._state.metric if isinstance(automl._state.metric, str) else automl._state.error_metric ) - if automl._state.error_metric.startswith("1-"): automl_metric_value = 1 - search_state.val_loss elif automl._state.error_metric.startswith("-"): @@ -425,6 +661,14 @@ def record_state(self, automl, search_state, estimator): else: config = search_state.config + self.automl_user_configurations = safe_json_dumps(automl._automl_user_configurations) + self.automl_display_configurations = safe_json_dumps( + { + k: automl._automl_user_configurations[k] if k in automl._automl_user_configurations else None + for k in AUTOML_DISPLAY_CONFIGURATIONS + } + ) + info = { "metrics": { "iter_counter": automl._track_iter, @@ -435,17 +679,18 @@ def record_state(self, automl, search_state, estimator): automl_metric_name: automl_metric_value, }, "tags": { - "flaml.best_run": False, - "flaml.estimator_name": estimator, - "flaml.estimator_class": search_state.learner_class.__name__, - "flaml.iteration_number": automl._track_iter, - "flaml.version": __version__, - "flaml.learner": estimator, - "flaml.sample_size": search_state.sample_size, - "flaml.meric": automl_metric_name, - "flaml.run_source": "flaml-automl", - "flaml.log_type": self.log_type, - "flaml.automl_user_configurations": safe_json_dumps(automl._automl_user_configurations), + "synapseml.flaml.best_run": False, + "synapseml.flaml.estimator_name": estimator, + "synapseml.flaml.estimator_class": search_state.learner_class.__name__, + "synapseml.flaml.iteration_number": automl._track_iter, + "synapseml.flaml.version": __version__, + "synapseml.flaml.learner": estimator, + "synapseml.flaml.sample_size": search_state.sample_size, + "synapseml.flaml.meric": automl_metric_name, + "synapseml.flaml.run_source": "flaml-automl", + "synapseml.flaml.log_type": self.log_type, + "synapseml.flaml.automl_user_configurations": self.automl_user_configurations, + "synapseml.flaml.automl_display_configurations": self.automl_display_configurations, }, "params": { "sample_size": search_state.sample_size, @@ -472,37 +717,70 @@ def record_state(self, automl, search_state, estimator): run_name = f"{self.parent_run_name}_child_{self.child_counter}" else: run_name = None + _t1 = time.time() + wait(self.futures_log_model) + _t2 = time.time() - _t1 + logger.debug(f"wait futures_log_model in record_state took {_t2} seconds") with mlflow.start_run(nested=True, run_name=run_name) as child_run: - self._log_info_to_run(info, child_run.info.run_id, log_params=True) + future = executor.submit(lambda: self._log_info_to_run(info, child_run.info.run_id, log_params=True)) + self.futures[future] = f"iter_{automl._track_iter}_log_info_to_run" + future = executor.submit(lambda: self._log_automl_configurations(child_run.info.run_id)) + self.futures[future] = f"iter_{automl._track_iter}_log_automl_configurations" if automl._state.model_history: - self.log_model( - search_state.trained_estimator._model, estimator, signature=automl.estimator_signature - ) - self.pickle_and_log_automl_artifacts( - automl, search_state.trained_estimator, estimator, signature=automl.pipeline_signature - ) + if estimator.endswith("_spark"): + future = executor.submit( + lambda: self.log_model( + search_state.trained_estimator._model, + estimator, + automl.estimator_signature, + child_run.info.run_id, + ) + ) + self.futures_log_model[future] = f"record_state-log_model_{estimator}" + else: + future = executor.submit( + lambda: self.pickle_and_log_automl_artifacts( + automl, + search_state.trained_estimator, + estimator, + automl.pipeline_signature, + child_run.info.run_id, + ) + ) + self.futures_log_model[future] = f"record_state-pickle_and_log_automl_artifacts_{estimator}" self.manual_run_ids.append(child_run.info.run_id) self.child_counter += 1 + return f"Successfully record_state iteration {automl._track_iter}" + @time_it def log_automl(self, automl): self.set_best_iter(automl) if self.autolog: if self.parent_run_id is not None: mlflow.start_run(run_id=self.parent_run_id, experiment_id=self.experiment_id) - mlflow.log_metric("best_validation_loss", automl._state.best_loss) - mlflow.log_metric("best_iteration", automl._best_iteration) - mlflow.log_metric("num_child_runs", len(self.infos)) + mlflow.log_metrics( + { + "best_validation_loss": automl._state.best_loss, + "best_iteration": automl._best_iteration, + "num_child_runs": len(self.infos), + } + ) if ( automl._trained_estimator is not None and not self.has_model and automl._trained_estimator._model is not None ): - self.log_model( - automl._trained_estimator._model, automl.best_estimator, signature=automl.estimator_signature - ) - self.pickle_and_log_automl_artifacts( - automl, automl.model, automl.best_estimator, signature=automl.pipeline_signature - ) + if automl.best_estimator.endswith("_spark"): + self.log_model( + automl._trained_estimator._model, + automl.best_estimator, + automl.estimator_signature, + self.parent_run_id, + ) + else: + self.pickle_and_log_automl_artifacts( + automl, automl.model, automl.best_estimator, automl.pipeline_signature, self.parent_run_id + ) self.has_model = True self.adopt_children(automl) @@ -512,41 +790,77 @@ def log_automl(self, automl): best_run_name = self.mlflow_client.get_run(best_mlflow_run_id).info.run_name automl.best_run_id = best_mlflow_run_id automl.best_run_name = best_run_name - self.mlflow_client.set_tag(best_mlflow_run_id, "flaml.best_run", True) + self.mlflow_client.set_tag(best_mlflow_run_id, "synapseml.flaml.best_run", True) self.best_run_id = best_mlflow_run_id if self.parent_run_id is not None: conf = automl._config_history[automl._best_iteration][1].copy() if "ml" in conf.keys(): conf = conf["ml"] - mlflow.log_params(conf) - mlflow.log_param("best_learner", automl._best_estimator) + mlflow.log_params({**conf, "best_learner": automl._best_estimator}, run_id=self.parent_run_id) if not self.has_summary: logger.info(f"logging best model {automl.best_estimator}") - self.copy_mlflow_run(best_mlflow_run_id, self.parent_run_id) + future = executor.submit(lambda: self.copy_mlflow_run(best_mlflow_run_id, self.parent_run_id)) + self.futures[future] = "log_automl_copy_mlflow_run" + future = executor.submit(lambda: self._log_automl_configurations(self.parent_run_id)) + self.futures[future] = "log_automl_log_automl_configurations" self.has_summary = True + _t1 = time.time() + wait(self.futures_log_model) + _t2 = time.time() - _t1 + logger.debug(f"wait futures_log_model in log_automl took {_t2} seconds") if ( automl._trained_estimator is not None and not self.has_model and automl._trained_estimator._model is not None ): - self.log_model( - automl._trained_estimator._model, - automl.best_estimator, - signature=automl.estimator_signature, - ) - self.pickle_and_log_automl_artifacts( - automl, automl.model, automl.best_estimator, signature=automl.pipeline_signature - ) + if automl.best_estimator.endswith("_spark"): + future = executor.submit( + lambda: self.log_model( + automl._trained_estimator._model, + automl.best_estimator, + signature=automl.estimator_signature, + run_id=self.parent_run_id, + ) + ) + self.futures_log_model[future] = f"log_automl-log_model_{automl.best_estimator}" + else: + future = executor.submit( + lambda: self.pickle_and_log_automl_artifacts( + automl, + automl.model, + automl.best_estimator, + signature=automl.pipeline_signature, + run_id=self.parent_run_id, + ) + ) + self.futures_log_model[ + future + ] = f"log_automl-pickle_and_log_automl_artifacts_{automl.best_estimator}" self.has_model = True def resume_mlflow(self): if len(self.resume_params) > 0: mlflow.autolog(**self.resume_params) + def _log_automl_configurations(self, run_id): + self.mlflow_client.log_text( + run_id=run_id, + text=self.automl_user_configurations, + artifact_file="automl_configurations/automl_user_configurations.json", + ) + self.mlflow_client.log_text( + run_id=run_id, + text=self.automl_display_configurations, + artifact_file="automl_configurations/automl_display_configurations.json", + ) + return f"Successfully _log_automl_configurations to run_id {run_id}" + def _log_info_to_run(self, info, run_id, log_params=False): _metrics = [Metric(key, value, int(time.time() * 1000), 0) for key, value in info["metrics"].items()] - _tags = [RunTag(key, str(value)) for key, value in info["tags"].items()] + _tags = [ + RunTag(key, str(value)[:5000]) for key, value in info["tags"].items() + ] # AML will raise error if value length > 5000 _params = [ Param(key, str(value)) for key, value in info["params"].items() @@ -562,6 +876,7 @@ def _log_info_to_run(self, info, run_id, log_params=False): _tags = [RunTag("mlflow.parentRunId", run_id)] self.mlflow_client.log_batch(run_id=run.info.run_id, metrics=_metrics, params=[], tags=_tags) del info["submetrics"]["values"] + return f"Successfully _log_info_to_run to run_id {run_id}" def adopt_children(self, result=None): """ @@ -645,7 +960,7 @@ def adopt_children(self, result=None): "mlflow.runName", f"{self.parent_run_name}_child_{self.child_counter}", ) - self.mlflow_client.set_tag(child_run_id, "flaml.child_counter", self.child_counter) + self.mlflow_client.set_tag(child_run_id, "synapseml.flaml.child_counter", self.child_counter) # merge autolog child run and corresponding manual run flaml_info = self.infos[self.child_counter] @@ -661,7 +976,7 @@ def adopt_children(self, result=None): ) if self.child_counter == best_iteration: - self.mlflow_client.set_tag(child_run_id, "flaml.best_run", True) + self.mlflow_client.set_tag(child_run_id, "synapseml.flaml.best_run", True) if result is not None: result.best_run_id = child_run_id result.best_run_name = child_run.info.run_name diff --git a/flaml/tune/logger.py b/flaml/tune/logger.py new file mode 100644 index 0000000000..959337b257 --- /dev/null +++ b/flaml/tune/logger.py @@ -0,0 +1,37 @@ +import logging +import os + + +class ColoredFormatter(logging.Formatter): + # ANSI escape codes for colors + COLORS = { + # logging.DEBUG: "\033[36m", # Cyan + # logging.INFO: "\033[32m", # Green + logging.WARNING: "\033[33m", # Yellow + logging.ERROR: "\033[31m", # Red + logging.CRITICAL: "\033[1;31m", # Bright Red + } + RESET = "\033[0m" # Reset to default + + def __init__(self, fmt, datefmt, use_color=True): + super().__init__(fmt, datefmt) + self.use_color = use_color + + def format(self, record): + formatted = super().format(record) + if self.use_color: + color = self.COLORS.get(record.levelno, "") + if color: + return f"{color}{formatted}{self.RESET}" + return formatted + + +logger = logging.getLogger(__name__) +use_color = True +if os.getenv("FLAML_LOG_NO_COLOR"): + use_color = False + +logger_formatter = ColoredFormatter( + "[%(name)s: %(asctime)s] {%(lineno)d} %(levelname)s - %(message)s", "%m-%d %H:%M:%S", use_color +) +logger.propagate = False diff --git a/flaml/tune/spark/utils.py b/flaml/tune/spark/utils.py index 5113549c12..959fb373fd 100644 --- a/flaml/tune/spark/utils.py +++ b/flaml/tune/spark/utils.py @@ -162,6 +162,10 @@ def search_space(cls, **params): assert isinstance(MyLargeLGBM(), LGBMEstimator) ``` """ + # Check if Spark is available + spark_available, _ = check_spark() + + # Write to local driver file system flaml_path = os.path.dirname(os.path.abspath(__file__)) custom_code = textwrap.dedent(custom_code) custom_path = os.path.join(flaml_path, file_name + ".py") @@ -169,6 +173,24 @@ def search_space(cls, **params): with open(custom_path, "w") as f: f.write(custom_code) + # If using Spark, broadcast the code content to executors + if spark_available: + spark = SparkSession.builder.getOrCreate() + bc_code = spark.sparkContext.broadcast(custom_code) + + # Execute a job to ensure the code is distributed to all executors + def _write_code(bc): + code = bc.value + import os + + module_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file_name + ".py") + os.makedirs(os.path.dirname(module_path), exist_ok=True) + with open(module_path, "w") as f: + f.write(code) + return True + + spark.sparkContext.parallelize(range(1)).map(lambda _: _write_code(bc_code)).collect() + return custom_path diff --git a/flaml/tune/tune.py b/flaml/tune/tune.py index 84c1133499..08a45c72c1 100644 --- a/flaml/tune/tune.py +++ b/flaml/tune/tune.py @@ -21,11 +21,11 @@ from .analysis import ExperimentAnalysis as EA else: ray_available = True - import logging from flaml.tune.spark.utils import PySparkOvertimeMonitor, check_spark +from .logger import logger, logger_formatter from .result import DEFAULT_METRIC from .trial import Trial @@ -41,8 +41,6 @@ internal_mlflow = False -logger = logging.getLogger(__name__) -logger.propagate = False _use_ray = True _runner = None _verbose = 0 @@ -521,10 +519,6 @@ def easy_objective(config): elif not logger.hasHandlers(): # Add the console handler. _ch = logging.StreamHandler(stream=sys.stdout) - logger_formatter = logging.Formatter( - "[%(name)s: %(asctime)s] {%(lineno)d} %(levelname)s - %(message)s", - "%m-%d %H:%M:%S", - ) _ch.setFormatter(logger_formatter) logger.addHandler(_ch) if verbose <= 2: @@ -752,10 +746,16 @@ def easy_objective(config): max_concurrent = max(1, search_alg.max_concurrent) else: max_concurrent = max(1, max_spark_parallelism) + passed_in_n_concurrent_trials = max(n_concurrent_trials, max_concurrent) n_concurrent_trials = min( n_concurrent_trials if n_concurrent_trials > 0 else num_executors, max_concurrent, ) + if n_concurrent_trials < passed_in_n_concurrent_trials: + logger.warning( + f"The actual concurrent trials is {n_concurrent_trials}. You can set the environment " + f"variable `FLAML_MAX_CONCURRENT` to '{passed_in_n_concurrent_trials}' to override the detected num of executors." + ) with parallel_backend("spark"): with Parallel(n_jobs=n_concurrent_trials, verbose=max(0, (verbose - 1) * 50)) as parallel: try: @@ -777,7 +777,7 @@ def easy_objective(config): and num_failures < upperbound_num_failures ): if automl_info and automl_info[0] > 0 and time_budget_s < np.inf: - time_budget_s -= automl_info[0] + time_budget_s -= automl_info[0] * n_concurrent_trials logger.debug(f"Remaining time budget with mlflow log latency: {time_budget_s} seconds.") while len(_runner.running_trials) < n_concurrent_trials: # suggest trials for spark diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000000..2b1db9c48e --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +markers = + spark: mark a test as requiring Spark diff --git a/test/automl/test_extra_models.py b/test/automl/test_extra_models.py index 6c5cac0992..e19f2cb201 100644 --- a/test/automl/test_extra_models.py +++ b/test/automl/test_extra_models.py @@ -17,6 +17,8 @@ from flaml.automl.ml import sklearn_metric_loss_score from flaml.tune.spark.utils import check_spark +pytestmark = pytest.mark.spark + leaderboard = defaultdict(dict) warnings.simplefilter(action="ignore") diff --git a/test/automl/test_forecast.py b/test/automl/test_forecast.py index 6e5d97d4f8..21e8bacfe3 100644 --- a/test/automl/test_forecast.py +++ b/test/automl/test_forecast.py @@ -477,7 +477,10 @@ def test_forecast_classification(budget=5): def get_stalliion_data(): from pytorch_forecasting.data.examples import get_stallion_data - data = get_stallion_data() + # data = get_stallion_data() + data = pd.read_parquet( + "https://raw.githubusercontent.com/sktime/pytorch-forecasting/refs/heads/main/examples/data/stallion.parquet" + ) # add time index - For datasets with no missing values, FLAML will automate this process data["time_idx"] = data["date"].dt.year * 12 + data["date"].dt.month data["time_idx"] -= data["time_idx"].min() diff --git a/test/automl/test_mlflow.py b/test/automl/test_mlflow.py index 58d749d379..5185da9c8e 100644 --- a/test/automl/test_mlflow.py +++ b/test/automl/test_mlflow.py @@ -10,6 +10,18 @@ class TestMLFlowLoggingParam: + def test_update_and_install_requirements(self): + import mlflow + from sklearn import tree + + from flaml.fabric.mlflow import update_and_install_requirements + + with mlflow.start_run(run_name="test") as run: + sk_model = tree.DecisionTreeClassifier() + mlflow.sklearn.log_model(sk_model, "model", registered_model_name="test") + + update_and_install_requirements(run_id=run.info.run_id) + def test_should_start_new_run_by_default(self, automl_settings): with mlflow.start_run() as parent_run: automl = AutoML() diff --git a/test/nlp/test_autohf_classificationhead.py b/test/nlp/test_autohf_classificationhead.py index 9ce023f966..325e4aeb02 100644 --- a/test/nlp/test_autohf_classificationhead.py +++ b/test/nlp/test_autohf_classificationhead.py @@ -24,6 +24,8 @@ if sys.platform.startswith("darwin") and sys.version_info[0] == 3 and sys.version_info[1] == 11: pytest.skip("skipping Python 3.11 on MacOS", allow_module_level=True) +pytestmark = pytest.mark.spark # set to spark as parallel testing raised RuntimeError + def test_switch_1_1(): data_idx, model_path_idx = 0, 0 diff --git a/test/nlp/test_autohf_cv.py b/test/nlp/test_autohf_cv.py index e5429a7f58..4ec6113070 100644 --- a/test/nlp/test_autohf_cv.py +++ b/test/nlp/test_autohf_cv.py @@ -5,6 +5,8 @@ import pytest from utils import get_automl_settings, get_toy_data_seqclassification +pytestmark = pytest.mark.spark # set to spark as parallel testing raised MlflowException of changing parameter + @pytest.mark.skipif(sys.platform in ["darwin", "win32"], reason="do not run on mac os or windows") def test_cv(): diff --git a/test/nlp/test_default.py b/test/nlp/test_default.py index b3c9016486..239fce2276 100644 --- a/test/nlp/test_default.py +++ b/test/nlp/test_default.py @@ -10,6 +10,10 @@ if sys.platform.startswith("darwin") and sys.version_info[0] == 3 and sys.version_info[1] == 11: pytest.skip("skipping Python 3.11 on MacOS", allow_module_level=True) +pytestmark = ( + pytest.mark.spark +) # set to spark as parallel testing raised ValueError: Feature NonExisting not implemented. + def pop_args(fit_kwargs): fit_kwargs.pop("max_iter", None) diff --git a/test/spark/test_0sparkml.py b/test/spark/test_0sparkml.py index 3f2198241c..cb51b292f5 100644 --- a/test/spark/test_0sparkml.py +++ b/test/spark/test_0sparkml.py @@ -3,11 +3,13 @@ import warnings import mlflow +import numpy as np import pytest import sklearn.datasets as skds from packaging.version import Version from flaml import AutoML +from flaml.automl.data import auto_convert_dtypes_pandas, auto_convert_dtypes_spark, get_random_dataframe from flaml.tune.spark.utils import check_spark warnings.simplefilter(action="ignore") @@ -58,7 +60,7 @@ else: skip_py311 = False -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] def _test_spark_synapseml_lightgbm(spark=None, task="classification"): @@ -296,11 +298,88 @@ def prepare_data(dataset_name="higgs"): print("time cost in minutes: ", (end_time - start_time) / 60) +def test_get_random_dataframe(): + # Test with default parameters + df = get_random_dataframe(n_rows=50, ratio_none=0.2, seed=123) + assert df.shape == (50, 14) # Default is 200 rows and 14 columns + + # Test column types + assert "timestamp" in df.columns and np.issubdtype(df["timestamp"].dtype, np.datetime64) + assert "id" in df.columns and np.issubdtype(df["id"].dtype, np.integer) + assert "score" in df.columns and np.issubdtype(df["score"].dtype, np.floating) + assert "category" in df.columns and df["category"].dtype.name == "category" + + +def test_auto_convert_dtypes_pandas(): + # Create a test DataFrame with various types + import pandas as pd + + test_df = pd.DataFrame( + { + "int_col": ["1", "2", "3", "4", "5", "6", "6"], + "float_col": ["1.1", "2.2", "3.3", "NULL", "5.5", "6.6", "6.6"], + "date_col": ["2021-01-01", "2021-02-01", "NA", "2021-04-01", "2021-05-01", "2021-06-01", "2021-06-01"], + "cat_col": ["A", "B", "A", "A", "B", "A", "B"], + "string_col": ["text1", "text2", "text3", "text4", "text5", "text6", "text7"], + } + ) + + # Convert dtypes + converted_df, schema = auto_convert_dtypes_pandas(test_df) + + # Check conversions + assert schema["int_col"] == "int" + assert schema["float_col"] == "double" + assert schema["date_col"] == "timestamp" + assert schema["cat_col"] == "category" + assert schema["string_col"] == "string" + + +def test_auto_convert_dtypes_spark(): + """Test auto_convert_dtypes_spark function with various data types.""" + import pandas as pd + + # Create a test DataFrame with various types + test_pdf = pd.DataFrame( + { + "int_col": ["1", "2", "3", "4", "NA"], + "float_col": ["1.1", "2.2", "3.3", "NULL", "5.5"], + "date_col": ["2021-01-01", "2021-02-01", "NA", "2021-04-01", "2021-05-01"], + "cat_col": ["A", "B", "A", "C", "B"], + "string_col": ["text1", "text2", "text3", "text4", "text5"], + } + ) + + # Convert pandas DataFrame to Spark DataFrame + test_df = spark.createDataFrame(test_pdf) + + # Convert dtypes + converted_df, schema = auto_convert_dtypes_spark(test_df) + + # Check conversions + assert schema["int_col"] == "int" + assert schema["float_col"] == "double" + assert schema["date_col"] == "timestamp" + assert schema["cat_col"] == "string" # Conceptual category in schema + assert schema["string_col"] == "string" + + # Verify the actual data types from the Spark DataFrame + spark_dtypes = dict(converted_df.dtypes) + assert spark_dtypes["int_col"] == "int" + assert spark_dtypes["float_col"] == "double" + assert spark_dtypes["date_col"] == "timestamp" + assert spark_dtypes["cat_col"] == "string" # In Spark, categories are still strings + assert spark_dtypes["string_col"] == "string" + + if __name__ == "__main__": test_spark_synapseml_classification() test_spark_synapseml_regression() test_spark_synapseml_rank() test_spark_input_df() + test_get_random_dataframe() + test_auto_convert_dtypes_pandas() + test_auto_convert_dtypes_spark() # import cProfile # import pstats diff --git a/test/spark/test_automl.py b/test/spark/test_automl.py index 269161711c..8785a8bfd3 100644 --- a/test/spark/test_automl.py +++ b/test/spark/test_automl.py @@ -25,7 +25,7 @@ spark_available, _ = check_spark() skip_spark = not spark_available -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] def test_parallel_xgboost(hpo_method=None, data_size=1000): diff --git a/test/spark/test_ensemble.py b/test/spark/test_ensemble.py index 74dd17fd4b..9278791e86 100644 --- a/test/spark/test_ensemble.py +++ b/test/spark/test_ensemble.py @@ -1,6 +1,7 @@ import os import unittest +import pytest from sklearn.datasets import load_wine from flaml import AutoML @@ -24,6 +25,8 @@ else: skip_my_learner = True +pytestmark = pytest.mark.spark + class TestEnsemble(unittest.TestCase): def setUp(self) -> None: diff --git a/test/spark/test_exceptions.py b/test/spark/test_exceptions.py index a20de7fc28..13c265d37c 100644 --- a/test/spark/test_exceptions.py +++ b/test/spark/test_exceptions.py @@ -9,7 +9,7 @@ spark_available, _ = check_spark() skip_spark = not spark_available -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] os.environ["FLAML_MAX_CONCURRENT"] = "2" diff --git a/test/spark/test_mlflow.py b/test/spark/test_mlflow.py index 5a809d5acd..b61b978146 100644 --- a/test/spark/test_mlflow.py +++ b/test/spark/test_mlflow.py @@ -21,6 +21,7 @@ from pyspark.ml.feature import VectorAssembler except ImportError: pass +pytestmark = pytest.mark.spark warnings.filterwarnings("ignore") skip_spark = importlib.util.find_spec("pyspark") is None diff --git a/test/spark/test_multiclass.py b/test/spark/test_multiclass.py index c3cf579e12..45f6e5b45a 100644 --- a/test/spark/test_multiclass.py +++ b/test/spark/test_multiclass.py @@ -2,6 +2,7 @@ import unittest import numpy as np +import pytest import scipy.sparse from sklearn.datasets import load_iris, load_wine @@ -12,6 +13,7 @@ spark_available, _ = check_spark() skip_spark = not spark_available +pytestmark = pytest.mark.spark os.environ["FLAML_MAX_CONCURRENT"] = "2" diff --git a/test/spark/test_notebook.py b/test/spark/test_notebook.py index 88650e4239..31071bec97 100644 --- a/test/spark/test_notebook.py +++ b/test/spark/test_notebook.py @@ -9,7 +9,7 @@ spark_available, _ = check_spark() skip_spark = not spark_available -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] here = os.path.abspath(os.path.dirname(__file__)) os.environ["FLAML_MAX_CONCURRENT"] = "2" diff --git a/test/spark/test_overtime.py b/test/spark/test_overtime.py index 13fec32e90..82d1aa81f1 100644 --- a/test/spark/test_overtime.py +++ b/test/spark/test_overtime.py @@ -25,7 +25,7 @@ except ImportError: skip_spark = True -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] def test_overtime(): diff --git a/test/spark/test_performance.py b/test/spark/test_performance.py index 8e4da7efd7..febd1a4b88 100644 --- a/test/spark/test_performance.py +++ b/test/spark/test_performance.py @@ -11,7 +11,7 @@ spark_available, _ = check_spark() skip_spark = not spark_available -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] os.environ["FLAML_MAX_CONCURRENT"] = "2" diff --git a/test/spark/test_tune.py b/test/spark/test_tune.py index 117a4ad80a..0f9b5a9a25 100644 --- a/test/spark/test_tune.py +++ b/test/spark/test_tune.py @@ -14,7 +14,7 @@ spark_available, _ = check_spark() skip_spark = not spark_available -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] os.environ["FLAML_MAX_CONCURRENT"] = "2" X, y = load_breast_cancer(return_X_y=True) diff --git a/test/spark/test_utils.py b/test/spark/test_utils.py index 0f772c7c49..1141972bee 100644 --- a/test/spark/test_utils.py +++ b/test/spark/test_utils.py @@ -36,7 +36,7 @@ print("Spark is not installed. Skip all spark tests.") skip_spark = True -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] def test_with_parameters_spark(): From f4e88dae82723fd2fbffdfafe336684eb064adb0 Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Wed, 21 May 2025 10:04:55 +0000 Subject: [PATCH 2/8] Remove synapseml from tag names --- flaml/fabric/mlflow.py | 44 +++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/flaml/fabric/mlflow.py b/flaml/fabric/mlflow.py index b5655b98a2..04dfd14dbb 100644 --- a/flaml/fabric/mlflow.py +++ b/flaml/fabric/mlflow.py @@ -459,12 +459,12 @@ def record_trial(self, result, trial, metric): "metrics": metrics, "params": params, "tags": { - "synapseml.flaml.best_run": False, - "synapseml.flaml.iteration_number": self.child_counter, - "synapseml.flaml.version": __version__, - "synapseml.flaml.meric": metric_name, - "synapseml.flaml.run_source": "flaml-tune", - "synapseml.flaml.log_type": self.log_type, + "flaml.best_run": False, + "flaml.iteration_number": self.child_counter, + "flaml.version": __version__, + "flaml.meric": metric_name, + "flaml.run_source": "flaml-tune", + "flaml.log_type": self.log_type, }, "submetrics": { "values": [], @@ -502,7 +502,7 @@ def log_tune(self, analysis, metric): best_mlflow_run_name = self.mlflow_client.get_run(best_mlflow_run_id).info.run_name analysis.best_run_id = best_mlflow_run_id analysis.best_run_name = best_mlflow_run_name - self.mlflow_client.set_tag(best_mlflow_run_id, "synapseml.flaml.best_run", True) + self.mlflow_client.set_tag(best_mlflow_run_id, "flaml.best_run", True) self.best_run_id = best_mlflow_run_id if not self.has_summary: self.copy_mlflow_run(best_mlflow_run_id, self.parent_run_id) @@ -679,18 +679,18 @@ def record_state(self, automl, search_state, estimator): automl_metric_name: automl_metric_value, }, "tags": { - "synapseml.flaml.best_run": False, - "synapseml.flaml.estimator_name": estimator, - "synapseml.flaml.estimator_class": search_state.learner_class.__name__, - "synapseml.flaml.iteration_number": automl._track_iter, - "synapseml.flaml.version": __version__, - "synapseml.flaml.learner": estimator, - "synapseml.flaml.sample_size": search_state.sample_size, - "synapseml.flaml.meric": automl_metric_name, - "synapseml.flaml.run_source": "flaml-automl", - "synapseml.flaml.log_type": self.log_type, - "synapseml.flaml.automl_user_configurations": self.automl_user_configurations, - "synapseml.flaml.automl_display_configurations": self.automl_display_configurations, + "flaml.best_run": False, + "flaml.estimator_name": estimator, + "flaml.estimator_class": search_state.learner_class.__name__, + "flaml.iteration_number": automl._track_iter, + "flaml.version": __version__, + "flaml.learner": estimator, + "flaml.sample_size": search_state.sample_size, + "flaml.meric": automl_metric_name, + "flaml.run_source": "flaml-automl", + "flaml.log_type": self.log_type, + "flaml.automl_user_configurations": self.automl_user_configurations, + "flaml.automl_display_configurations": self.automl_display_configurations, }, "params": { "sample_size": search_state.sample_size, @@ -790,7 +790,7 @@ def log_automl(self, automl): best_run_name = self.mlflow_client.get_run(best_mlflow_run_id).info.run_name automl.best_run_id = best_mlflow_run_id automl.best_run_name = best_run_name - self.mlflow_client.set_tag(best_mlflow_run_id, "synapseml.flaml.best_run", True) + self.mlflow_client.set_tag(best_mlflow_run_id, "flaml.best_run", True) self.best_run_id = best_mlflow_run_id if self.parent_run_id is not None: conf = automl._config_history[automl._best_iteration][1].copy() @@ -960,7 +960,7 @@ def adopt_children(self, result=None): "mlflow.runName", f"{self.parent_run_name}_child_{self.child_counter}", ) - self.mlflow_client.set_tag(child_run_id, "synapseml.flaml.child_counter", self.child_counter) + self.mlflow_client.set_tag(child_run_id, "flaml.child_counter", self.child_counter) # merge autolog child run and corresponding manual run flaml_info = self.infos[self.child_counter] @@ -976,7 +976,7 @@ def adopt_children(self, result=None): ) if self.child_counter == best_iteration: - self.mlflow_client.set_tag(child_run_id, "synapseml.flaml.best_run", True) + self.mlflow_client.set_tag(child_run_id, "flaml.best_run", True) if result is not None: result.best_run_id = child_run_id result.best_run_name = child_run.info.run_name From 8914b6404a4b87ecc51541e0b74425916a7a6955 Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Wed, 21 May 2025 10:11:00 +0000 Subject: [PATCH 3/8] Fix 'NoneType' object has no attribute 'DataFrame' --- flaml/automl/data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flaml/automl/data.py b/flaml/automl/data.py index b7c04bd345..441aef3697 100644 --- a/flaml/automl/data.py +++ b/flaml/automl/data.py @@ -452,7 +452,7 @@ def group_counts(groups): return c[np.argsort(i)] -def get_random_dataframe(n_rows: int = 200, ratio_none: float = 0.1, seed: int = 42) -> pd.DataFrame: +def get_random_dataframe(n_rows: int = 200, ratio_none: float = 0.1, seed: int = 42) -> DataFrame: """Generate a random pandas DataFrame with various data types for testing. This function creates a DataFrame with multiple column types including: - Timestamps From c21e2d144b076661724560e67d2e818bd4ae98bb Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Wed, 21 May 2025 10:20:18 +0000 Subject: [PATCH 4/8] Deprecated 3.8 support --- .github/workflows/CD.yml | 2 +- .github/workflows/deploy-website.yml | 4 ++-- .github/workflows/python-package.yml | 4 ++-- Dockerfile | 2 +- README.md | 2 +- setup.py | 3 +-- 6 files changed, 8 insertions(+), 9 deletions(-) diff --git a/.github/workflows/CD.yml b/.github/workflows/CD.yml index 7cf8fe2459..26ee24c4c5 100644 --- a/.github/workflows/CD.yml +++ b/.github/workflows/CD.yml @@ -13,7 +13,7 @@ jobs: strategy: matrix: os: ['ubuntu-latest'] - python-version: [3.8] + python-version: [3.10] runs-on: ${{ matrix.os }} environment: package steps: diff --git a/.github/workflows/deploy-website.yml b/.github/workflows/deploy-website.yml index 1df66381c7..9c973dfbc5 100644 --- a/.github/workflows/deploy-website.yml +++ b/.github/workflows/deploy-website.yml @@ -37,7 +37,7 @@ jobs: - name: setup python uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.10" - name: pydoc-markdown install run: | python -m pip install --upgrade pip @@ -73,7 +73,7 @@ jobs: - name: setup python uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.10" - name: pydoc-markdown install run: | python -m pip install --upgrade pip diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 507567f688..a38023deeb 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -30,7 +30,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, macos-latest, windows-2019] - python-version: ["3.8", "3.9", "3.10", "3.11"] + python-version: ["3.9", "3.10", "3.11"] steps: - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} @@ -108,7 +108,7 @@ jobs: # - name: Setup Python # uses: actions/setup-python@v4 # with: - # python-version: '3.8' + # python-version: '3.10' # - name: Compile documentation # run: | # pip install -e . diff --git a/Dockerfile b/Dockerfile index 35d4eed39f..47a246fe58 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # basic setup -FROM mcr.microsoft.com/devcontainers/python:3.8 +FROM mcr.microsoft.com/devcontainers/python:3.10 RUN apt-get update && apt-get -y update RUN apt-get install -y sudo git npm diff --git a/README.md b/README.md index 302880408e..0998931dd5 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ FLAML has a .NET implementation in [ML.NET](http://dot.net/ml), an open-source, ## Installation -FLAML requires **Python version >= 3.8**. It can be installed from pip: +FLAML requires **Python version >= 3.9**. It can be installed from pip: ```bash pip install flaml diff --git a/setup.py b/setup.py index e30db68a20..31cc563726 100644 --- a/setup.py +++ b/setup.py @@ -170,10 +170,9 @@ "Operating System :: OS Independent", # Specify the Python versions you support here. "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", ], - python_requires=">=3.8", + python_requires=">=3.9", ) From 96e2a21f34223af5a3b6f78a8d8bff710566092d Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Wed, 21 May 2025 10:22:40 +0000 Subject: [PATCH 5/8] Fix 'NoneType' object has no attribute 'DataFrame' --- flaml/automl/data.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flaml/automl/data.py b/flaml/automl/data.py index 441aef3697..4c473963f0 100644 --- a/flaml/automl/data.py +++ b/flaml/automl/data.py @@ -676,12 +676,12 @@ def auto_convert_dtypes_spark( def auto_convert_dtypes_pandas( - df: pd.DataFrame, + df: DataFrame, na_values: list = None, category_threshold: float = 0.3, convert_threshold: float = 0.6, sample_ratio: float = 1.0, -) -> tuple[pd.DataFrame, dict]: +) -> tuple[DataFrame, dict]: """Automatically convert data types in a pandas DataFrame using heuristics. This function analyzes the DataFrame to infer appropriate data types From 4332e167cea2c6f1a50ddc40e6ecb9e762608b09 Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Wed, 21 May 2025 10:26:12 +0000 Subject: [PATCH 6/8] Still use python 3.8 for pydoc --- .github/workflows/CD.yml | 2 +- .github/workflows/deploy-website.yml | 4 ++-- .github/workflows/python-package.yml | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/CD.yml b/.github/workflows/CD.yml index 26ee24c4c5..7cf8fe2459 100644 --- a/.github/workflows/CD.yml +++ b/.github/workflows/CD.yml @@ -13,7 +13,7 @@ jobs: strategy: matrix: os: ['ubuntu-latest'] - python-version: [3.10] + python-version: [3.8] runs-on: ${{ matrix.os }} environment: package steps: diff --git a/.github/workflows/deploy-website.yml b/.github/workflows/deploy-website.yml index 9c973dfbc5..1df66381c7 100644 --- a/.github/workflows/deploy-website.yml +++ b/.github/workflows/deploy-website.yml @@ -37,7 +37,7 @@ jobs: - name: setup python uses: actions/setup-python@v4 with: - python-version: "3.10" + python-version: "3.8" - name: pydoc-markdown install run: | python -m pip install --upgrade pip @@ -73,7 +73,7 @@ jobs: - name: setup python uses: actions/setup-python@v4 with: - python-version: "3.10" + python-version: "3.8" - name: pydoc-markdown install run: | python -m pip install --upgrade pip diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index a38023deeb..56009f8abf 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -108,7 +108,7 @@ jobs: # - name: Setup Python # uses: actions/setup-python@v4 # with: - # python-version: '3.10' + # python-version: '3.8' # - name: Compile documentation # run: | # pip install -e . From 3608bab30edf3a208a26698dbb2666dbb900cc22 Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Wed, 21 May 2025 12:54:44 +0000 Subject: [PATCH 7/8] Don't run tests in parallel --- .coveragerc | 2 -- 1 file changed, 2 deletions(-) diff --git a/.coveragerc b/.coveragerc index 05ab59fb37..91c9b36bc3 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,7 +1,5 @@ [run] branch = True source = flaml -concurrency = multiprocessing -parallel = true omit = *test* From 6ec1d92c9241b6e294260c9b4ec0561628b6987d Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Thu, 22 May 2025 03:11:42 +0000 Subject: [PATCH 8/8] Remove autofe and lowcode --- flaml/fabric/mlflow.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/flaml/fabric/mlflow.py b/flaml/fabric/mlflow.py index 04dfd14dbb..106fa1973d 100644 --- a/flaml/fabric/mlflow.py +++ b/flaml/fabric/mlflow.py @@ -35,8 +35,6 @@ class SparkPipelineModel: from flaml.automl.spark import DataFrame, Series, psDataFrame, psSeries from flaml.version import __version__ -from .lowcode import AUTOML_DISPLAY_CONFIGURATIONS - SEARCH_MAX_RESULTS = 5000 # Each train should not have more than 5000 trials IS_RENAME_CHILD_RUN = os.environ.get("FLAML_IS_RENAME_CHILD_RUN", "false").lower() == "true" REMOVE_REQUIREMENT_LIST = [ @@ -622,16 +620,12 @@ def pickle_and_log_automl_artifacts(self, automl, model, estimator, signature=No pipeline.stages.append(model) elif not estimator.endswith("_spark"): steps = [("feature_transformer", feature_transformer)] - if model.autofe is not None: - steps.append(("autofe", model.autofe)) steps.append(("estimator", model)) pipeline = Pipeline(steps) else: stages = [] if feature_transformer is not None: stages.append(feature_transformer) - if model.autofe is not None: - stages.append(model.autofe) stages.append(model) pipeline = SparkPipelineModel(stages=stages) if isinstance(pipeline, SparkPipelineModel): @@ -662,12 +656,6 @@ def record_state(self, automl, search_state, estimator): config = search_state.config self.automl_user_configurations = safe_json_dumps(automl._automl_user_configurations) - self.automl_display_configurations = safe_json_dumps( - { - k: automl._automl_user_configurations[k] if k in automl._automl_user_configurations else None - for k in AUTOML_DISPLAY_CONFIGURATIONS - } - ) info = { "metrics": { @@ -690,7 +678,6 @@ def record_state(self, automl, search_state, estimator): "flaml.run_source": "flaml-automl", "flaml.log_type": self.log_type, "flaml.automl_user_configurations": self.automl_user_configurations, - "flaml.automl_display_configurations": self.automl_display_configurations, }, "params": { "sample_size": search_state.sample_size, @@ -849,11 +836,6 @@ def _log_automl_configurations(self, run_id): text=self.automl_user_configurations, artifact_file="automl_configurations/automl_user_configurations.json", ) - self.mlflow_client.log_text( - run_id=run_id, - text=self.automl_display_configurations, - artifact_file="automl_configurations/automl_display_configurations.json", - ) return f"Successfully _log_automl_configurations to run_id {run_id}" def _log_info_to_run(self, info, run_id, log_params=False):