serialize TransformerEstimator (#381)

* serialize TransformerEstimator

* check has_attr

* custom metric needs trainer

* skip test on mac
This commit is contained in:
Chi Wang 2022-01-06 10:28:19 -08:00 committed by GitHub
parent cd9740f022
commit 612668e8ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 102 additions and 88 deletions

View File

@ -321,61 +321,62 @@ class AutoMLState:
if self.time_budget is None
else self.time_budget - self.time_from_start
)
# if self.resources_per_trial.get("gpu", 0) > 0:
if (
hasattr(self, "resources_per_trial")
and self.resources_per_trial.get("gpu", 0) > 0
):
# def _trainable_function_wrapper(config: dict):
def _trainable_function_wrapper(config: dict):
# return_estimator, train_time = train_estimator(
# X_train=sampled_X_train,
# y_train=sampled_y_train,
# config_dic=config,
# task=self.task,
# estimator_name=estimator,
# n_jobs=self.n_jobs,
# estimator_class=self.learner_classes.get(estimator),
# budget=budget,
# fit_kwargs=self.fit_kwargs,
# )
# return {"estimator": return_estimator, "train_time": train_time}
return_estimator, train_time = train_estimator(
X_train=sampled_X_train,
y_train=sampled_y_train,
config_dic=config,
task=self.task,
estimator_name=estimator,
n_jobs=self.n_jobs,
estimator_class=self.learner_classes.get(estimator),
budget=budget,
fit_kwargs=self.fit_kwargs,
)
return {"estimator": return_estimator, "train_time": train_time}
# if estimator not in self.learner_classes:
# self.learner_classes[estimator] = get_estimator_class(
# self.task, estimator
# )
if estimator not in self.learner_classes:
self.learner_classes[estimator] = get_estimator_class(
self.task, estimator
)
# analysis = tune.run(
# _trainable_function_wrapper,
# config=config_w_resource,
# metric="train_time",
# mode="min",
# resources_per_trial=self.resources_per_trial,
# num_samples=1,
# use_ray=True,
# )
# result = list(analysis.results.values())[0]
# estimator, train_time = result["estimator"], result["train_time"]
# else:
if _is_nlp_task(self.task):
use_ray = self.fit_kwargs.get("use_ray")
self.fit_kwargs["use_ray"] = False
# TODO: limit number of GPUs
estimator, train_time = train_estimator(
X_train=sampled_X_train,
y_train=sampled_y_train,
config_dic=config,
task=self.task,
estimator_name=estimator,
n_jobs=self.n_jobs,
estimator_class=self.learner_classes.get(estimator),
budget=budget,
fit_kwargs=self.fit_kwargs,
)
if _is_nlp_task(self.task):
if use_ray is None:
del self.fit_kwargs["use_ray"]
else:
self.fit_kwargs["use_ray"] = use_ray
analysis = tune.run(
_trainable_function_wrapper,
config=config_w_resource,
metric="train_time",
mode="min",
resources_per_trial=self.resources_per_trial,
num_samples=1,
use_ray=True,
)
result = list(analysis.results.values())[0]
estimator, train_time = result["estimator"], result["train_time"]
else:
if _is_nlp_task(self.task):
use_ray = self.fit_kwargs.get("use_ray")
self.fit_kwargs["use_ray"] = False
estimator, train_time = train_estimator(
X_train=sampled_X_train,
y_train=sampled_y_train,
config_dic=config,
task=self.task,
estimator_name=estimator,
n_jobs=self.n_jobs,
estimator_class=self.learner_classes.get(estimator),
budget=budget,
fit_kwargs=self.fit_kwargs,
)
if _is_nlp_task(self.task):
if use_ray is None:
del self.fit_kwargs["use_ray"]
else:
self.fit_kwargs["use_ray"] = use_ray
if sampled_weight is not None:
self.fit_kwargs["sample_weight"] = weight
return estimator, train_time

View File

@ -384,6 +384,16 @@ class TransformersEstimator(BaseEstimator):
else:
return X, None
def _model_init(self, num_labels, per_model_config):
from .nlp.utils import load_model
return load_model(
checkpoint_path=self.custom_hpo_args.model_path,
task=self._task,
num_labels=num_labels,
per_model_config=per_model_config,
)
def fit(self, X_train: DataFrame, y_train: Series, budget=None, **kwargs):
from transformers import EarlyStoppingCallback
from transformers.trainer_utils import set_seed
@ -548,17 +558,9 @@ class TransformersEstimator(BaseEstimator):
**training_args_config,
)
def _model_init():
return load_model(
checkpoint_path=self.custom_hpo_args.model_path,
task=self._task,
num_labels=num_labels,
per_model_config=per_model_config,
)
self._model = TrainerForAuto(
self._trainer = TrainerForAuto(
args=training_args,
model_init=_model_init,
model_init=partial(self._model_init, num_labels, per_model_config),
train_dataset=train_dataset,
eval_dataset=eval_dataset,
tokenizer=tokenizer,
@ -572,20 +574,27 @@ class TransformersEstimator(BaseEstimator):
callbacks=[EarlyStoppingCallbackForAuto],
)
setattr(self._model, "_use_ray", self.use_ray)
setattr(self._trainer, "_use_ray", self.use_ray)
if self._task in NLG_TASKS:
setattr(self._model, "_is_seq2seq", True)
self._model.train()
setattr(self._trainer, "_is_seq2seq", True)
self._trainer.train()
self.params[self.ITER_HP] = self._model.state.global_step
self._checkpoint_path = self._select_checkpoint(self._model)
self.params[self.ITER_HP] = self._trainer.state.global_step
self._checkpoint_path = self._select_checkpoint(self._trainer)
self._kwargs = kwargs
self._num_labels = num_labels
self._per_model_config = per_model_config
self._training_args_config = training_args_config
self._ckpt_remains = list(self._model.ckpt_to_metric.keys())
self._ckpt_remains = list(self._trainer.ckpt_to_metric.keys())
self._model = load_model(
checkpoint_path=self._checkpoint_path,
task=self._task,
num_labels=self._num_labels,
per_model_config=self._per_model_config,
)
self._trainer = None
def _delete_one_ckpt(self, ckpt_location):
if self.use_ray is False:
@ -667,19 +676,12 @@ class TransformersEstimator(BaseEstimator):
def _init_model_for_predict(self, X_test):
from datasets import Dataset
from .nlp.utils import load_model
from transformers import AutoTokenizer
from .nlp.huggingface.trainer import TrainerForAuto
from .nlp.huggingface.data_collator import DataCollatorForPredict
X_test, _ = self._preprocess(X_test, **self._kwargs)
test_dataset = Dataset.from_pandas(X_test)
best_model = load_model(
checkpoint_path=self._checkpoint_path,
task=self._task,
num_labels=self._num_labels,
per_model_config=self._per_model_config,
)
training_args = self._TrainingArguments(
per_device_eval_batch_size=1,
output_dir=self.custom_hpo_args.output_dir,
@ -688,8 +690,8 @@ class TransformersEstimator(BaseEstimator):
tokenizer = AutoTokenizer.from_pretrained(
self.custom_hpo_args.model_path, use_fast=True
)
self._model = TrainerForAuto(
model=best_model,
self._trainer = TrainerForAuto(
model=self._model,
args=training_args,
data_collator=DataCollatorForPredict(
tokenizer=tokenizer,
@ -706,20 +708,21 @@ class TransformersEstimator(BaseEstimator):
), "predict_proba() only for classification tasks."
test_dataset, _ = self._init_model_for_predict(X_test)
predictions = self._model.predict(test_dataset)
predictions = self._trainer.predict(test_dataset)
self._trainer = None
return predictions.predictions
def predict(self, X_test):
test_dataset, training_args = self._init_model_for_predict(X_test)
if self._task not in NLG_TASKS:
predictions = self._model.predict(test_dataset)
predictions = self._trainer.predict(test_dataset)
else:
predictions = self._model.predict(
predictions = self._trainer.predict(
test_dataset,
max_length=training_args.generation_max_length,
num_beams=training_args.generation_num_beams,
)
self._trainer = None
if self._task == SEQCLASSIFICATION:
return np.argmax(predictions.predictions, axis=1)
elif self._task == SEQREGRESSION:

View File

@ -1,5 +1,7 @@
import sys
import pytest
import pickle
import shutil
@pytest.mark.skipif(sys.platform == "darwin", reason="do not run on mac os")
@ -53,6 +55,7 @@ def test_hf_data():
automl.fit(
X_train=X_train, y_train=y_train, X_val=X_val, y_val=y_val, **automl_settings
)
automl = AutoML()
automl.retrain_from_log(
X_train=X_train,
@ -61,7 +64,11 @@ def test_hf_data():
record_id=0,
**automl_settings
)
with open("automl.pkl", "wb") as f:
pickle.dump(automl, f, pickle.HIGHEST_PROTOCOL)
with open("automl.pkl", "rb") as f:
automl = pickle.load(f)
shutil.rmtree("test/data/output/")
automl.predict(X_test)
automl.predict(["test test", "test test"])
automl.predict(

View File

@ -18,6 +18,12 @@ def custom_metric(
from datasets import Dataset
from flaml.model import TransformersEstimator
if estimator._trainer is None:
estimator._init_model_for_predict(X_test)
trainer = estimator._trainer
estimator._trainer = None
else:
trainer = estimator._trainer
if y_test is not None:
X_test, _ = estimator._preprocess(X_test)
eval_dataset = Dataset.from_pandas(TransformersEstimator._join(X_test, y_test))
@ -25,14 +31,11 @@ def custom_metric(
X_test, _ = estimator._preprocess(X_test)
eval_dataset = Dataset.from_pandas(X_test)
trainer = estimator._model
trainer_compute_metrics_cache = trainer.compute_metrics
trainer.compute_metrics = None
metrics = trainer.evaluate(eval_dataset)
trainer.compute_metrics = trainer_compute_metrics_cache
return metrics["eval_loss"], metrics

View File

@ -1,8 +1,8 @@
import os
import sys
import pytest
@pytest.mark.skipif(os.name == "darwin", reason="do not run on mac os")
@pytest.mark.skipif(sys.platform == "darwin", reason="do not run on mac os")
def test_mcc():
from flaml import AutoML

View File

@ -1,8 +1,8 @@
import os
import sys
import pytest
@pytest.mark.skipif(os.name == "darwin", reason="do not run on mac os")
@pytest.mark.skipif(sys.platform == "darwin", reason="do not run on mac os")
def test_summarization():
from flaml import AutoML
from pandas import DataFrame