diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 231ec46e6c..da5300437c 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -25,7 +25,6 @@ jobs: matrix: os: [ubuntu-latest, macos-latest, windows-2019] python-version: ["3.7", "3.8", "3.9", "3.10"] - steps: - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} @@ -45,21 +44,18 @@ jobs: export CFLAGS="$CFLAGS -I/usr/local/opt/libomp/include" export CXXFLAGS="$CXXFLAGS -I/usr/local/opt/libomp/include" export LDFLAGS="$LDFLAGS -Wl,-rpath,/usr/local/opt/libomp/lib -L/usr/local/opt/libomp/lib -lomp" - - name: On Linux, install Spark stand-alone cluster and PySpark - if: matrix.os == 'ubuntu-latest' + - name: On Linux + python 3.8, install pyspark 3.2.3 + if: matrix.os == 'ubuntu-latest' && matrix.python-version == '3.8' run: | - sudo apt-get update && sudo apt-get install -y --allow-downgrades --allow-change-held-packages --no-install-recommends ca-certificates-java ca-certificates openjdk-17-jdk-headless && sudo apt-get clean && sudo rm -rf /var/lib/apt/lists/* - wget --progress=dot:giga "https://www.apache.org/dyn/closer.lua/spark/spark-3.3.0/spark-3.3.0-bin-hadoop2.tgz?action=download" -O - | tar -xzC /tmp; archive=$(basename "spark-3.3.0/spark-3.3.0-bin-hadoop2.tgz") bash -c "sudo mv -v /tmp/\${archive/%.tgz/} /spark" - pip install --no-cache-dir pyspark>=3.0 - export SPARK_HOME=/spark - export PYTHONPATH=/spark/python/lib/py4j-0.10.9.5-src.zip:/spark/python - export PATH=$PATH:$SPARK_HOME/bin + python -m pip install --upgrade pip wheel + pip install pyspark==3.2.3 - name: Install packages and dependencies run: | python -m pip install --upgrade pip wheel pip install -e . python -c "import flaml" pip install -e .[test] + pip list | grep "pyspark" - name: If linux, install ray 2 if: matrix.os == 'ubuntu-latest' run: | @@ -76,6 +72,11 @@ jobs: if: matrix.python-version != '3.10' run: | pip install -e .[vw] + - name: Uninstall pyspark on python 3.9 + if: matrix.python-version == '3.9' + run: | + # Uninstall pyspark to test env without pyspark + pip uninstall -y pyspark - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names diff --git a/.gitignore b/.gitignore index e87598aa12..641a5d3ee3 100644 --- a/.gitignore +++ b/.gitignore @@ -159,3 +159,6 @@ automl.pkl test/nlp/testtmp.py test/nlp/testtmpfl.py + +flaml/tune/spark/mylearner.py +*.pkl diff --git a/flaml/automl/automl.py b/flaml/automl/automl.py index 8bab84e0cd..2d8ea04a1a 100644 --- a/flaml/automl/automl.py +++ b/flaml/automl/automl.py @@ -7,7 +7,6 @@ import time import os import sys from typing import Callable, List, Union, Optional -import inspect from functools import partial import numpy as np from sklearn.base import BaseEstimator @@ -17,7 +16,6 @@ import json from flaml.automl.state import SearchState, AutoMLState from flaml.automl.ml import ( - compute_estimator, train_estimator, get_estimator_class, ) @@ -31,7 +29,6 @@ from flaml.config import ( N_SPLITS, SAMPLE_MULTIPLY_FACTOR, ) -from flaml.automl.data import concat # TODO check to see when we can remove these from flaml.automl.task.task import CLASSIFICATION, TS_FORECAST, Task @@ -43,6 +40,34 @@ from flaml.default import suggest_learner from flaml.version import __version__ as flaml_version from flaml.tune.spark.utils import check_spark, get_broadcast_data +try: + from flaml.automl.spark.utils import ( + train_test_split_pyspark, + unique_pandas_on_spark, + len_labels, + unique_value_first_index, + ) +except ImportError: + train_test_split_pyspark = None + unique_pandas_on_spark = None + from flaml.automl.utils import ( + len_labels, + unique_value_first_index, + ) +try: + os.environ["PYARROW_IGNORE_TIMEZONE"] = "1" + import pyspark.pandas as ps + from pyspark.pandas import DataFrame as psDataFrame, Series as psSeries + from pyspark.pandas.config import set_option, reset_option +except ImportError: + ps = None + + class psDataFrame: + pass + + class psSeries: + pass + try: import mlflow @@ -511,7 +536,12 @@ class AutoML(BaseEstimator): """Time taken to find best model in seconds.""" return self.__dict__.get("_time_taken_best_iter") - def score(self, X: pd.DataFrame, y: pd.Series, **kwargs): + def score( + self, + X: Union[pd.DataFrame, psDataFrame], + y: Union[pd.Series, psSeries], + **kwargs, + ): estimator = getattr(self, "_trained_estimator", None) if estimator is None: logger.warning( @@ -525,13 +555,14 @@ class AutoML(BaseEstimator): def predict( self, - X: Union[np.array, pd.DataFrame, List[str], List[List[str]]], + X: Union[np.array, pd.DataFrame, List[str], List[List[str]], psDataFrame], **pred_kwargs, ): """Predict label from features. Args: - X: A numpy array of featurized instances, shape n * m, + X: A numpy array or pandas dataframe or pyspark.pandas dataframe + of featurized instances, shape n * m, or for time series forcast tasks: a pandas dataframe with the first column containing timestamp values (datetime type) or an integer n for @@ -1859,7 +1890,19 @@ class AutoML(BaseEstimator): error_metric = "customized metric" logger.info(f"Minimizing error metric: {error_metric}") - estimator_list = task.default_estimator_list(estimator_list) + is_spark_dataframe = isinstance(X_train, psDataFrame) or isinstance( + dataframe, psDataFrame + ) + estimator_list = task.default_estimator_list(estimator_list, is_spark_dataframe) + + if is_spark_dataframe and self._use_spark: + # For spark dataframe, use_spark must be False because spark models are trained in parallel themselves + self._use_spark = False + logger.warning( + "Spark dataframes support only spark.ml type models, which will be trained " + "with spark themselves, no need to start spark trials in flaml. " + "`use_spark` is set to False." + ) # When no search budget is specified if no_budget: diff --git a/flaml/automl/data.py b/flaml/automl/data.py index bfb5bbd5f5..8f3e994fce 100644 --- a/flaml/automl/data.py +++ b/flaml/automl/data.py @@ -12,6 +12,22 @@ from flaml.automl.training_log import training_log_reader from datetime import datetime from typing import TYPE_CHECKING, Union +import os + +try: + os.environ["PYARROW_IGNORE_TIMEZONE"] = "1" + import pyspark.pandas as ps + from pyspark.pandas import DataFrame as psDataFrame, Series as psSeries +except ImportError: + ps = None + + class psDataFrame: + pass + + class psSeries: + pass + + if TYPE_CHECKING: from flaml.automl.task import Task @@ -198,6 +214,15 @@ def get_output_from_log(filename, time_budget): def concat(X1, X2): """concatenate two matrices vertically.""" + if type(X1) != type(X2): + if isinstance(X2, (psDataFrame, psSeries)): + X1 = ps.from_pandas(pd.DataFrame(X1)) + elif isinstance(X1, (psDataFrame, psSeries)): + X2 = ps.from_pandas(pd.DataFrame(X2)) + else: + X1 = pd.DataFrame(X1) + X2 = pd.DataFrame(X2) + if isinstance(X1, (DataFrame, Series)): df = pd.concat([X1, X2], sort=False) df.reset_index(drop=True, inplace=True) @@ -206,6 +231,13 @@ def concat(X1, X2): if len(cat_columns): df[cat_columns] = df[cat_columns].astype("category") return df + if isinstance(X1, (psDataFrame, psSeries)): + df = ps.concat([X1, X2], ignore_index=True) + if isinstance(X1, psDataFrame): + cat_columns = X1.select_dtypes(include="category").columns.values.tolist() + if len(cat_columns): + df[cat_columns] = df[cat_columns].astype("category") + return df if issparse(X1): return vstack((X1, X2)) else: diff --git a/flaml/automl/ml.py b/flaml/automl/ml.py index ae3573a17b..1d717e02e8 100644 --- a/flaml/automl/ml.py +++ b/flaml/automl/ml.py @@ -2,6 +2,7 @@ # * Copyright (c) FLAML authors. All rights reserved. # * Licensed under the MIT License. See LICENSE file in the # * project root for license information. +import os import time import numpy as np import pandas as pd @@ -19,12 +20,6 @@ from sklearn.metrics import ( mean_absolute_percentage_error, ndcg_score, ) -from sklearn.model_selection import ( - RepeatedStratifiedKFold, - GroupKFold, - TimeSeriesSplit, - StratifiedGroupKFold, -) from flaml.automl.model import ( XGBoostSklearnEstimator, XGBoost_TS, @@ -46,14 +41,33 @@ from flaml.automl.model import ( TransformersEstimator, TemporalFusionTransformerEstimator, TransformersEstimatorModelSelection, + SparkLGBMEstimator, ) from flaml.automl.data import group_counts from flaml.automl.task.task import TS_FORECAST, Task from flaml.automl.model import BaseEstimator -import logging +try: + from flaml.automl.spark.utils import len_labels +except ImportError: + from flaml.automl.utils import len_labels +try: + os.environ["PYARROW_IGNORE_TIMEZONE"] = "1" + from pyspark.sql.functions import col + import pyspark.pandas as ps + from pyspark.pandas import DataFrame as psDataFrame, Series as psSeries + from flaml.automl.spark.utils import to_pandas_on_spark, iloc_pandas_on_spark + from flaml.automl.spark.metrics import spark_metric_loss_score +except ImportError: + ps = None + + class psDataFrame: + pass + + class psSeries: + pass + -logger = logging.getLogger(__name__) EstimatorSubclass = TypeVar("EstimatorSubclass", bound=BaseEstimator) sklearn_metric_name_set = { @@ -124,6 +138,8 @@ def get_estimator_class(task: str, estimator_name: str) -> EstimatorSubclass: estimator_class = RF_TS if task in TS_FORECAST else RandomForestEstimator elif "lgbm" == estimator_name: estimator_class = LGBM_TS if task in TS_FORECAST else LGBMEstimator + elif "lgbm_spark" == estimator_name: + estimator_class = SparkLGBMEstimator elif "lrl1" == estimator_name: estimator_class = LRL1Classifier elif "lrl2" == estimator_name: @@ -163,7 +179,15 @@ def metric_loss_score( groups=None, ): # y_processed_predict and y_processed_true are processed id labels if the original were the token labels - if is_in_sklearn_metric_name_set(metric_name): + if isinstance(y_processed_predict, (psDataFrame, psSeries)): + return spark_metric_loss_score( + metric_name, + y_processed_predict, + y_processed_true, + sample_weight, + groups, + ) + elif is_in_sklearn_metric_name_set(metric_name): return sklearn_metric_loss_score( metric_name, y_processed_predict, @@ -359,7 +383,10 @@ def sklearn_metric_loss_score( def get_y_pred(estimator, X, eval_metric, task: Task): if eval_metric in ["roc_auc", "ap", "roc_auc_weighted"] and task.is_binary(): y_pred_classes = estimator.predict_proba(X) - y_pred = y_pred_classes[:, 1] if y_pred_classes.ndim > 1 else y_pred_classes + if isinstance(y_pred_classes, (psSeries, psDataFrame)): + y_pred = y_pred_classes + else: + y_pred = y_pred_classes[:, 1] if y_pred_classes.ndim > 1 else y_pred_classes elif eval_metric in [ "log_loss", "roc_auc", @@ -525,7 +552,7 @@ def compute_estimator( fit_kwargs: Optional[dict] = None, free_mem_ratio=0, ): - if not fit_kwargs: + if fit_kwargs is None: fit_kwargs = {} estimator_class = estimator_class or get_estimator_class(task, estimator_name) @@ -605,7 +632,7 @@ def train_estimator( task=task, n_jobs=n_jobs, ) - if not fit_kwargs: + if fit_kwargs is None: fit_kwargs = {} if isinstance(estimator, TransformersEstimator): diff --git a/flaml/automl/model.py b/flaml/automl/model.py index 96ada5e158..12606a62f8 100644 --- a/flaml/automl/model.py +++ b/flaml/automl/model.py @@ -6,7 +6,7 @@ from contextlib import contextmanager from functools import partial import signal import os -from typing import Callable, List +from typing import Callable, List, Union import numpy as np import time from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier @@ -36,6 +36,38 @@ from flaml.automl.task.task import ( NLG_TASKS, ) +try: + from flaml.automl.spark.utils import len_labels, to_pandas_on_spark +except ImportError: + from flaml.automl.utils import len_labels + + to_pandas_on_spark = None +from flaml.automl.spark.configs import ( + ParamList_LightGBM_Classifier, + ParamList_LightGBM_Regressor, + ParamList_LightGBM_Ranker, +) + +try: + os.environ["PYARROW_IGNORE_TIMEZONE"] = "1" + from pyspark.sql.dataframe import DataFrame as sparkDataFrame + from pyspark.sql import SparkSession + from pyspark.pandas import DataFrame as psDataFrame, Series as psSeries + + _have_spark = True +except ImportError: + _have_spark = False + + class psDataFrame: + pass + + class psSeries: + pass + + class sparkDataFrame: + pass + + try: import psutil except ImportError: @@ -388,6 +420,323 @@ class BaseEstimator: return params +class SparkEstimator(BaseEstimator): + """The base class for fine-tuning spark models, using pyspark.ml and SynapseML API.""" + + def __init__(self, task="binary", **config): + if not _have_spark: + raise ImportError( + "pyspark is not installed. Try `pip install flaml[spark]`." + ) + super().__init__(task, **config) + self.df_train = None + + def _preprocess( + self, + X_train: Union[psDataFrame, sparkDataFrame], + y_train: psSeries = None, + index_col: str = "tmp_index_col", + ): + # TODO: optimize this, support pyspark.sql.DataFrame + if y_train is not None: + self.df_train = X_train.join(y_train) + else: + self.df_train = X_train + if isinstance(self.df_train, psDataFrame): + self.df_train = self.df_train.to_spark(index_col=index_col) + return self.df_train + + def fit( + self, + X_train: psDataFrame, + y_train: psSeries = None, + budget=None, + free_mem_ratio=0, + index_col: str = "tmp_index_col", + **kwargs, + ): + """Train the model from given training data. + Args: + X_train: A pyspark.pandas DataFrame of training data in shape n*m. + y_train: A pyspark.pandas Series in shape n*1. None if X_train is a pyspark.pandas + Dataframe contains y_train. + budget: A float of the time budget in seconds. + free_mem_ratio: A float between 0 and 1 for the free memory ratio to keep during training. + Returns: + train_time: A float of the training time in seconds. + """ + df_train = self._preprocess(X_train, y_train, index_col=index_col) + train_time = self._fit(df_train, **kwargs) + return train_time + + def _fit(self, df_train: sparkDataFrame, **kwargs): + current_time = time.time() + pipeline_model = self.estimator_class(**self.params, **kwargs) + if logger.level == logging.DEBUG: + logger.debug( + f"flaml.model - {pipeline_model} fit started with params {self.params}" + ) + pipeline_model.fit(df_train) + if logger.level == logging.DEBUG: + logger.debug(f"flaml.model - {pipeline_model} fit finished") + train_time = time.time() - current_time + self._model = pipeline_model + return train_time + + def predict(self, X, index_col="tmp_index_col", return_all=False, **kwargs): + """Predict label from features. + Args: + X: A pyspark or pyspark.pandas dataframe of featurized instances, shape n*m. + index_col: A str of the index column name. Default to "tmp_index_col". + return_all: A bool of whether to return all the prediction results. Default to False. + Returns: + A pyspark.pandas series of shape n*1 if return_all is False. Otherwise, a pyspark.pandas dataframe. + """ + if self._model is not None: + X = self._preprocess(X, index_col=index_col) + predictions = to_pandas_on_spark( + self._model.transform(X), index_col=index_col + ) + predictions.index.name = None + pred_y = predictions["prediction"] + if return_all: + return predictions + else: + return pred_y + else: + logger.warning( + "Estimator is not fit yet. Please run fit() before predict()." + ) + return np.ones(X.shape[0]) + + def predict_proba(self, X, index_col="tmp_index_col", return_all=False, **kwargs): + """Predict the probability of each class from features. + Only works for classification problems + Args: + X: A pyspark or pyspark.pandas dataframe of featurized instances, shape n*m. + index_col: A str of the index column name. Default to "tmp_index_col". + return_all: A bool of whether to return all the prediction results. Default to False. + Returns: + A pyspark.pandas dataframe of shape n*c. c is the # classes. + Each element at (i,j) is the probability for instance i to be in + class j. + """ + assert self._task in CLASSIFICATION, "predict_proba() only for classification." + if self._model is not None: + X = self._preprocess(X, index_col=index_col) + predictions = to_pandas_on_spark( + self._model.transform(X), index_col=index_col + ) + predictions.index.name = None + pred_y = predictions["probability"] + + if return_all: + return predictions + else: + return pred_y + else: + logger.warning( + "Estimator is not fit yet. Please run fit() before predict()." + ) + return np.ones(X.shape[0]) + + +class SparkLGBMEstimator(SparkEstimator): + """The class for fine-tuning spark version lightgbm models, using SynapseML API.""" + + """The class for tuning LGBM, using sklearn API.""" + + ITER_HP = "numIterations" + DEFAULT_ITER = 100 + + @classmethod + def search_space(cls, data_size, **params): + upper = max(5, min(32768, int(data_size[0]))) # upper must be larger than lower + # https://github.com/microsoft/SynapseML/blob/master/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala + return { + "numIterations": { + "domain": tune.lograndint(lower=4, upper=upper), + "init_value": 4, + "low_cost_init_value": 4, + }, + "numLeaves": { + "domain": tune.lograndint(lower=4, upper=upper), + "init_value": 4, + "low_cost_init_value": 4, + }, + "minDataInLeaf": { + "domain": tune.lograndint(lower=2, upper=2**7 + 1), + "init_value": 20, + }, + "learningRate": { + "domain": tune.loguniform(lower=1 / 1024, upper=1.0), + "init_value": 0.1, + }, + "log_max_bin": { # log transformed with base 2 + "domain": tune.lograndint(lower=3, upper=11), + "init_value": 8, + }, + "featureFraction": { + "domain": tune.uniform(lower=0.01, upper=1.0), + "init_value": 1.0, + }, + "lambdaL1": { + "domain": tune.loguniform(lower=1 / 1024, upper=1024), + "init_value": 1 / 1024, + }, + "lambdaL2": { + "domain": tune.loguniform(lower=1 / 1024, upper=1024), + "init_value": 1.0, + }, + } + + def config2params(self, config: dict) -> dict: + params = super().config2params(config) + if "n_jobs" in params: + params.pop("n_jobs") + if "log_max_bin" in params: + params["maxBin"] = (1 << params.pop("log_max_bin")) - 1 + return params + + @classmethod + def size(cls, config): + num_leaves = int( + round(config.get("numLeaves") or 1 << config.get("maxDepth", 16)) + ) + n_estimators = int(round(config["numIterations"])) + return (num_leaves * 3 + (num_leaves - 1) * 4 + 1.0) * n_estimators * 8 + + def __init__(self, task="binary", **config): + super().__init__(task, **config) + err_msg = ( + "SynapseML is not installed. Please refer to [SynapseML]" + + "(https://github.com/microsoft/SynapseML) for installation instructions." + ) + if "regression" == task: + try: + from synapse.ml.lightgbm import LightGBMRegressor + except ImportError: + raise ImportError(err_msg) + + self.estimator_class = LightGBMRegressor + self.estimator_params = ParamList_LightGBM_Regressor + elif "rank" == task: + try: + from synapse.ml.lightgbm import LightGBMRanker + except ImportError: + raise ImportError(err_msg) + + self.estimator_class = LightGBMRanker + self.estimator_params = ParamList_LightGBM_Ranker + else: + try: + from synapse.ml.lightgbm import LightGBMClassifier + except ImportError: + raise ImportError(err_msg) + + self.estimator_class = LightGBMClassifier + self.estimator_params = ParamList_LightGBM_Classifier + self._time_per_iter = None + self._train_size = 0 + self._mem_per_iter = -1 + self.model_classes_ = None + self.model_n_classes_ = None + + def fit( + self, + X_train, + y_train=None, + budget=None, + free_mem_ratio=0, + index_col="tmp_index_col", + **kwargs, + ): + start_time = time.time() + if self.model_n_classes_ is None and self._task not in ["regression", "rank"]: + self.model_n_classes_, self.model_classes_ = len_labels( + y_train, return_labels=True + ) + df_train = self._preprocess(X_train, y_train, index_col=index_col) + # n_iter = self.params.get(self.ITER_HP, self.DEFAULT_ITER) + # trained = False + # mem0 = psutil.virtual_memory().available if psutil is not None else 1 + _kwargs = kwargs.copy() + if self._task not in ["regression", "rank"] and "objective" not in _kwargs: + _kwargs["objective"] = ( + "binary" if self.model_n_classes_ == 2 else "multiclass" + ) + for k in list(_kwargs.keys()): + if k not in self.estimator_params: + logger.warning( + f"[SparkLGBMEstimator] [Warning] Ignored unknown parameter: {k}" + ) + _kwargs.pop(k) + # TODO: find a better estimation of early stopping + # if ( + # (not self._time_per_iter or abs(self._train_size - df_train.count()) > 4) + # and budget is not None + # or self._mem_per_iter < 0 + # and psutil is not None + # ) and n_iter > 1: + # self.params[self.ITER_HP] = 1 + # self._t1 = self._fit(df_train, **_kwargs) + # if budget is not None and self._t1 >= budget or n_iter == 1: + # return self._t1 + # mem1 = psutil.virtual_memory().available if psutil is not None else 1 + # self._mem1 = mem0 - mem1 + # self.params[self.ITER_HP] = min(n_iter, 4) + # self._t2 = self._fit(df_train, **_kwargs) + # mem2 = psutil.virtual_memory().available if psutil is not None else 1 + # self._mem2 = max(mem0 - mem2, self._mem1) + # self._mem_per_iter = min(self._mem1, self._mem2 / self.params[self.ITER_HP]) + # self._time_per_iter = ( + # (self._t2 - self._t1) / (self.params[self.ITER_HP] - 1) + # if self._t2 > self._t1 + # else self._t1 + # if self._t1 + # else 0.001 + # ) + # self._train_size = df_train.count() + # if ( + # budget is not None + # and self._t1 + self._t2 >= budget + # or n_iter == self.params[self.ITER_HP] + # ): + # # self.params[self.ITER_HP] = n_iter + # return time.time() - start_time + # trained = True + # if n_iter > 1: + # max_iter = min( + # n_iter, + # int( + # (budget - time.time() + start_time - self._t1) / self._time_per_iter + # + 1 + # ) + # if budget is not None + # else n_iter, + # ) + # if trained and max_iter <= self.params[self.ITER_HP]: + # return time.time() - start_time + # # when not trained, train at least one iter + # self.params[self.ITER_HP] = max(max_iter, 1) + self._fit(df_train, **_kwargs) + train_time = time.time() - start_time + return train_time + + def _fit(self, df_train: sparkDataFrame, **kwargs): + current_time = time.time() + model = self.estimator_class(**self.params, **kwargs) + if logger.level == logging.DEBUG: + logger.debug(f"flaml.model - {model} fit started with params {self.params}") + self._model = model.fit(df_train) + self._model.classes_ = self.model_classes_ + self._model.n_classes_ = self.model_n_classes_ + if logger.level == logging.DEBUG: + logger.debug(f"flaml.model - {model} fit finished") + train_time = time.time() - current_time + return train_time + + class TransformersEstimator(BaseEstimator): """The class for fine-tuning language models, using huggingface transformers API.""" diff --git a/flaml/automl/spark/__init__.py b/flaml/automl/spark/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/flaml/automl/spark/configs.py b/flaml/automl/spark/configs.py new file mode 100644 index 0000000000..26584dc479 --- /dev/null +++ b/flaml/automl/spark/configs.py @@ -0,0 +1,97 @@ +ParamList_LightGBM_Base = [ + "baggingFraction", + "baggingFreq", + "baggingSeed", + "binSampleCount", + "boostFromAverage", + "boostingType", + "catSmooth", + "categoricalSlotIndexes", + "categoricalSlotNames", + "catl2", + "chunkSize", + "dataRandomSeed", + "defaultListenPort", + "deterministic", + "driverListenPort", + "dropRate", + "dropSeed", + "earlyStoppingRound", + "executionMode", + "extraSeed" "featureFraction", + "featureFractionByNode", + "featureFractionSeed", + "featuresCol", + "featuresShapCol", + "fobj" "improvementTolerance", + "initScoreCol", + "isEnableSparse", + "isProvideTrainingMetric", + "labelCol", + "lambdaL1", + "lambdaL2", + "leafPredictionCol", + "learningRate", + "matrixType", + "maxBin", + "maxBinByFeature", + "maxCatThreshold", + "maxCatToOnehot", + "maxDeltaStep", + "maxDepth", + "maxDrop", + "metric", + "microBatchSize", + "minDataInLeaf", + "minDataPerBin", + "minDataPerGroup", + "minGainToSplit", + "minSumHessianInLeaf", + "modelString", + "monotoneConstraints", + "monotoneConstraintsMethod", + "monotonePenalty", + "negBaggingFraction", + "numBatches", + "numIterations", + "numLeaves", + "numTasks", + "numThreads", + "objectiveSeed", + "otherRate", + "parallelism", + "passThroughArgs", + "posBaggingFraction", + "predictDisableShapeCheck", + "predictionCol", + "repartitionByGroupingColumn", + "seed", + "skipDrop", + "slotNames", + "timeout", + "topK", + "topRate", + "uniformDrop", + "useBarrierExecutionMode", + "useMissing", + "useSingleDatasetMode", + "validationIndicatorCol", + "verbosity", + "weightCol", + "xGBoostDartMode", + "zeroAsMissing", + "objective", +] +ParamList_LightGBM_Classifier = ParamList_LightGBM_Base + [ + "isUnbalance", + "probabilityCol", + "rawPredictionCol", + "thresholds", +] +ParamList_LightGBM_Regressor = ParamList_LightGBM_Base + ["tweedieVariancePower"] +ParamList_LightGBM_Ranker = ParamList_LightGBM_Base + [ + "groupCol", + "evalAt", + "labelGain", + "maxPosition", +] diff --git a/flaml/automl/spark/metrics.py b/flaml/automl/spark/metrics.py new file mode 100644 index 0000000000..bd9840d9e1 --- /dev/null +++ b/flaml/automl/spark/metrics.py @@ -0,0 +1,230 @@ +import logging +import os +import numpy as np +from typing import Union + +try: + os.environ["PYARROW_IGNORE_TIMEZONE"] = "1" + from pyspark.sql import DataFrame + import pyspark.pandas as ps + from pyspark.ml.evaluation import ( + BinaryClassificationEvaluator, + RegressionEvaluator, + MulticlassClassificationEvaluator, + MultilabelClassificationEvaluator, + RankingEvaluator, + ) + import pyspark.sql.functions as F +except ImportError: + msg = """use_spark=True requires installation of PySpark. Please run pip install flaml[spark] + and check [here](https://spark.apache.org/docs/latest/api/python/getting_started/install.html) + for more details about installing Spark.""" + raise ImportError(msg) + + +def ps_group_counts(groups: Union[ps.Series, np.ndarray]) -> np.ndarray: + if isinstance(groups, np.ndarray): + _, i, c = np.unique(groups, return_counts=True, return_index=True) + else: + i = groups.drop_duplicates().index.values + c = groups.value_counts().sort_index().to_numpy() + return c[np.argsort(i)].tolist() + + +def _process_df(df, label_col, prediction_col): + df = df.withColumn(label_col, F.array([df[label_col]])) + df = df.withColumn(prediction_col, F.array([df[prediction_col]])) + return df + + +def _compute_label_from_probability(df, probability_col, prediction_col): + # array_max finds the maximum value in the 'probability' array + # array_position finds the index of the maximum value in the 'probability' array + max_index_expr = F.expr( + f"array_position({probability_col}, array_max({probability_col}))-1" + ) + # Create a new column 'prediction' based on the maximum probability value + df = df.withColumn(prediction_col, max_index_expr.cast("double")) + return df + + +def spark_metric_loss_score( + metric_name: str, + y_predict: ps.Series, + y_true: ps.Series, + sample_weight: ps.Series = None, + groups: ps.Series = None, +) -> float: + """ + Compute the loss score of a metric for spark models. + + Args: + metric_name: str | the name of the metric. + y_predict: ps.Series | the predicted values. + y_true: ps.Series | the true values. + sample_weight: ps.Series | the sample weights. Default: None. + groups: ps.Series | the group of each row. Default: None. + + Returns: + float | the loss score. A lower value indicates a better model. + """ + label_col = "label" + prediction_col = "prediction" + kwargs = {} + + y_predict.name = prediction_col + y_true.name = label_col + df = y_predict.to_frame().join(y_true) + if sample_weight is not None: + sample_weight.name = "weight" + df = df.join(sample_weight) + kwargs = {"weightCol": "weight"} + + df = df.to_spark() + + metric_name = metric_name.lower() + min_mode_metrics = ["log_loss", "rmse", "mse", "mae"] + + if metric_name == "rmse": + evaluator = RegressionEvaluator( + metricName="rmse", + labelCol=label_col, + predictionCol=prediction_col, + **kwargs, + ) + elif metric_name == "mse": + evaluator = RegressionEvaluator( + metricName="mse", + labelCol=label_col, + predictionCol=prediction_col, + **kwargs, + ) + elif metric_name == "mae": + evaluator = RegressionEvaluator( + metricName="mae", + labelCol=label_col, + predictionCol=prediction_col, + **kwargs, + ) + elif metric_name == "r2": + evaluator = RegressionEvaluator( + metricName="r2", + labelCol=label_col, + predictionCol=prediction_col, + **kwargs, + ) + elif metric_name == "var": + evaluator = RegressionEvaluator( + metricName="var", + labelCol=label_col, + predictionCol=prediction_col, + **kwargs, + ) + elif metric_name == "roc_auc": + evaluator = BinaryClassificationEvaluator( + metricName="areaUnderROC", + labelCol=label_col, + rawPredictionCol=prediction_col, + **kwargs, + ) + elif metric_name == "pr_auc": + evaluator = BinaryClassificationEvaluator( + metricName="areaUnderPR", + labelCol=label_col, + rawPredictionCol=prediction_col, + **kwargs, + ) + elif metric_name == "accuracy": + evaluator = MulticlassClassificationEvaluator( + metricName="accuracy", + labelCol=label_col, + predictionCol=prediction_col, + **kwargs, + ) + elif metric_name == "log_loss": + # For log_loss, prediction_col should be probability, and we need to convert it to label + df = _compute_label_from_probability( + df, prediction_col, prediction_col + "_label" + ) + evaluator = MulticlassClassificationEvaluator( + metricName="logLoss", + labelCol=label_col, + predictionCol=prediction_col + "_label", + probabilityCol=prediction_col, + **kwargs, + ) + elif metric_name == "f1": + evaluator = MulticlassClassificationEvaluator( + metricName="f1", + labelCol=label_col, + predictionCol=prediction_col, + **kwargs, + ) + elif metric_name == "micro_f1": + evaluator = MultilabelClassificationEvaluator( + metricName="microF1Measure", + labelCol=label_col, + predictionCol=prediction_col, + **kwargs, + ) + elif metric_name == "macro_f1": + evaluator = MultilabelClassificationEvaluator( + metricName="f1MeasureByLabel", + labelCol=label_col, + predictionCol=prediction_col, + **kwargs, + ) + elif metric_name == "ap": + evaluator = RankingEvaluator( + metricName="meanAveragePrecision", + labelCol=label_col, + predictionCol=prediction_col, + ) + elif "ndcg" in metric_name: + # TODO: check if spark.ml ranker has the same format with + # synapseML ranker, may need to adjust the format of df + if "@" in metric_name: + k = int(metric_name.split("@", 1)[-1]) + if groups is None: + evaluator = RankingEvaluator( + metricName="ndcgAtK", + labelCol=label_col, + predictionCol=prediction_col, + k=k, + ) + df = _process_df(df, label_col, prediction_col) + score = 1 - evaluator.evaluate(df) + else: + counts = ps_group_counts(groups) + score = 0 + psum = 0 + for c in counts: + y_true_ = y_true[psum : psum + c] + y_predict_ = y_predict[psum : psum + c] + df = y_true_.to_frame().join(y_predict_).to_spark() + df = _process_df(df, label_col, prediction_col) + evaluator = RankingEvaluator( + metricName="ndcgAtK", + labelCol=label_col, + predictionCol=prediction_col, + k=k, + ) + score -= evaluator.evaluate(df) + psum += c + score /= len(counts) + score += 1 + else: + evaluator = RankingEvaluator( + metricName="ndcgAtK", labelCol=label_col, predictionCol=prediction_col + ) + df = _process_df(df, label_col, prediction_col) + score = 1 - evaluator.evaluate(df) + return score + else: + raise ValueError(f"Unknown metric name: {metric_name} for spark models.") + + return ( + evaluator.evaluate(df) + if metric_name in min_mode_metrics + else 1 - evaluator.evaluate(df) + ) diff --git a/flaml/automl/spark/utils.py b/flaml/automl/spark/utils.py new file mode 100644 index 0000000000..1e7eac7f8d --- /dev/null +++ b/flaml/automl/spark/utils.py @@ -0,0 +1,264 @@ +import logging +import os +from typing import Union, List, Optional, Tuple +import pandas as pd +import numpy as np + +logger = logging.getLogger(__name__) +logger_formatter = logging.Formatter( + "[%(name)s: %(asctime)s] {%(lineno)d} %(levelname)s - %(message)s", "%m-%d %H:%M:%S" +) +logger.propagate = False +try: + os.environ["PYARROW_IGNORE_TIMEZONE"] = "1" + from pyspark.sql import SparkSession + from pyspark.sql import DataFrame + import pyspark.pandas as ps + from pyspark.util import VersionUtils + import pyspark.sql.functions as F + import pyspark.sql.types as T + import pyspark + + _spark_major_minor_version = VersionUtils.majorMinorVersion(pyspark.__version__) +except ImportError: + msg = """use_spark=True requires installation of PySpark. Please run pip install flaml[spark] + and check [here](https://spark.apache.org/docs/latest/api/python/getting_started/install.html) + for more details about installing Spark.""" + raise ImportError(msg) + + +def to_pandas_on_spark( + df: Union[pd.DataFrame, DataFrame, pd.Series, ps.DataFrame, ps.Series], + index_col: Optional[str] = None, + default_index_type: Optional[str] = "distributed-sequence", +) -> Union[ps.DataFrame, ps.Series]: + """Convert pandas or pyspark dataframe/series to pandas_on_Spark dataframe/series. + + Args: + df: pandas.DataFrame/series or pyspark dataframe | The input dataframe/series. + index_col: str, optional | The column name to use as index, default None. + default_index_type: str, optional | The default index type, default "distributed-sequence". + + Returns: + pyspark.pandas.DataFrame/Series: The converted pandas-on-Spark dataframe/series. + + ```python + import pandas as pd + from flaml.automl.spark.utils import to_pandas_on_spark + + pdf = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}) + psdf = to_pandas_on_spark(pdf) + print(psdf) + + from pyspark.sql import SparkSession + + spark = SparkSession.builder.getOrCreate() + sdf = spark.createDataFrame(pdf) + psdf = to_pandas_on_spark(sdf) + print(psdf) + + pds = pd.Series([1, 2, 3]) + pss = to_pandas_on_spark(pds) + print(pss) + ``` + """ + ps.set_option("compute.default_index_type", default_index_type) + if isinstance(df, (pd.DataFrame, pd.Series)): + return ps.from_pandas(df) + elif isinstance(df, DataFrame): + if _spark_major_minor_version[0] == 3 and _spark_major_minor_version[1] < 3: + return df.to_pandas_on_spark(index_col=index_col) + else: + return df.pandas_api(index_col=index_col) + elif isinstance(df, (ps.DataFrame, ps.Series)): + return df + else: + raise TypeError( + f"{type(df)} is not one of pandas.DataFrame, pandas.Series and pyspark.sql.DataFrame" + ) + + +def train_test_split_pyspark( + df: Union[DataFrame, ps.DataFrame], + stratify_column: Optional[str] = None, + test_fraction: Optional[float] = 0.2, + seed: Optional[int] = 1234, + to_pandas_spark: Optional[bool] = True, + index_col: Optional[str] = "tmp_index_col", +) -> Tuple[Union[DataFrame, ps.DataFrame], Union[DataFrame, ps.DataFrame]]: + """Split a pyspark dataframe into train and test dataframes. + + Args: + df: pyspark.sql.DataFrame | The input dataframe. + stratify_column: str | The column name to stratify the split. Default None. + test_fraction: float | The fraction of the test data. Default 0.2. + seed: int | The random seed. Default 1234. + to_pandas_spark: bool | Whether to convert the output to pandas_on_spark. Default True. + index_col: str | The column name to use as index. Default None. + + Returns: + pyspark.sql.DataFrame/pandas_on_spark DataFrame | The train dataframe. + pyspark.sql.DataFrame/pandas_on_spark DataFrame | The test dataframe. + """ + if isinstance(df, ps.DataFrame): + df = df.to_spark(index_col=index_col) + + if stratify_column: + # Test data + test_fraction_dict = ( + df.select(stratify_column) + .distinct() + .withColumn("fraction", F.lit(test_fraction)) + .rdd.collectAsMap() + ) + df_test = df.stat.sampleBy(stratify_column, test_fraction_dict, seed) + # Train data + df_train = df.subtract(df_test) + else: + df_train, df_test = df.randomSplit([1 - test_fraction, test_fraction], seed) + + if to_pandas_spark: + df_train = to_pandas_on_spark(df_train, index_col=index_col) + df_test = to_pandas_on_spark(df_test, index_col=index_col) + df_train.index.name = None + df_test.index.name = None + elif index_col == "tmp_index_col": + df_train = df_train.drop(index_col) + df_test = df_test.drop(index_col) + return [df_train, df_test] + + +def unique_pandas_on_spark( + psds: Union[ps.Series, ps.DataFrame] +) -> Tuple[np.ndarray, np.ndarray]: + """Get the unique values and counts of a pandas_on_spark series.""" + if isinstance(psds, ps.DataFrame): + psds = psds.iloc[:, 0] + _tmp = psds.value_counts().to_pandas() + label_set = _tmp.index.values + counts = _tmp.values + return label_set, counts + + +def len_labels( + y: Union[ps.Series, np.ndarray], return_labels=False +) -> Union[int, Optional[np.ndarray]]: + """Get the number of unique labels in y.""" + if not isinstance(y, (ps.DataFrame, ps.Series)): + labels = np.unique(y) + else: + labels = y.unique() if isinstance(y, ps.Series) else y.iloc[:, 0].unique() + if return_labels: + return len(labels), labels + return len(labels) + + +def unique_value_first_index( + y: Union[pd.Series, ps.Series, np.ndarray] +) -> Tuple[np.ndarray, np.ndarray]: + """Get the unique values and indices of a pandas series, + pandas_on_spark series or numpy array.""" + if isinstance(y, ps.Series): + y_unique = y.drop_duplicates().sort_index() + label_set = y_unique.values + first_index = y_unique.index.values + else: + label_set, first_index = np.unique(y, return_index=True) + return label_set, first_index + + +def iloc_pandas_on_spark( + psdf: Union[ps.DataFrame, ps.Series, pd.DataFrame, pd.Series], + index: Union[int, slice, list], + index_col: Optional[str] = "tmp_index_col", +) -> Union[ps.DataFrame, ps.Series]: + """Get the rows of a pandas_on_spark dataframe/series by index.""" + if isinstance(psdf, (pd.DataFrame, pd.Series)): + return psdf.iloc[index] + if isinstance(index, (int, slice)): + if isinstance(psdf, ps.Series): + return psdf.iloc[index] + else: + return psdf.iloc[index, :] + elif isinstance(index, list): + if isinstance(psdf, ps.Series): + sdf = psdf.to_frame().to_spark(index_col=index_col) + else: + if index_col not in psdf.columns: + sdf = psdf.to_spark(index_col=index_col) + else: + sdf = psdf.to_spark() + sdfiloc = sdf.filter(F.col(index_col).isin(index)) + psdfiloc = to_pandas_on_spark(sdfiloc) + if isinstance(psdf, ps.Series): + psdfiloc = psdfiloc[psdfiloc.columns.drop(index_col)[0]] + elif index_col not in psdf.columns: + psdfiloc = psdfiloc.drop(columns=[index_col]) + return psdfiloc + else: + raise TypeError( + f"{type(index)} is not one of int, slice and list for pandas_on_spark iloc" + ) + + +def spark_kFold( + dataset: Union[DataFrame, ps.DataFrame], + nFolds: int = 3, + foldCol: str = "", + seed: int = 42, + index_col: Optional[str] = "tmp_index_col", +) -> List[Tuple[ps.DataFrame, ps.DataFrame]]: + """Generate k-fold splits for a Spark DataFrame. + Adopted from https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/tuning.html#CrossValidator + + Args: + dataset: DataFrame / ps.DataFrame. | The DataFrame to split. + nFolds: int | The number of folds. Default is 3. + foldCol: str | The column name to use for fold numbers. If not specified, + the DataFrame will be randomly split. Default is "". + The same group will not appear in two different folds (the number of + distinct groups has to be at least equal to the number of folds). + The folds are approximately balanced in the sense that the number of + distinct groups is approximately the same in each fold. + seed: int | The random seed. Default is 42. + index_col: str | The name of the index column. Default is "tmp_index_col". + + Returns: + A list of (train, validation) DataFrames. + """ + if isinstance(dataset, ps.DataFrame): + dataset = dataset.to_spark(index_col=index_col) + + datasets = [] + if not foldCol: + # Do random k-fold split. + h = 1.0 / nFolds + randCol = f"rand_col_{seed}" + df = dataset.select("*", F.rand(seed).alias(randCol)) + for i in range(nFolds): + validateLB = i * h + validateUB = (i + 1) * h + condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB) + validation = to_pandas_on_spark(df.filter(condition), index_col=index_col) + train = to_pandas_on_spark(df.filter(~condition), index_col=index_col) + datasets.append( + (train.drop(columns=[randCol]), validation.drop(columns=[randCol])) + ) + else: + # Use user-specified fold column + def get_fold_num(foldNum: int) -> int: + return int(foldNum % nFolds) + + get_fold_num_udf = F.UserDefinedFunction(get_fold_num, T.IntegerType()) + for i in range(nFolds): + training = dataset.filter(get_fold_num_udf(dataset[foldCol]) != F.lit(i)) + validation = dataset.filter(get_fold_num_udf(dataset[foldCol]) == F.lit(i)) + if training.rdd.getNumPartitions() == 0 or len(training.take(1)) == 0: + raise ValueError("The training data at fold %s is empty." % i) + if validation.rdd.getNumPartitions() == 0 or len(validation.take(1)) == 0: + raise ValueError("The validation data at fold %s is empty." % i) + training = to_pandas_on_spark(training, index_col=index_col) + validation = to_pandas_on_spark(validation, index_col=index_col) + datasets.append((training, validation)) + + return datasets diff --git a/flaml/automl/state.py b/flaml/automl/state.py index c10a2c83f9..773275020e 100644 --- a/flaml/automl/state.py +++ b/flaml/automl/state.py @@ -1,5 +1,6 @@ import inspect import time +import os from typing import Any, Optional import numpy as np @@ -10,6 +11,34 @@ from flaml.automl.logger import logger from flaml.automl.ml import compute_estimator, train_estimator from flaml.automl.task.task import TS_FORECAST +try: + from flaml.automl.spark.utils import ( + train_test_split_pyspark, + unique_pandas_on_spark, + len_labels, + unique_value_first_index, + ) +except ImportError: + train_test_split_pyspark = None + unique_pandas_on_spark = None + from flaml.automl.utils import ( + len_labels, + unique_value_first_index, + ) +try: + os.environ["PYARROW_IGNORE_TIMEZONE"] = "1" + import pyspark.pandas as ps + from pyspark.pandas import DataFrame as psDataFrame, Series as psSeries + from pyspark.pandas.config import set_option, reset_option +except ImportError: + ps = None + + class psDataFrame: + pass + + class psSeries: + pass + class SearchState: @property @@ -241,11 +270,11 @@ class AutoMLState: def _prepare_sample_train_data(self, sample_size: int): sampled_weight = groups = None if sample_size <= self.data_size[0]: - if isinstance(self.X_train, pd.DataFrame): + if isinstance(self.X_train, (pd.DataFrame, psDataFrame)): sampled_X_train = self.X_train.iloc[:sample_size] else: sampled_X_train = self.X_train[:sample_size] - if isinstance(self.y_train, pd.Series): + if isinstance(self.y_train, (pd.Series, psSeries)): sampled_y_train = self.y_train.iloc[:sample_size] else: sampled_y_train = self.y_train[:sample_size] @@ -255,13 +284,13 @@ class AutoMLState: if weight is not None: sampled_weight = ( weight.iloc[:sample_size] - if isinstance(weight, pd.Series) + if isinstance(weight, (pd.Series, psSeries)) else weight[:sample_size] ) if self.groups is not None: groups = ( self.groups.iloc[:sample_size] - if isinstance(self.groups, pd.Series) + if isinstance(self.groups, (pd.Series, psSeries)) else self.groups[:sample_size] ) else: diff --git a/flaml/automl/task/generic_task.py b/flaml/automl/task/generic_task.py index dc812bc80d..aed8866452 100644 --- a/flaml/automl/task/generic_task.py +++ b/flaml/automl/task/generic_task.py @@ -1,3 +1,4 @@ +import os import logging import time from typing import List, Optional @@ -30,6 +31,7 @@ from flaml.automl.model import ( KNeighborsEstimator, TransformersEstimator, TransformersEstimatorModelSelection, + SparkLGBMEstimator, ) from flaml.automl.task.task import ( Task, @@ -39,6 +41,40 @@ from flaml.automl.task.task import ( ) from flaml.config import RANDOM_SEED +try: + os.environ["PYARROW_IGNORE_TIMEZONE"] = "1" + from pyspark.sql.functions import col + import pyspark.pandas as ps + from pyspark.pandas import DataFrame as psDataFrame, Series as psSeries + from pyspark.pandas.config import set_option, reset_option + from flaml.automl.spark.utils import ( + to_pandas_on_spark, + iloc_pandas_on_spark, + spark_kFold, + train_test_split_pyspark, + unique_pandas_on_spark, + unique_value_first_index, + len_labels, + ) + from flaml.automl.spark.metrics import spark_metric_loss_score +except ImportError: + train_test_split_pyspark = None + unique_pandas_on_spark = None + iloc_pandas_on_spark = None + from flaml.automl.utils import ( + len_labels, + unique_value_first_index, + ) + + ps = None + + class psDataFrame: + pass + + class psSeries: + pass + + logger = logging.getLogger(__name__) @@ -55,6 +91,7 @@ class GenericTask(Task): "kneighbor": KNeighborsEstimator, "transformer": TransformersEstimator, "transformer_ms": TransformersEstimatorModelSelection, + "lgbm_spark": SparkLGBMEstimator, } def validate_data( @@ -71,17 +108,15 @@ class GenericTask(Task): groups=None, ): if X_train_all is not None and y_train_all is not None: - assert ( - isinstance(X_train_all, np.ndarray) - or issparse(X_train_all) - or isinstance(X_train_all, pd.DataFrame) - ), ( + assert isinstance( + X_train_all, (np.ndarray, pd.DataFrame, psDataFrame) + ) or issparse(X_train_all), ( "X_train_all must be a numpy array, a pandas dataframe, " - "or Scipy sparse matrix." + "a Scipy sparse matrix or a pyspark.pandas dataframe." ) - assert isinstance(y_train_all, np.ndarray) or isinstance( - y_train_all, pd.Series - ), "y_train_all must be a numpy array or a pandas series." + assert isinstance( + y_train_all, (np.ndarray, pd.Series, psSeries) + ), "y_train_all must be a numpy array, a pandas series or a pyspark.pandas series." assert ( X_train_all.size != 0 and y_train_all.size != 0 ), "Input data must not be empty." @@ -92,22 +127,42 @@ class GenericTask(Task): assert ( X_train_all.shape[0] == y_train_all.shape[0] ), "# rows in X_train must match length of y_train." - automl._df = isinstance(X_train_all, pd.DataFrame) + if isinstance(X_train_all, psDataFrame): + X_train_all = ( + X_train_all.spark.cache() + ) # cache data to improve compute speed + y_train_all = y_train_all.to_frame().spark.cache()[y_train_all.name] + logger.debug( + f"X_train_all and y_train_all cached, shape of X_train_all: {X_train_all.shape}" + ) + automl._df = isinstance(X_train_all, (pd.DataFrame, psDataFrame)) automl._nrow, automl._ndim = X_train_all.shape if self.is_ts_forecast(): - X_train_all = pd.DataFrame(X_train_all) + X_train_all = ( + pd.DataFrame(X_train_all) + if isinstance(X_train_all, np.ndarray) + else X_train_all + ) X_train_all, y_train_all = self._validate_ts_data( X_train_all, y_train_all ) X, y = X_train_all, y_train_all elif dataframe is not None and label is not None: assert isinstance( - dataframe, pd.DataFrame - ), "dataframe must be a pandas DataFrame" - assert label in dataframe.columns, "label must a column name in dataframe" + dataframe, (pd.DataFrame, psDataFrame) + ), "dataframe must be a pandas DataFrame or a pyspark.pandas DataFrame." + assert ( + label in dataframe.columns + ), f"The provided label column name `{label}` doesn't exist in the provided dataframe." + if isinstance(dataframe, psDataFrame): + dataframe = ( + dataframe.spark.cache() + ) # cache data to improve compute speed + logger.debug(f"dataframe cached, shape of dataframe: {dataframe.shape}") automl._df = True if self.is_ts_forecast(): dataframe = self._validate_ts_data(dataframe) + # TODO: to support pyspark.sql.DataFrame and pure dataframe mode X = dataframe.drop(columns=label) automl._nrow, automl._ndim = X.shape y = dataframe[label] @@ -125,7 +180,7 @@ class GenericTask(Task): "object", "string", ), "If the task is an NLP task, X can only contain text columns" - for each_cell in X[column]: + for _, each_cell in X[column].items(): if each_cell is not None: is_str = isinstance(each_cell, str) is_list_of_int = isinstance(each_cell, list) and all( @@ -149,8 +204,10 @@ class GenericTask(Task): "Currently FLAML only supports two modes for NLP: either all columns of X are string (non-tokenized), " "or all columns of X are integer ids (tokenized)" ) - - if issparse(X_train_all) or automl._skip_transform: + if isinstance(X, psDataFrame): + # TODO: support pyspark.pandas dataframe in DataTransformer + automl._skip_transform = True + if automl._skip_transform or issparse(X_train_all): automl._transformer = automl._label_transformer = False automl._X_train_all, automl._y_train_all = X, y else: @@ -184,17 +241,16 @@ class GenericTask(Task): "sample_weight" ) # NOTE: _validate_data is before kwargs is updated to fit_kwargs_by_estimator if X_val is not None and y_val is not None: - assert ( - isinstance(X_val, np.ndarray) - or issparse(X_val) - or isinstance(X_val, pd.DataFrame) - ), ( + assert isinstance( + X_val, (np.ndarray, pd.DataFrame, psDataFrame) + ) or issparse(X_train_all), ( "X_val must be None, a numpy array, a pandas dataframe, " - "or Scipy sparse matrix." + "a Scipy sparse matrix or a pyspark.pandas dataframe." + ) + assert isinstance(y_val, (np.ndarray, pd.Series, psSeries)), ( + "y_val must be None, a numpy array, a pandas series " + "or a pyspark.pandas series." ) - assert isinstance(y_val, np.ndarray) or isinstance( - y_val, pd.Series - ), "y_val must be None, a numpy array or a pandas series." assert X_val.size != 0 and y_val.size != 0, ( "Validation data are expected to be nonempty. " "Use None for X_val and y_val if no validation data." @@ -241,25 +297,39 @@ class GenericTask(Task): dataframe[dataframe.columns[0]].dtype.name == "datetime64[ns]" ), f"For '{TS_FORECAST}' task, the first column must contain timestamp values." if y_train_all is not None: - y_df = ( - pd.DataFrame(y_train_all) - if isinstance(y_train_all, pd.Series) - else pd.DataFrame(y_train_all, columns=["labels"]) - ) + if isinstance(y_train_all, pd.Series): + y_df = pd.DataFrame(y_train_all) + elif isinstance(y_train_all, np.ndarray): + y_df = pd.DataFrame(y_train_all, columns=["labels"]) + elif isinstance(y_train_all, (psDataFrame, psSeries)): + # TODO: optimize this + set_option("compute.ops_on_diff_frames", True) + y_df = y_train_all dataframe = dataframe.join(y_df) duplicates = dataframe.duplicated() - if any(duplicates): - logger.warning( - "Duplicate timestamp values found in timestamp column. " - f"\n{dataframe.loc[duplicates, dataframe][dataframe.columns[0]]}" - ) - dataframe = dataframe.drop_duplicates() - logger.warning("Removed duplicate rows based on all columns") - assert ( - dataframe[[dataframe.columns[0]]].duplicated() is None - ), "Duplicate timestamp values with different values for other columns." - ts_series = pd.to_datetime(dataframe[dataframe.columns[0]]) - inferred_freq = pd.infer_freq(ts_series) + if isinstance(dataframe, psDataFrame): + if duplicates.any(): + logger.warning("Duplicate timestamp values found in timestamp column.") + dataframe = dataframe.drop_duplicates() + logger.warning("Removed duplicate rows based on all columns") + assert ( + dataframe[[dataframe.columns[0]]].duplicated().any() is False + ), "Duplicate timestamp values with different values for other columns." + ts_series = ps.to_datetime(dataframe[dataframe.columns[0]]) + inferred_freq = None # TODO: `pd.infer_freq()` is not implemented yet. + else: + if any(duplicates): + logger.warning( + "Duplicate timestamp values found in timestamp column. " + f"\n{dataframe.loc[duplicates, dataframe][dataframe.columns[0]]}" + ) + dataframe = dataframe.drop_duplicates() + logger.warning("Removed duplicate rows based on all columns") + assert ( + dataframe[[dataframe.columns[0]]].duplicated() is None + ), "Duplicate timestamp values with different values for other columns." + ts_series = pd.to_datetime(dataframe[dataframe.columns[0]]) + inferred_freq = pd.infer_freq(ts_series) if inferred_freq is None: logger.warning( "Missing timestamps detected. To avoid error with estimators, set estimator list to ['prophet']. " @@ -268,6 +338,121 @@ class GenericTask(Task): return dataframe.iloc[:, :-1], dataframe.iloc[:, -1] return dataframe + @staticmethod + def _split_pyspark(state, X_train_all, y_train_all, split_ratio, stratify=None): + # TODO: optimize this + set_option("compute.ops_on_diff_frames", True) + if not isinstance(y_train_all, (psDataFrame, psSeries)): + raise ValueError("y_train_all must be a pyspark.pandas dataframe or series") + df_all_in_one = X_train_all.join(y_train_all) + stratify_column = ( + y_train_all.name + if isinstance(y_train_all, psSeries) + else y_train_all.columns[0] + ) + ret_sample_weight = False + if ( + "sample_weight" in state.fit_kwargs + ): # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + # fit_kwargs["sample_weight"] is an numpy array + ps_sample_weight = ps.DataFrame( + state.fit_kwargs["sample_weight"], + columns=["sample_weight"], + ) + df_all_in_one = df_all_in_one.join(ps_sample_weight) + ret_sample_weight = True + df_all_train, df_all_val = train_test_split_pyspark( + df_all_in_one, + None if stratify is None else stratify_column, + test_fraction=split_ratio, + seed=RANDOM_SEED, + ) + columns_to_drop = [ + c for c in df_all_train.columns if c in [stratify_column, "sample_weight"] + ] + X_train = df_all_train.drop(columns_to_drop) + X_val = df_all_val.drop(columns_to_drop) + y_train = df_all_train[stratify_column] + y_val = df_all_val[stratify_column] + + if ret_sample_weight: + return ( + X_train, + X_val, + y_train, + y_val, + df_all_train["sample_weight"], + df_all_val["sample_weight"], + ) + return X_train, X_val, y_train, y_val + + @staticmethod + def _train_test_split( + state, X, y, first=None, rest=None, split_ratio=0.2, stratify=None + ): + condition_type = isinstance(X, (psDataFrame, psSeries)) + # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + condition_param = "sample_weight" in state.fit_kwargs + if not condition_type and condition_param: + sample_weight = ( + state.fit_kwargs["sample_weight"] + if rest is None + else state.fit_kwargs["sample_weight"][rest] + ) + ( + X_train, + X_val, + y_train, + y_val, + weight_train, + weight_val, + ) = train_test_split( + X, + y, + sample_weight, + test_size=split_ratio, + stratify=stratify, + random_state=RANDOM_SEED, + ) + + if first is not None: + weight1 = state.fit_kwargs["sample_weight"][first] + state.weight_val = concat(weight1, weight_val) + state.fit_kwargs["sample_weight"] = concat(weight1, weight_train) + else: + state.weight_val = weight_val + state.fit_kwargs["sample_weight"] = weight_train + elif not condition_type and not condition_param: + X_train, X_val, y_train, y_val = train_test_split( + X, + y, + test_size=split_ratio, + stratify=stratify, + random_state=RANDOM_SEED, + ) + elif condition_type and condition_param: + ( + X_train, + X_val, + y_train, + y_val, + weight_train, + weight_val, + ) = GenericTask._split_pyspark(state, X, y, split_ratio, stratify) + + if first is not None: + weight1 = state.fit_kwargs["sample_weight"][first] + state.weight_val = concat(weight1, weight_val) + state.fit_kwargs["sample_weight"] = concat(weight1, weight_train) + else: + state.weight_val = weight_val + state.fit_kwargs["sample_weight"] = weight_train + else: + X_train, X_val, y_train, y_val = GenericTask._split_pyspark( + state, X, y, split_ratio, stratify + ) + return X_train, X_val, y_train, y_val + def prepare_data( self, state, @@ -286,6 +471,8 @@ class GenericTask(Task): X_val = X_val.tocsr() if issparse(X_train_all): X_train_all = X_train_all.tocsr() + is_spark_dataframe = isinstance(X_train_all, (psDataFrame, psSeries)) + self.is_spark_dataframe = is_spark_dataframe if ( self.is_classification() and auto_augment @@ -295,12 +482,17 @@ class GenericTask(Task): and not self.is_token_classification() ): # logger.info(f"label {pd.unique(y_train_all)}") - label_set, counts = np.unique(y_train_all, return_counts=True) + if is_spark_dataframe: + label_set, counts = unique_pandas_on_spark(y_train_all) + # TODO: optimize this + set_option("compute.ops_on_diff_frames", True) + else: + label_set, counts = np.unique(y_train_all, return_counts=True) # augment rare classes rare_threshld = 20 rare = counts < rare_threshld rare_label, rare_counts = label_set[rare], counts[rare] - for i, label in enumerate(rare_label): + for i, label in enumerate(rare_label.tolist()): count = rare_count = rare_counts[i] rare_index = y_train_all == label n = len(y_train_all) @@ -313,7 +505,7 @@ class GenericTask(Task): X_train_all = concat( X_train_all, X_train_all[:n][rare_index, :] ) - if isinstance(y_train_all, pd.Series): + if isinstance(y_train_all, (pd.Series, psSeries)): y_train_all = concat( y_train_all, y_train_all.iloc[:n].loc[rare_index] ) @@ -324,7 +516,10 @@ class GenericTask(Task): count += rare_count logger.info(f"class {label} augmented from {rare_count} to {count}") SHUFFLE_SPLIT_TYPES = ["uniform", "stratified"] - if split_type in SHUFFLE_SPLIT_TYPES: + if is_spark_dataframe: + # no need to shuffle pyspark dataframe + pass + elif split_type in SHUFFLE_SPLIT_TYPES: if sample_weight_full is not None: X_train_all, y_train_all, state.sample_weight_all = shuffle( X_train_all, @@ -363,18 +558,26 @@ class GenericTask(Task): ids = state.fit_kwargs["group_ids"].copy() ids.append(TS_TIMESTAMP_COL) ids.append("time_idx") - y_train_all = pd.DataFrame(y_train_all) + y_train_all = ( + pd.DataFrame(y_train_all) + if not is_spark_dataframe + else ps.DataFrame(y_train_all) + if isinstance(y_train_all, psSeries) + else y_train_all + ) y_train_all[ids] = X_train_all[ids] X_train_all = X_train_all.sort_values(ids) y_train_all = y_train_all.sort_values(ids) training_cutoff = X_train_all["time_idx"].max() - period - X_train = X_train_all[lambda x: x.time_idx <= training_cutoff] + X_train = X_train_all[ + X_train_all["time_idx"] <= training_cutoff + ] y_train = y_train_all[ - lambda x: x.time_idx <= training_cutoff + y_train_all["time_idx"] <= training_cutoff ].drop(columns=ids) - X_val = X_train_all[lambda x: x.time_idx > training_cutoff] + X_val = X_train_all[X_train_all["time_idx"] > training_cutoff] y_val = y_train_all[ - lambda x: x.time_idx > training_cutoff + y_train_all["time_idx"] > training_cutoff ].drop(columns=ids) else: num_samples = X_train_all.shape[0] @@ -387,9 +590,8 @@ class GenericTask(Task): X_val = X_train_all[split_idx:] y_val = y_train_all[split_idx:] else: - if ( - "sample_weight" in state.fit_kwargs - ): # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + is_sample_weight = "sample_weight" in state.fit_kwargs + if not is_spark_dataframe and is_sample_weight: ( X_train, X_val, @@ -408,13 +610,30 @@ class GenericTask(Task): test_size=split_ratio, shuffle=False, ) - else: + elif not is_spark_dataframe and not is_sample_weight: X_train, X_val, y_train, y_val = train_test_split( X_train_all, y_train_all, test_size=split_ratio, shuffle=False, ) + elif is_spark_dataframe and is_sample_weight: + ( + X_train, + X_val, + y_train, + y_val, + state.fit_kwargs[ + "sample_weight" + ], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + state.weight_val, + ) = self._split_pyspark( + state, X_train_all, y_train_all, split_ratio + ) + else: + X_train, X_val, y_train, y_val = self._split_pyspark( + state, X_train_all, y_train_all, split_ratio + ) elif split_type == "group": gss = GroupShuffleSplit( n_splits=1, test_size=split_ratio, random_state=RANDOM_SEED @@ -433,7 +652,7 @@ class GenericTask(Task): elif self.is_classification(): # for classification, make sure the labels are complete in both # training and validation data - label_set, first = np.unique(y_train_all, return_index=True) + label_set, first = unique_value_first_index(y_train_all) rest = [] last = 0 first.sort() @@ -443,45 +662,17 @@ class GenericTask(Task): rest.extend(range(last, len(y_train_all))) X_first = X_train_all.iloc[first] if data_is_df else X_train_all[first] X_rest = X_train_all.iloc[rest] if data_is_df else X_train_all[rest] - y_rest = y_train_all[rest] + y_rest = ( + y_train_all[rest] + if isinstance(y_train_all, np.ndarray) + else iloc_pandas_on_spark(y_train_all, rest) + if is_spark_dataframe + else y_train_all.iloc[rest] + ) stratify = y_rest if split_type == "stratified" else None - if ( - "sample_weight" in state.fit_kwargs - ): # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - ( - X_train, - X_val, - y_train, - y_val, - weight_train, - weight_val, - ) = train_test_split( - X_rest, - y_rest, - state.fit_kwargs["sample_weight"][ - rest - ], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - test_size=split_ratio, - stratify=stratify, - random_state=RANDOM_SEED, - ) - weight1 = state.fit_kwargs["sample_weight"][ - first - ] # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - state.weight_val = concat(weight1, weight_val) - state.fit_kwargs[ - "sample_weight" - ] = concat( # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - weight1, weight_train - ) - else: - X_train, X_val, y_train, y_val = train_test_split( - X_rest, - y_rest, - test_size=split_ratio, - stratify=stratify, - random_state=RANDOM_SEED, - ) + X_train, X_val, y_train, y_val = self._train_test_split( + state, X_rest, y_rest, first, rest, split_ratio, stratify + ) X_train = concat(X_first, X_train) y_train = ( concat(label_set, y_train) @@ -495,58 +686,34 @@ class GenericTask(Task): else np.concatenate([label_set, y_val]) ) elif self.is_regression(): - if ( - "sample_weight" in state.fit_kwargs - ): # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - ( - X_train, - X_val, - y_train, - y_val, - state.fit_kwargs[ - "sample_weight" - ], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - state.weight_val, - ) = train_test_split( - X_train_all, - y_train_all, - state.fit_kwargs[ - "sample_weight" - ], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - test_size=split_ratio, - random_state=RANDOM_SEED, - ) - else: - X_train, X_val, y_train, y_val = train_test_split( - X_train_all, - y_train_all, - test_size=split_ratio, - random_state=RANDOM_SEED, - ) + X_train, X_val, y_train, y_val = self._train_test_split( + state, X_train_all, y_train_all, split_ratio=split_ratio + ) state.data_size = X_train.shape state.X_train, state.y_train = X_train, y_train state.X_val, state.y_val = X_val, y_val state.X_train_all = X_train_all state.y_train_all = y_train_all + y_train_all_size = y_train_all.size if eval_method == "holdout": state.kf = None return if split_type == "group": # logger.info("Using GroupKFold") assert ( - len(state.groups_all) == y_train_all.size + len(state.groups_all) == y_train_all_size ), "the length of groups must match the number of examples" assert ( - len(np.unique(state.groups_all)) >= n_splits + len_labels(state.groups_all) >= n_splits ), "the number of groups must be equal or larger than n_splits" state.kf = GroupKFold(n_splits) elif split_type == "stratified": # logger.info("Using StratifiedKFold") - assert y_train_all.size >= n_splits, ( + assert y_train_all_size >= n_splits, ( f"{n_splits}-fold cross validation" f" requires input data with at least {n_splits} examples." ) - assert y_train_all.size >= 2 * n_splits, ( + assert y_train_all_size >= 2 * n_splits, ( f"{n_splits}-fold cross validation with metric=r2 " f"requires input data with at least {n_splits*2} examples." ) @@ -559,8 +726,8 @@ class GenericTask(Task): period = state.fit_kwargs[ "period" ] # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - if period * (n_splits + 1) > y_train_all.size: - n_splits = int(y_train_all.size / period - 1) + if period * (n_splits + 1) > y_train_all_size: + n_splits = int(y_train_all_size / period - 1) assert n_splits >= 2, ( f"cross validation for forecasting period={period}" f" requires input data with at least {3 * period} examples." @@ -568,7 +735,9 @@ class GenericTask(Task): logger.info(f"Using nsplits={n_splits} due to data size limit.") state.kf = TimeSeriesSplit(n_splits=n_splits, test_size=period) elif self.is_ts_forecastpanel(): - n_groups = X_train.groupby(state.fit_kwargs.get("group_ids")).ngroups + n_groups = len( + X_train.groupby(state.fit_kwargs.get("group_ids")).size() + ) period = state.fit_kwargs.get("period") state.kf = TimeSeriesSplit( n_splits=n_splits, test_size=period * n_groups @@ -595,7 +764,7 @@ class GenericTask(Task): groups=None, ) -> str: if self.name == "classification": - self.name = get_classification_objective(len(np.unique(y_train_all))) + self.name = get_classification_objective(len_labels(y_train_all)) if not isinstance(split_type, str): assert hasattr(split_type, "split") and hasattr( split_type, "get_n_splits" @@ -661,6 +830,8 @@ class GenericTask(Task): ) elif isinstance(X, int): return X + elif isinstance(X, psDataFrame): + return X elif issparse(X): X = X.tocsr() if self.is_ts_forecast(): @@ -695,60 +866,87 @@ class GenericTask(Task): train_time = pred_time = 0 total_fold_num = 0 n = kf.get_n_splits() - X_train_split, y_train_split = X_train_all, y_train_all + rng = np.random.RandomState(2020) + budget_per_train = budget and budget / n + groups = None if self.is_classification(): - labels = np.unique(y_train_all) + labels = _, labels = len_labels(y_train_all, return_labels=True) else: labels = fit_kwargs.get( "label_list" ) # pass the label list on to compute the evaluation metric - groups = None - shuffle = getattr(kf, "shuffle", not self.is_ts_forecast()) - if isinstance(kf, RepeatedStratifiedKFold): - kf = kf.split(X_train_split, y_train_split) - elif isinstance(kf, (GroupKFold, StratifiedGroupKFold)): - groups = kf.groups - kf = kf.split(X_train_split, y_train_split, groups) - shuffle = False - elif isinstance(kf, TimeSeriesSplit): - kf = kf.split(X_train_split, y_train_split) - else: - kf = kf.split(X_train_split) - rng = np.random.RandomState(2020) - budget_per_train = budget and budget / n if "sample_weight" in fit_kwargs: weight = fit_kwargs["sample_weight"] weight_val = None else: weight = weight_val = None + + is_spark_dataframe = isinstance(X_train_all, (psDataFrame, psSeries)) + if is_spark_dataframe: + dataframe = X_train_all.join(y_train_all) + if weight is not None: + dataframe = dataframe.join(weight) + if isinstance(kf, (GroupKFold, StratifiedGroupKFold)): + groups = kf.groups + dataframe = dataframe.join(groups) + kf = spark_kFold( + dataframe, nFolds=n, foldCol=groups.name if groups is not None else "" + ) + shuffle = False + else: + X_train_split, y_train_split = X_train_all, y_train_all + shuffle = getattr(kf, "shuffle", not self.is_ts_forecast()) + if isinstance(kf, RepeatedStratifiedKFold): + kf = kf.split(X_train_split, y_train_split) + elif isinstance(kf, (GroupKFold, StratifiedGroupKFold)): + groups = kf.groups + kf = kf.split(X_train_split, y_train_split, groups) + shuffle = False + elif isinstance(kf, TimeSeriesSplit): + kf = kf.split(X_train_split, y_train_split) + else: + kf = kf.split(X_train_split) + for train_index, val_index in kf: if shuffle: train_index = rng.permutation(train_index) - if isinstance(X_train_all, pd.DataFrame): + if is_spark_dataframe: + # cache data to increase compute speed + X_train = train_index.spark.cache() + X_val = val_index.spark.cache() + y_train = X_train.pop(y_train_all.name) + y_val = X_val.pop(y_train_all.name) + if weight is not None: + weight_val = X_val.pop(weight.name) + fit_kwargs["sample_weight"] = X_train.pop(weight.name) + groups_val = None + elif isinstance(X_train_all, pd.DataFrame): X_train = X_train_split.iloc[train_index] X_val = X_train_split.iloc[val_index] else: X_train, X_val = X_train_split[train_index], X_train_split[val_index] - y_train, y_val = y_train_split[train_index], y_train_split[val_index] + if not is_spark_dataframe: + y_train, y_val = y_train_split[train_index], y_train_split[val_index] + if weight is not None: + fit_kwargs["sample_weight"], weight_val = ( + weight[train_index], + weight[val_index], + ) + if groups is not None: + fit_kwargs["groups"] = ( + groups[train_index] + if isinstance(groups, np.ndarray) + else groups.iloc[train_index] + ) + groups_val = ( + groups[val_index] + if isinstance(groups, np.ndarray) + else groups.iloc[val_index] + ) + else: + groups_val = None + estimator.cleanup() - if weight is not None: - fit_kwargs["sample_weight"], weight_val = ( - weight[train_index], - weight[val_index], - ) - if groups is not None: - fit_kwargs["groups"] = ( - groups[train_index] - if isinstance(groups, np.ndarray) - else groups.iloc[train_index] - ) - groups_val = ( - groups[val_index] - if isinstance(groups, np.ndarray) - else groups.iloc[val_index] - ) - else: - groups_val = None val_loss_i, metric_i, train_time_i, pred_time_i = get_val_loss( config, estimator, @@ -775,6 +973,9 @@ class GenericTask(Task): log_metric_folds.append(metric_i) train_time += train_time_i pred_time += pred_time_i + if is_spark_dataframe: + X_train.spark.unpersist() # uncache data to free memory + X_val.spark.unpersist() # uncache data to free memory if budget and time.time() - start_time >= budget: break val_loss, metric = cv_score_agg_func(val_loss_folds, log_metric_folds) @@ -782,11 +983,44 @@ class GenericTask(Task): pred_time /= n return val_loss, metric, train_time, pred_time - def default_estimator_list(self, estimator_list: List[str]) -> List[str]: + def default_estimator_list( + self, estimator_list: List[str], is_spark_dataframe: bool = False + ) -> List[str]: if "auto" != estimator_list: + n_estimators = len(estimator_list) + if is_spark_dataframe: + # For spark dataframe, only estimators ending with '_spark' are supported + estimator_list = [ + est for est in estimator_list if est.endswith("_spark") + ] + if len(estimator_list) == 0: + raise ValueError( + "Spark dataframes only support estimator names ending with `_spark`. Non-supported " + "estimators are removed. No estimator is left." + ) + elif n_estimators != len(estimator_list): + logger.warning( + "Spark dataframes only support estimator names ending with `_spark`. Non-supported " + "estimators are removed." + ) + else: + # For non-spark dataframe, only estimators not ending with '_spark' are supported + estimator_list = [ + est for est in estimator_list if not est.endswith("_spark") + ] + if len(estimator_list) == 0: + raise ValueError( + "Non-spark dataframes only support estimator names not ending with `_spark`. Non-supported " + "estimators are removed. No estimator is left." + ) + elif n_estimators != len(estimator_list): + logger.warning( + "Non-spark dataframes only support estimator names not ending with `_spark`. Non-supported " + "estimators are removed." + ) return estimator_list if self.is_rank(): - estimator_list = ["lgbm", "xgboost", "xgb_limitdepth"] + estimator_list = ["lgbm", "xgboost", "xgb_limitdepth", "lgbm_spark"] elif self.is_nlp(): estimator_list = ["transformer"] elif self.is_ts_forecastpanel(): @@ -802,6 +1036,7 @@ class GenericTask(Task): "xgboost", "extra_tree", "xgb_limitdepth", + "lgbm_spark", ] except ImportError: estimator_list = [ @@ -810,6 +1045,7 @@ class GenericTask(Task): "xgboost", "extra_tree", "xgb_limitdepth", + "lgbm_spark", ] if self.is_ts_forecast(): # catboost is removed because it has a `name` parameter, making it incompatible with hcrystalball @@ -825,6 +1061,15 @@ class GenericTask(Task): elif not self.is_regression(): estimator_list += ["lrl1"] + estimator_list = [ + est + for est in estimator_list + if ( + est.endswith("_spark") + if is_spark_dataframe + else not est.endswith("_spark") + ) + ] return estimator_list def default_metric(self, metric: str) -> str: diff --git a/flaml/automl/task/task.py b/flaml/automl/task/task.py index 09c38a52a5..0f7275bb8c 100644 --- a/flaml/automl/task/task.py +++ b/flaml/automl/task/task.py @@ -255,7 +255,9 @@ class Task(ABC): @abstractmethod def default_estimator_list( - self, estimator_list: Union[List[str], str] = "auto" + self, + estimator_list: Union[List[str], str] = "auto", + is_spark_dataframe: bool = False, ) -> List[str]: """Return the list of default estimators registered for this task type. @@ -264,6 +266,7 @@ class Task(ABC): Args: estimator_list: Either 'auto' or a list of estimator names to be validated. + is_spark_dataframe: True if the data is a spark dataframe. Returns: A list of valid estimator names for this task type. diff --git a/flaml/automl/utils.py b/flaml/automl/utils.py new file mode 100644 index 0000000000..e6322f272a --- /dev/null +++ b/flaml/automl/utils.py @@ -0,0 +1,18 @@ +from typing import Optional, Union, Tuple +import numpy as np + + +def len_labels(y: np.ndarray, return_labels=False) -> Union[int, Optional[np.ndarray]]: + """Get the number of unique labels in y. The non-spark version of + flaml.automl.spark.utils.len_labels""" + labels = np.unique(y) + if return_labels: + return len(labels), labels + return len(labels) + + +def unique_value_first_index(y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + """Get the unique values and indices of a pandas series or numpy array. + The non-spark version of flaml.automl.spark.utils.unique_value_first_index""" + label_set, first_index = np.unique(y, return_index=True) + return label_set, first_index diff --git a/flaml/default/suggest.py b/flaml/default/suggest.py index dc6503f4f2..429ff67a27 100644 --- a/flaml/default/suggest.py +++ b/flaml/default/suggest.py @@ -1,3 +1,4 @@ +import os import numpy as np from sklearn.neighbors import NearestNeighbors import logging @@ -8,6 +9,24 @@ from flaml.automl.task.task import CLASSIFICATION, get_classification_objective from flaml.automl.ml import get_estimator_class from flaml.version import __version__ +try: + from flaml.automl.spark.utils import len_labels +except ImportError: + from flaml.automl.utils import len_labels +try: + os.environ["PYARROW_IGNORE_TIMEZONE"] = "1" + import pyspark.pandas as ps + from pyspark.pandas import DataFrame as psDataFrame, Series as psSeries +except ImportError: + ps = None + + class psDataFrame: + pass + + class psSeries: + pass + + LOCATION = pathlib.Path(__file__).parent.resolve() logger = logging.getLogger(__name__) CONFIG_PREDICTORS = {} @@ -29,12 +48,15 @@ def meta_feature(task, X_train, y_train, meta_feature_names): elif each_feature_name == "NumberOfFeatures": this_feature.append(n_feat) elif each_feature_name == "NumberOfClasses": - this_feature.append(len(np.unique(y_train)) if is_classification else 0) + this_feature.append(len_labels(y_train) if is_classification else 0) elif each_feature_name == "PercentageOfNumericFeatures": try: - # this is feature is only supported for dataframe + # this feature is only supported for dataframe this_feature.append( - X_train.select_dtypes(include=np.number).shape[1] / n_feat + X_train.select_dtypes( + include=[np.number, "float", "int", "long"] + ).shape[1] + / n_feat ) except AttributeError: # 'numpy.ndarray' object has no attribute 'select_dtypes' @@ -78,7 +100,7 @@ def suggest_config( `FLAML_sample_size` is removed from the configs. """ task = ( - get_classification_objective(len(np.unique(y))) + get_classification_objective(len_labels(y)) if task == "classification" and y is not None else task ) diff --git a/flaml/tune/spark/utils.py b/flaml/tune/spark/utils.py index 6b8b46166e..19ce77b4a8 100644 --- a/flaml/tune/spark/utils.py +++ b/flaml/tune/spark/utils.py @@ -10,8 +10,9 @@ logger = logging.getLogger(__name__) logger_formatter = logging.Formatter( "[%(name)s: %(asctime)s] {%(lineno)d} %(levelname)s - %(message)s", "%m-%d %H:%M:%S" ) - +logger.propagate = False try: + os.environ["PYARROW_IGNORE_TIMEZONE"] = "1" import pyspark from pyspark.sql import SparkSession from pyspark.util import VersionUtils diff --git a/flaml/tune/tune.py b/flaml/tune/tune.py index e00ae1143c..f447ac112c 100644 --- a/flaml/tune/tune.py +++ b/flaml/tune/tune.py @@ -248,6 +248,7 @@ def run( log_file_name: Optional[str] = None, lexico_objectives: Optional[dict] = None, force_cancel: Optional[bool] = False, + n_concurrent_trials: Optional[int] = 0, **ray_args, ): """The trigger for HPO. @@ -437,6 +438,14 @@ def run( "targets": {"error_rate": 0.0}, } ``` + force_cancel: boolean, default=False | Whether to forcely cancel the PySpark job if overtime. + n_concurrent_trials: int, default=0 | The number of concurrent trials when perform hyperparameter + tuning with Spark. Only valid when use_spark=True and spark is required: + `pip install flaml[spark]`. Please check + [here](https://spark.apache.org/docs/latest/api/python/getting_started/install.html) + for more details about installing Spark. When tune.run() is called from AutoML, it will be + overwritten by the value of `n_concurrent_trials` in AutoML. When <= 0, the concurrent trials + will be set to the number of executors. **ray_args: keyword arguments to pass to ray.tune.run(). Only valid when use_ray=True. """ @@ -674,18 +683,30 @@ def run( is not an instance of `ConcurrencyLimiter`. The final number of concurrent trials is the minimum of `max_concurrent` and - `num_executors`. + `num_executors` if `n_concurrent_trials<=0` (default, automl cases), otherwise the + minimum of `max_concurrent` and `n_concurrent_trials` (tuning cases). """ - num_executors = max(num_executors, int(os.getenv("FLAML_MAX_CONCURRENT", 1)), 1) time_start = time.time() + try: + FLAML_MAX_CONCURRENT = int(os.getenv("FLAML_MAX_CONCURRENT", 0)) + num_executors = max(num_executors, FLAML_MAX_CONCURRENT, 1) + except ValueError: + FLAML_MAX_CONCURRENT = 0 + max_spark_parallelism = ( + min(spark.sparkContext.defaultParallelism, FLAML_MAX_CONCURRENT) + if FLAML_MAX_CONCURRENT > 0 + else spark.sparkContext.defaultParallelism + ) if scheduler: scheduler.set_search_properties(metric=metric, mode=mode) if isinstance(search_alg, ConcurrencyLimiter): max_concurrent = max(1, search_alg.max_concurrent) else: - max_concurrent = max(1, int(os.getenv("FLAML_MAX_CONCURRENT", 1))) - - n_concurrent_trials = min(num_executors, max_concurrent) + max_concurrent = max(1, max_spark_parallelism) + n_concurrent_trials = min( + n_concurrent_trials if n_concurrent_trials > 0 else num_executors, + max_concurrent, + ) with parallel_backend("spark"): with Parallel( n_jobs=n_concurrent_trials, verbose=max(0, (verbose - 1) * 50) diff --git a/notebook/automl_synapseML.ipynb b/notebook/automl_synapseML.ipynb new file mode 100644 index 0000000000..6dff0f081c --- /dev/null +++ b/notebook/automl_synapseML.ipynb @@ -0,0 +1,831 @@ +{ + "cells": [ + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# AutoML with FLAML Library for synapseML models and spark dataframes\n", + "\n", + "\n", + "## 1. Introduction\n", + "\n", + "FLAML is a Python library (https://github.com/microsoft/FLAML) designed to automatically produce accurate machine learning models \n", + "with low computational cost. It is fast and economical. The simple and lightweight design makes it easy \n", + "to use and extend, such as adding new learners. FLAML can \n", + "- serve as an economical AutoML engine,\n", + "- be used as a fast hyperparameter tuning tool, or \n", + "- be embedded in self-tuning software that requires low latency & resource in repetitive\n", + " tuning tasks.\n", + "\n", + "In this notebook, we demonstrate how to use FLAML library to do AutoML for synapseML models and spark dataframes. We also compare the results between FLAML AutoML and default SynapseML. \n", + "In this example, we use LightGBM to build a classification model in order to predict bankruptcy.\n", + "\n", + "Since the dataset is unbalanced, `AUC` is a better metric than `Accuracy`. FLAML (1 min of training) achieved AUC **0.79**, the default SynapseML model only got AUC **0.64**. \n", + "\n", + "FLAML requires `Python>=3.7`. To run this notebook example, please install flaml with the `synapse` option:\n", + "```bash\n", + "pip install flaml[synapse]>=1.1.3; \n", + "```\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "# %pip install \"flaml[synapse]>=1.1.3\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. Load data and preprocess" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + ":: loading settings :: url = jar:file:/datadrive/spark/spark33/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Ivy Default Cache set to: /home/lijiang1/.ivy2/cache\n", + "The jars for the packages stored in: /home/lijiang1/.ivy2/jars\n", + "com.microsoft.azure#synapseml_2.12 added as a dependency\n", + "org.apache.hadoop#hadoop-azure added as a dependency\n", + "com.microsoft.azure#azure-storage added as a dependency\n", + ":: resolving dependencies :: org.apache.spark#spark-submit-parent-bfb2447b-61c5-4941-bf9b-0548472077eb;1.0\n", + "\tconfs: [default]\n", + "\tfound com.microsoft.azure#synapseml_2.12;0.10.2 in central\n", + "\tfound com.microsoft.azure#synapseml-core_2.12;0.10.2 in central\n", + "\tfound org.scalactic#scalactic_2.12;3.2.14 in local-m2-cache\n", + "\tfound org.scala-lang#scala-reflect;2.12.15 in central\n", + "\tfound io.spray#spray-json_2.12;1.3.5 in central\n", + "\tfound com.jcraft#jsch;0.1.54 in central\n", + "\tfound org.apache.httpcomponents.client5#httpclient5;5.1.3 in central\n", + "\tfound org.apache.httpcomponents.core5#httpcore5;5.1.3 in central\n", + "\tfound org.apache.httpcomponents.core5#httpcore5-h2;5.1.3 in central\n", + "\tfound org.slf4j#slf4j-api;1.7.25 in local-m2-cache\n", + "\tfound commons-codec#commons-codec;1.15 in local-m2-cache\n", + "\tfound org.apache.httpcomponents#httpmime;4.5.13 in local-m2-cache\n", + "\tfound org.apache.httpcomponents#httpclient;4.5.13 in local-m2-cache\n", + "\tfound org.apache.httpcomponents#httpcore;4.4.13 in central\n", + "\tfound commons-logging#commons-logging;1.2 in central\n", + "\tfound com.linkedin.isolation-forest#isolation-forest_3.2.0_2.12;2.0.8 in central\n", + "\tfound com.chuusai#shapeless_2.12;2.3.2 in central\n", + "\tfound org.typelevel#macro-compat_2.12;1.1.1 in central\n", + "\tfound org.apache.spark#spark-avro_2.12;3.2.0 in central\n", + "\tfound org.tukaani#xz;1.8 in central\n", + "\tfound org.spark-project.spark#unused;1.0.0 in central\n", + "\tfound org.testng#testng;6.8.8 in central\n", + "\tfound org.beanshell#bsh;2.0b4 in central\n", + "\tfound com.beust#jcommander;1.27 in central\n", + "\tfound com.microsoft.azure#synapseml-deep-learning_2.12;0.10.2 in central\n", + "\tfound com.microsoft.azure#synapseml-opencv_2.12;0.10.2 in central\n", + "\tfound org.openpnp#opencv;3.2.0-1 in central\n", + "\tfound com.microsoft.azure#onnx-protobuf_2.12;0.9.1 in central\n", + "\tfound com.microsoft.cntk#cntk;2.4 in central\n", + "\tfound com.microsoft.onnxruntime#onnxruntime_gpu;1.8.1 in central\n", + "\tfound com.microsoft.azure#synapseml-cognitive_2.12;0.10.2 in central\n", + "\tfound com.microsoft.cognitiveservices.speech#client-jar-sdk;1.14.0 in central\n", + "\tfound com.microsoft.azure#synapseml-vw_2.12;0.10.2 in central\n", + "\tfound com.github.vowpalwabbit#vw-jni;8.9.1 in central\n", + "\tfound com.microsoft.azure#synapseml-lightgbm_2.12;0.10.2 in central\n", + "\tfound com.microsoft.ml.lightgbm#lightgbmlib;3.2.110 in central\n", + "\tfound org.apache.hadoop#hadoop-azure;3.3.1 in central\n", + "\tfound org.apache.hadoop.thirdparty#hadoop-shaded-guava;1.1.1 in local-m2-cache\n", + "\tfound org.eclipse.jetty#jetty-util-ajax;9.4.40.v20210413 in central\n", + "\tfound org.eclipse.jetty#jetty-util;9.4.40.v20210413 in central\n", + "\tfound org.codehaus.jackson#jackson-mapper-asl;1.9.13 in local-m2-cache\n", + "\tfound org.codehaus.jackson#jackson-core-asl;1.9.13 in local-m2-cache\n", + "\tfound org.wildfly.openssl#wildfly-openssl;1.0.7.Final in local-m2-cache\n", + "\tfound com.microsoft.azure#azure-storage;8.6.6 in central\n", + "\tfound com.fasterxml.jackson.core#jackson-core;2.9.4 in central\n", + "\tfound org.apache.commons#commons-lang3;3.4 in local-m2-cache\n", + "\tfound com.microsoft.azure#azure-keyvault-core;1.2.4 in central\n", + "\tfound com.google.guava#guava;24.1.1-jre in central\n", + "\tfound com.google.code.findbugs#jsr305;1.3.9 in central\n", + "\tfound org.checkerframework#checker-compat-qual;2.0.0 in central\n", + "\tfound com.google.errorprone#error_prone_annotations;2.1.3 in central\n", + "\tfound com.google.j2objc#j2objc-annotations;1.1 in central\n", + "\tfound org.codehaus.mojo#animal-sniffer-annotations;1.14 in central\n", + ":: resolution report :: resolve 992ms :: artifacts dl 77ms\n", + "\t:: modules in use:\n", + "\tcom.beust#jcommander;1.27 from central in [default]\n", + "\tcom.chuusai#shapeless_2.12;2.3.2 from central in [default]\n", + "\tcom.fasterxml.jackson.core#jackson-core;2.9.4 from central in [default]\n", + "\tcom.github.vowpalwabbit#vw-jni;8.9.1 from central in [default]\n", + "\tcom.google.code.findbugs#jsr305;1.3.9 from central in [default]\n", + "\tcom.google.errorprone#error_prone_annotations;2.1.3 from central in [default]\n", + "\tcom.google.guava#guava;24.1.1-jre from central in [default]\n", + "\tcom.google.j2objc#j2objc-annotations;1.1 from central in [default]\n", + "\tcom.jcraft#jsch;0.1.54 from central in [default]\n", + "\tcom.linkedin.isolation-forest#isolation-forest_3.2.0_2.12;2.0.8 from central in [default]\n", + "\tcom.microsoft.azure#azure-keyvault-core;1.2.4 from central in [default]\n", + "\tcom.microsoft.azure#azure-storage;8.6.6 from central in [default]\n", + "\tcom.microsoft.azure#onnx-protobuf_2.12;0.9.1 from central in [default]\n", + "\tcom.microsoft.azure#synapseml-cognitive_2.12;0.10.2 from central in [default]\n", + "\tcom.microsoft.azure#synapseml-core_2.12;0.10.2 from central in [default]\n", + "\tcom.microsoft.azure#synapseml-deep-learning_2.12;0.10.2 from central in [default]\n", + "\tcom.microsoft.azure#synapseml-lightgbm_2.12;0.10.2 from central in [default]\n", + "\tcom.microsoft.azure#synapseml-opencv_2.12;0.10.2 from central in [default]\n", + "\tcom.microsoft.azure#synapseml-vw_2.12;0.10.2 from central in [default]\n", + "\tcom.microsoft.azure#synapseml_2.12;0.10.2 from central in [default]\n", + "\tcom.microsoft.cntk#cntk;2.4 from central in [default]\n", + "\tcom.microsoft.cognitiveservices.speech#client-jar-sdk;1.14.0 from central in [default]\n", + "\tcom.microsoft.ml.lightgbm#lightgbmlib;3.2.110 from central in [default]\n", + "\tcom.microsoft.onnxruntime#onnxruntime_gpu;1.8.1 from central in [default]\n", + "\tcommons-codec#commons-codec;1.15 from local-m2-cache in [default]\n", + "\tcommons-logging#commons-logging;1.2 from central in [default]\n", + "\tio.spray#spray-json_2.12;1.3.5 from central in [default]\n", + "\torg.apache.commons#commons-lang3;3.4 from local-m2-cache in [default]\n", + "\torg.apache.hadoop#hadoop-azure;3.3.1 from central in [default]\n", + "\torg.apache.hadoop.thirdparty#hadoop-shaded-guava;1.1.1 from local-m2-cache in [default]\n", + "\torg.apache.httpcomponents#httpclient;4.5.13 from local-m2-cache in [default]\n", + "\torg.apache.httpcomponents#httpcore;4.4.13 from central in [default]\n", + "\torg.apache.httpcomponents#httpmime;4.5.13 from local-m2-cache in [default]\n", + "\torg.apache.httpcomponents.client5#httpclient5;5.1.3 from central in [default]\n", + "\torg.apache.httpcomponents.core5#httpcore5;5.1.3 from central in [default]\n", + "\torg.apache.httpcomponents.core5#httpcore5-h2;5.1.3 from central in [default]\n", + "\torg.apache.spark#spark-avro_2.12;3.2.0 from central in [default]\n", + "\torg.beanshell#bsh;2.0b4 from central in [default]\n", + "\torg.checkerframework#checker-compat-qual;2.0.0 from central in [default]\n", + "\torg.codehaus.jackson#jackson-core-asl;1.9.13 from local-m2-cache in [default]\n", + "\torg.codehaus.jackson#jackson-mapper-asl;1.9.13 from local-m2-cache in [default]\n", + "\torg.codehaus.mojo#animal-sniffer-annotations;1.14 from central in [default]\n", + "\torg.eclipse.jetty#jetty-util;9.4.40.v20210413 from central in [default]\n", + "\torg.eclipse.jetty#jetty-util-ajax;9.4.40.v20210413 from central in [default]\n", + "\torg.openpnp#opencv;3.2.0-1 from central in [default]\n", + "\torg.scala-lang#scala-reflect;2.12.15 from central in [default]\n", + "\torg.scalactic#scalactic_2.12;3.2.14 from local-m2-cache in [default]\n", + "\torg.slf4j#slf4j-api;1.7.25 from local-m2-cache in [default]\n", + "\torg.spark-project.spark#unused;1.0.0 from central in [default]\n", + "\torg.testng#testng;6.8.8 from central in [default]\n", + "\torg.tukaani#xz;1.8 from central in [default]\n", + "\torg.typelevel#macro-compat_2.12;1.1.1 from central in [default]\n", + "\torg.wildfly.openssl#wildfly-openssl;1.0.7.Final from local-m2-cache in [default]\n", + "\t:: evicted modules:\n", + "\tcommons-codec#commons-codec;1.11 by [commons-codec#commons-codec;1.15] in [default]\n", + "\tcom.microsoft.azure#azure-storage;7.0.1 by [com.microsoft.azure#azure-storage;8.6.6] in [default]\n", + "\torg.slf4j#slf4j-api;1.7.12 by [org.slf4j#slf4j-api;1.7.25] in [default]\n", + "\torg.apache.commons#commons-lang3;3.8.1 by [org.apache.commons#commons-lang3;3.4] in [default]\n", + "\t---------------------------------------------------------------------\n", + "\t| | modules || artifacts |\n", + "\t| conf | number| search|dwnlded|evicted|| number|dwnlded|\n", + "\t---------------------------------------------------------------------\n", + "\t| default | 57 | 0 | 0 | 4 || 53 | 0 |\n", + "\t---------------------------------------------------------------------\n", + ":: retrieving :: org.apache.spark#spark-submit-parent-bfb2447b-61c5-4941-bf9b-0548472077eb\n", + "\tconfs: [default]\n", + "\t0 artifacts copied, 53 already retrieved (0kB/20ms)\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "23/02/28 02:12:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Setting default log level to \"WARN\".\n", + "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n" + ] + } + ], + "source": [ + "import pyspark\n", + "\n", + "spark = (\n", + " pyspark.sql.SparkSession.builder.appName(\"MyApp\")\n", + " .config(\n", + " \"spark.jars.packages\",\n", + " f\"com.microsoft.azure:synapseml_2.12:0.10.2,org.apache.hadoop:hadoop-azure:{pyspark.__version__},com.microsoft.azure:azure-storage:8.6.6\",\n", + " )\n", + " .config(\"spark.sql.debug.maxToStringFields\", \"100\")\n", + " .getOrCreate()\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "23/02/28 02:12:32 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-azure-file-system.properties,hadoop-metrics2.properties\n", + "records read: 6819\n", + "Schema: \n", + "root\n", + " |-- Bankrupt?: integer (nullable = true)\n", + " |-- ROA(C) before interest and depreciation before interest: double (nullable = true)\n", + " |-- ROA(A) before interest and % after tax: double (nullable = true)\n", + " |-- ROA(B) before interest and depreciation after tax: double (nullable = true)\n", + " |-- Operating Gross Margin: double (nullable = true)\n", + " |-- Realized Sales Gross Margin: double (nullable = true)\n", + " |-- Operating Profit Rate: double (nullable = true)\n", + " |-- Pre-tax net Interest Rate: double (nullable = true)\n", + " |-- After-tax net Interest Rate: double (nullable = true)\n", + " |-- Non-industry income and expenditure/revenue: double (nullable = true)\n", + " |-- Continuous interest rate (after tax): double (nullable = true)\n", + " |-- Operating Expense Rate: double (nullable = true)\n", + " |-- Research and development expense rate: double (nullable = true)\n", + " |-- Cash flow rate: double (nullable = true)\n", + " |-- Interest-bearing debt interest rate: double (nullable = true)\n", + " |-- Tax rate (A): double (nullable = true)\n", + " |-- Net Value Per Share (B): double (nullable = true)\n", + " |-- Net Value Per Share (A): double (nullable = true)\n", + " |-- Net Value Per Share (C): double (nullable = true)\n", + " |-- Persistent EPS in the Last Four Seasons: double (nullable = true)\n", + " |-- Cash Flow Per Share: double (nullable = true)\n", + " |-- Revenue Per Share (Yuan ??): double (nullable = true)\n", + " |-- Operating Profit Per Share (Yuan ??): double (nullable = true)\n", + " |-- Per Share Net profit before tax (Yuan ??): double (nullable = true)\n", + " |-- Realized Sales Gross Profit Growth Rate: double (nullable = true)\n", + " |-- Operating Profit Growth Rate: double (nullable = true)\n", + " |-- After-tax Net Profit Growth Rate: double (nullable = true)\n", + " |-- Regular Net Profit Growth Rate: double (nullable = true)\n", + " |-- Continuous Net Profit Growth Rate: double (nullable = true)\n", + " |-- Total Asset Growth Rate: double (nullable = true)\n", + " |-- Net Value Growth Rate: double (nullable = true)\n", + " |-- Total Asset Return Growth Rate Ratio: double (nullable = true)\n", + " |-- Cash Reinvestment %: double (nullable = true)\n", + " |-- Current Ratio: double (nullable = true)\n", + " |-- Quick Ratio: double (nullable = true)\n", + " |-- Interest Expense Ratio: double (nullable = true)\n", + " |-- Total debt/Total net worth: double (nullable = true)\n", + " |-- Debt ratio %: double (nullable = true)\n", + " |-- Net worth/Assets: double (nullable = true)\n", + " |-- Long-term fund suitability ratio (A): double (nullable = true)\n", + " |-- Borrowing dependency: double (nullable = true)\n", + " |-- Contingent liabilities/Net worth: double (nullable = true)\n", + " |-- Operating profit/Paid-in capital: double (nullable = true)\n", + " |-- Net profit before tax/Paid-in capital: double (nullable = true)\n", + " |-- Inventory and accounts receivable/Net value: double (nullable = true)\n", + " |-- Total Asset Turnover: double (nullable = true)\n", + " |-- Accounts Receivable Turnover: double (nullable = true)\n", + " |-- Average Collection Days: double (nullable = true)\n", + " |-- Inventory Turnover Rate (times): double (nullable = true)\n", + " |-- Fixed Assets Turnover Frequency: double (nullable = true)\n", + " |-- Net Worth Turnover Rate (times): double (nullable = true)\n", + " |-- Revenue per person: double (nullable = true)\n", + " |-- Operating profit per person: double (nullable = true)\n", + " |-- Allocation rate per person: double (nullable = true)\n", + " |-- Working Capital to Total Assets: double (nullable = true)\n", + " |-- Quick Assets/Total Assets: double (nullable = true)\n", + " |-- Current Assets/Total Assets: double (nullable = true)\n", + " |-- Cash/Total Assets: double (nullable = true)\n", + " |-- Quick Assets/Current Liability: double (nullable = true)\n", + " |-- Cash/Current Liability: double (nullable = true)\n", + " |-- Current Liability to Assets: double (nullable = true)\n", + " |-- Operating Funds to Liability: double (nullable = true)\n", + " |-- Inventory/Working Capital: double (nullable = true)\n", + " |-- Inventory/Current Liability: double (nullable = true)\n", + " |-- Current Liabilities/Liability: double (nullable = true)\n", + " |-- Working Capital/Equity: double (nullable = true)\n", + " |-- Current Liabilities/Equity: double (nullable = true)\n", + " |-- Long-term Liability to Current Assets: double (nullable = true)\n", + " |-- Retained Earnings to Total Assets: double (nullable = true)\n", + " |-- Total income/Total expense: double (nullable = true)\n", + " |-- Total expense/Assets: double (nullable = true)\n", + " |-- Current Asset Turnover Rate: double (nullable = true)\n", + " |-- Quick Asset Turnover Rate: double (nullable = true)\n", + " |-- Working capitcal Turnover Rate: double (nullable = true)\n", + " |-- Cash Turnover Rate: double (nullable = true)\n", + " |-- Cash Flow to Sales: double (nullable = true)\n", + " |-- Fixed Assets to Assets: double (nullable = true)\n", + " |-- Current Liability to Liability: double (nullable = true)\n", + " |-- Current Liability to Equity: double (nullable = true)\n", + " |-- Equity to Long-term Liability: double (nullable = true)\n", + " |-- Cash Flow to Total Assets: double (nullable = true)\n", + " |-- Cash Flow to Liability: double (nullable = true)\n", + " |-- CFO to Assets: double (nullable = true)\n", + " |-- Cash Flow to Equity: double (nullable = true)\n", + " |-- Current Liability to Current Assets: double (nullable = true)\n", + " |-- Liability-Assets Flag: double (nullable = true)\n", + " |-- Net Income to Total Assets: double (nullable = true)\n", + " |-- Total assets to GNP price: double (nullable = true)\n", + " |-- No-credit Interval: double (nullable = true)\n", + " |-- Gross Profit to Sales: double (nullable = true)\n", + " |-- Net Income to Stockholder's Equity: double (nullable = true)\n", + " |-- Liability to Equity: double (nullable = true)\n", + " |-- Degree of Financial Leverage (DFL): double (nullable = true)\n", + " |-- Interest Coverage Ratio (Interest expense to EBIT): double (nullable = true)\n", + " |-- Net Income Flag: double (nullable = true)\n", + " |-- Equity to Liability: double (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "df = (\n", + " spark.read.format(\"csv\")\n", + " .option(\"header\", True)\n", + " .option(\"inferSchema\", True)\n", + " .load(\n", + " \"wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv\"\n", + " )\n", + ")\n", + "# print dataset size\n", + "print(\"records read: \" + str(df.count()))\n", + "print(\"Schema: \")\n", + "df.printSchema()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Split the dataset into train and test" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "train, test = df.randomSplit([0.8, 0.2], seed=41)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Add featurizer to convert features to vector" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.ml.feature import VectorAssembler\n", + "\n", + "feature_cols = df.columns[1:]\n", + "featurizer = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n", + "train_data = featurizer.transform(train)[\"Bankrupt?\", \"features\"]\n", + "test_data = featurizer.transform(test)[\"Bankrupt?\", \"features\"]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Default SynapseML LightGBM" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "23/02/28 02:12:42 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.\n", + "[LightGBM] [Warning] Find whitespaces in feature_names, replace with underlines\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + } + ], + "source": [ + "from synapse.ml.lightgbm import LightGBMClassifier\n", + "\n", + "model = LightGBMClassifier(\n", + " objective=\"binary\", featuresCol=\"features\", labelCol=\"Bankrupt?\", isUnbalance=True\n", + ")\n", + "\n", + "model = model.fit(train_data)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Model Prediction" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "DataFrame[evaluation_type: string, confusion_matrix: matrix, accuracy: double, precision: double, recall: double, AUC: double]" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "[Stage 27:> (0 + 1) / 1]\r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------------+--------------------+-----------------+------------------+-------------------+------------------+\n", + "|evaluation_type| confusion_matrix| accuracy| precision| recall| AUC|\n", + "+---------------+--------------------+-----------------+------------------+-------------------+------------------+\n", + "| Classification|1250.0 23.0 \\n3...|0.958997722095672|0.3611111111111111|0.29545454545454547|0.6386934942512319|\n", + "+---------------+--------------------+-----------------+------------------+-------------------+------------------+\n", + "\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + } + ], + "source": [ + "def predict(model):\n", + " from synapse.ml.train import ComputeModelStatistics\n", + "\n", + " predictions = model.transform(test_data)\n", + " # predictions.limit(10).show()\n", + " \n", + " metrics = ComputeModelStatistics(\n", + " evaluationMetric=\"classification\",\n", + " labelCol=\"Bankrupt?\",\n", + " scoredLabelsCol=\"prediction\",\n", + " ).transform(predictions)\n", + " display(metrics)\n", + " return metrics\n", + "\n", + "default_metrics = predict(model)\n", + "default_metrics.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Run FLAML\n", + "In the FLAML automl run configuration, users can specify the task type, time budget, error metric, learner list, whether to subsample, resampling strategy type, and so on. All these arguments have default values which will be used if users do not provide them. " + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "''' import AutoML class from flaml package '''\n", + "from flaml import AutoML\n", + "from flaml.automl.spark.utils import to_pandas_on_spark\n", + "\n", + "automl = AutoML()" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "settings = {\n", + " \"time_budget\": 30, # total running time in seconds\n", + " \"metric\": 'roc_auc',\n", + " \"estimator_list\": ['lgbm_spark'], # list of ML learners; we tune lightgbm in this example\n", + " \"task\": 'classification', # task type\n", + " \"log_file_name\": 'flaml_experiment.log', # flaml log file\n", + " \"seed\": 41, # random seed\n", + " \"force_cancel\": True, # force stop training once time_budget is used up\n", + "}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Disable Arrow optimization to omit below warning:\n", + "```\n", + "/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:87: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:\n", + " Unsupported type in conversion to Arrow: VectorUDT\n", + "Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.\n", + " warnings.warn(msg)\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "spark.conf.set(\"spark.sql.execution.arrow.pyspark.enabled\", \"false\")" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + " | index | \n", + "Bankrupt? | \n", + "features | \n", + "
---|---|---|---|
0 | \n", + "0 | \n", + "0 | \n", + "[0.0828, 0.0693, 0.0884, 0.6468, 0.6468, 0.997... | \n", + "
1 | \n", + "1 | \n", + "0 | \n", + "[0.1606, 0.1788, 0.1832, 0.5897, 0.5897, 0.998... | \n", + "
2 | \n", + "2 | \n", + "0 | \n", + "[0.204, 0.2638, 0.2598, 0.4483, 0.4483, 0.9959... | \n", + "
3 | \n", + "3 | \n", + "0 | \n", + "[0.217, 0.1881, 0.2451, 0.5992, 0.5992, 0.9962... | \n", + "
4 | \n", + "4 | \n", + "0 | \n", + "[0.2314, 0.1628, 0.2068, 0.6001, 0.6001, 0.998... | \n", + "