choose n_jobs for ensemble according to n_jobs per learner (#551)

* set n_jobs in ensemble dict

* catch the ensemble error

* choose n_jobs for stacker

* clarify
Chi Wang 2022-05-18 21:01:51 -07:00 committed by GitHub
parent 2ca9e41e4b
commit 7126b69ce0
3 changed files with 49 additions and 14 deletions


@@ -534,10 +534,11 @@ class AutoML(BaseEstimator):
max_iter: An integer of the maximal number of iterations.
sample: A boolean of whether to sample the training data during
search.
- ensemble: boolean or dict | default=False. Whether to perform
- ensemble after search. Can be a dict with keys 'passthrough'
- and 'final_estimator' to specify the passthrough and
- final_estimator in the stacker.
+ ensemble: boolean or dict | default=False. Whether to perform
+ ensemble after search. Can be a dict with keys 'passthrough'
+ and 'final_estimator' to specify the passthrough and
+ final_estimator in the stacker. The dict can also contain
+ 'n_jobs' as the key to specify the number of jobs for the stacker.
eval_method: A string of resampling strategy, one of
['auto', 'cv', 'holdout'].
split_ratio: A float of the validation data percentage for holdout.
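
For context, a minimal usage sketch of the ensemble dict documented above; the dataset, time budget, and final estimator here are illustrative choices, not part of this commit:

from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from flaml import AutoML

X_train, y_train = load_iris(return_X_y=True)
automl = AutoML()
automl.fit(
    X_train,
    y_train,
    task="classification",
    time_budget=60,
    ensemble={
        "final_estimator": LogisticRegression(),  # final estimator of the stacker
        "passthrough": True,  # also pass raw features to the final estimator
        "n_jobs": 4,  # new key: number of jobs for the stacker itself
    },
)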
@@ -1667,7 +1668,10 @@ class AutoML(BaseEstimator):
import os
self._state.resources_per_trial = (
{"cpu": os.cpu_count(), "gpu": fit_kwargs.get("gpu_per_trial", 0)}
{
"cpu": max(1, os.cpu_count() >> 1),
"gpu": fit_kwargs.get("gpu_per_trial", 0),
}
if self._state.n_jobs < 0
else {"cpu": self._state.n_jobs, "gpu": fit_kwargs.get("gpu_per_trial", 0)}
)
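
A worked illustration of the changed default above, assuming a hypothetical 16-core machine: with n_jobs=-1, each trial is now capped at half the cores instead of all of them, leaving CPU headroom for the stacker's own parallelism:

import os

n_cpus = os.cpu_count()  # e.g. 16 on the hypothetical machine
cpu_per_trial = max(1, n_cpus >> 1)  # right shift halves it: 16 >> 1 == 8
# max(1, ...) guards the single-core case, where 1 >> 1 == 0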
@@ -2070,7 +2074,8 @@ class AutoML(BaseEstimator):
ensemble: boolean or dict | default=False. Whether to perform
ensemble after search. Can be a dict with keys 'passthrough'
and 'final_estimator' to specify the passthrough and
- final_estimator in the stacker.
+ final_estimator in the stacker. The dict can also contain
+ 'n_jobs' as the key to specify the number of jobs for the stacker.
eval_method: A string of resampling strategy, one of
['auto', 'cv', 'holdout'].
split_ratio: A float of the validation data percentage for holdout.
@@ -2300,7 +2305,11 @@ class AutoML(BaseEstimator):
if self._use_ray is not False:
import ray
- n_cpus = use_ray and ray.available_resources()["CPU"] or os.cpu_count()
+ n_cpus = (
+ ray.is_initialized()
+ and ray.available_resources()["CPU"]
+ or os.cpu_count()
+ )
self._state.resources_per_trial = (
# when using gpu, default cpu is 1 per job; otherwise, default cpu is n_cpus / n_concurrent_trials
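
The "and ... or ..." chain above is the pre-ternary Python fallback idiom; an equivalent explicit conditional (a sketch, assuming ray is importable) reads:

import os

import ray

if ray.is_initialized() and ray.available_resources()["CPU"]:
    n_cpus = ray.available_resources()["CPU"]
else:
    # ray not initialized, or a falsy (0) CPU count: fall back to the host count
    n_cpus = os.cpu_count()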
@@ -3174,18 +3183,36 @@ class AutoML(BaseEstimator):
from sklearn.ensemble import StackingClassifier as Stacker
else:
from sklearn.ensemble import StackingRegressor as Stacker
+ if self._use_ray is not False:
+ import ray
+ n_cpus = (
+ ray.is_initialized()
+ and ray.available_resources()["CPU"]
+ or os.cpu_count()
+ )
+ else:
+ n_cpus = os.cpu_count()
+ ensemble_n_jobs = (
+ -self._state.n_jobs  # maximize total parallelization degree
+ if abs(self._state.n_jobs)
+ == 1  # 1 and -1 correspond to min/max parallelization
+ else max(1, int(n_cpus / 2 / self._state.n_jobs))
+ # the total degree of parallelization = parallelization degree per estimator * parallelization degree of ensemble
+ )
if isinstance(self._ensemble, dict):
final_estimator = self._ensemble.get(
"final_estimator", self._trained_estimator
)
passthrough = self._ensemble.get("passthrough", True)
+ ensemble_n_jobs = self._ensemble.get("n_jobs", ensemble_n_jobs)
else:
final_estimator = self._trained_estimator
passthrough = True
stacker = Stacker(
estimators,
final_estimator,
- n_jobs=self._state.n_jobs,
+ n_jobs=ensemble_n_jobs,
passthrough=passthrough,
)
sample_weight_dict = (
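
To make the heuristic concrete, a worked example with hypothetical numbers: on a 16-CPU machine with n_jobs=4 per learner, the stacker gets max(1, int(16 / 2 / 4)) == 2 jobs, so two base learners at 4 jobs each use roughly half the machine; with n_jobs of 1 or -1 the sign is simply flipped, so a minimal per-learner setting yields a maximal stacker setting and vice versa:

def stacker_n_jobs(n_jobs, n_cpus):
    # same rule as the ensemble_n_jobs expression above
    if abs(n_jobs) == 1:  # 1 and -1 request min/max parallelization
        return -n_jobs  # flip the sign for the stacker
    return max(1, int(n_cpus / 2 / n_jobs))


assert stacker_n_jobs(4, 16) == 2  # 4 jobs per learner * 2 learners ~ 8 CPUs
assert stacker_n_jobs(1, 16) == -1  # min per learner -> max for the stacker
assert stacker_n_jobs(-1, 16) == 1  # max per learner -> min for the stacker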
@@ -3195,6 +3222,8 @@ class AutoML(BaseEstimator):
)
for e in estimators:
e[1].__class__.init()
+ import joblib
try:
stacker.fit(
self._X_train_all,
@@ -3225,6 +3254,11 @@ class AutoML(BaseEstimator):
self._trained_estimator.model = stacker
else:
raise e
+ except joblib.externals.loky.process_executor.TerminatedWorkerError:
+ logger.error(
+ "Not enough memory to build the ensemble."
+ " Please try increasing available RAM, decreasing n_jobs for ensemble, or disabling ensemble."
+ )
elif self._state.retrain_final:
# reset time budget for retraining
if self._max_iter > 1:
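
A standalone sketch (not from this commit) of the failure mode the new except clause guards against: when a loky worker exhausts RAM the OS kills it, and joblib surfaces that as TerminatedWorkerError rather than MemoryError, so it needs its own handler:

import joblib


def run_parallel_fit():
    # hypothetical stand-in for stacker.fit(...) with many parallel jobs
    pass


try:
    run_parallel_fit()
except joblib.externals.loky.process_executor.TerminatedWorkerError:
    # a worker was killed (typically by the OS OOM killer); degrade
    # gracefully instead of crashing the whole AutoML fit
    print("ensemble skipped: not enough memory")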


@@ -102,6 +102,12 @@ class TestClassification(unittest.TestCase):
assert automl.model is not None
automl = AutoML()
+ try:
+ import ray
+ n_concurrent_trials = 2
+ except ImportError:
+ n_concurrent_trials = 1
automl_settings = {
"time_budget": 2,
"task": "classification",
@@ -113,6 +119,7 @@ class TestClassification(unittest.TestCase):
"log_training_metric": True,
"verbose": 4,
"ensemble": True,
"n_concurrent_trials": n_concurrent_trials,
}
automl.fit(X, y, **automl_settings)


@@ -2,8 +2,6 @@ import unittest
import numpy as np
import scipy.sparse
from sklearn.datasets import load_iris, load_wine
from flaml import AutoML
from flaml.data import CLASSIFICATION, get_output_from_log
from flaml.model import LGBMEstimator, XGBoostSklearnEstimator, SKLearnEstimator
@@ -141,8 +139,6 @@ class TestMultiClass(unittest.TestCase):
"log_training_metric": True, # whether to log training metric
"n_jobs": 1,
}
"""The main flaml automl API"""
automl.fit(X_train=X_train, y_train=y_train, **settings)
# print the best model found for RGF
print(automl.best_model_for_estimator("RGF"))
@@ -167,8 +163,6 @@ class TestMultiClass(unittest.TestCase):
},
"n_jobs": 1,
}
"""The main flaml automl API"""
automl.fit(X_train=X_train, y_train=y_train, **settings)
def test_dataframe(self):