diff --git a/openml/extensions/base/__init__.py b/openml/extensions/base/__init__.py new file mode 100644 index 000000000..d85c0b268 --- /dev/null +++ b/openml/extensions/base/__init__.py @@ -0,0 +1,13 @@ +# License: BSD 3-Clause + +"""Base classes for OpenML extensions.""" + +from openml.extensions.base._connector import OpenMLAPIConnector +from openml.extensions.base._executor import ModelExecutor +from openml.extensions.base._serializer import ModelSerializer + +__all__ = [ + "ModelExecutor", + "ModelSerializer", + "OpenMLAPIConnector", +] diff --git a/openml/extensions/base/_connector.py b/openml/extensions/base/_connector.py new file mode 100644 index 000000000..9ad66307a --- /dev/null +++ b/openml/extensions/base/_connector.py @@ -0,0 +1,28 @@ +# License: BSD 3-Clause + +"""Base class for OpenML API connectors.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from openml.extensions.base import ModelExecutor, ModelSerializer + + +class OpenMLAPIConnector(ABC): + """Base class for OpenML API connectors.""" + + @abstractmethod + def serializer(self) -> ModelSerializer: + """Return the serializer for this API.""" + + @abstractmethod + def executor(self) -> ModelExecutor: + """Return the executor for this API.""" + + @classmethod + @abstractmethod + def supports(cls, estimator: Any) -> bool: + """High-level check if this connector supports the estimator instance or flow.""" diff --git a/openml/extensions/base/_executor.py b/openml/extensions/base/_executor.py new file mode 100644 index 000000000..67184a3b3 --- /dev/null +++ b/openml/extensions/base/_executor.py @@ -0,0 +1,151 @@ +# License: BSD 3-Clause + +"""Base class for estimator executors.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections import OrderedDict +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + import numpy as np + import scipy.sparse + + from openml.flows import OpenMLFlow + from openml.runs.trace import OpenMLRunTrace, OpenMLTraceIteration + from openml.tasks.task import OpenMLTask + + +class ModelExecutor(ABC): + """Define runtime execution semantics for a specific API type.""" + + @abstractmethod + def seed_model(self, model: Any, seed: int | None) -> Any: + """Set the seed of all the unseeded components of a model and return the seeded model. + + Required so that all seed information can be uploaded to OpenML for reproducible results. + + Parameters + ---------- + model : Any + The model to be seeded + seed : int + + Returns + ------- + model + """ + + @abstractmethod + def _run_model_on_fold( # noqa: PLR0913 + self, + model: Any, + task: OpenMLTask, + X_train: np.ndarray | scipy.sparse.spmatrix, + rep_no: int, + fold_no: int, + y_train: np.ndarray | None = None, + X_test: np.ndarray | scipy.sparse.spmatrix | None = None, + ) -> tuple[np.ndarray, np.ndarray | None, OrderedDict[str, float], OpenMLRunTrace | None]: + """Run a model on a repeat, fold, subsample triplet of the task. + + Returns the data that is necessary to construct the OpenML Run object. Is used by + :func:`openml.runs.run_flow_on_task`. + + Parameters + ---------- + model : Any + The UNTRAINED model to run. The model instance will be copied and not altered. + task : OpenMLTask + The task to run the model on. + X_train : array-like + Training data for the given repetition and fold. 
+        rep_no : int
+            The repeat of the experiment (0-based; in case of 1 time CV, always 0)
+        fold_no : int
+            The fold number of the experiment (0-based; in case of holdout, always 0)
+        y_train : Optional[np.ndarray] (default=None)
+            Target attributes for supervised tasks. In case of classification, these are integer
+            indices to the potential classes specified by dataset.
+        X_test : Optional, array-like (default=None)
+            Test attributes to test for generalization in supervised tasks.
+
+        Returns
+        -------
+        predictions : np.ndarray
+            Model predictions.
+        probabilities : Optional, np.ndarray
+            Predicted probabilities (only applicable for supervised classification tasks).
+        user_defined_measures : OrderedDict[str, float]
+            User defined measures that were generated on this fold.
+        trace : Optional, OpenMLRunTrace
+            Hyperparameter optimization trace (only applicable for supervised tasks with
+            hyperparameter optimization).
+        """
+
+    @abstractmethod
+    def check_if_model_fitted(self, model: Any) -> bool:
+        """Returns True/False denoting if the model has already been fitted/trained.
+
+        Parameters
+        ----------
+        model : Any
+
+        Returns
+        -------
+        bool
+        """
+
+    @abstractmethod
+    def obtain_parameter_values(
+        self,
+        flow: OpenMLFlow,
+        model: Any = None,
+    ) -> list[dict[str, Any]]:
+        """Extracts all parameter settings required for the flow from the model.
+
+        If no explicit model is provided, the parameters will be extracted from `flow.model`
+        instead.
+
+        Parameters
+        ----------
+        flow : OpenMLFlow
+            OpenMLFlow object (containing flow ids, i.e., it has to be downloaded from the server)
+
+        model: Any, optional (default=None)
+            The model from which to obtain the parameter values. Must match the flow signature.
+            If None, use the model specified in ``OpenMLFlow.model``.
+
+        Returns
+        -------
+        list
+            A list of dicts, where each dict has the following entries:
+            - ``oml:name`` : str: The OpenML parameter name
+            - ``oml:value`` : mixed: A representation of the parameter value
+            - ``oml:component`` : int: flow id to which the parameter belongs
+        """
+
+    # Abstract methods for hyperparameter optimization
+
+    @abstractmethod
+    def instantiate_model_from_hpo_class(
+        self,
+        model: Any,
+        trace_iteration: OpenMLTraceIteration,
+    ) -> Any:
+        """Instantiate a base model which can be searched over by the hyperparameter optimization
+        model.
+
+        Parameters
+        ----------
+        model : Any
+            A hyperparameter optimization model which defines the model to be instantiated.
+        trace_iteration : OpenMLTraceIteration
+            Describing the hyperparameter settings to instantiate.
+
+        Returns
+        -------
+        Any
+        """
+        # TODO a trace belongs to a run and therefore a flow -> simplify this part of the interface!
diff --git a/openml/extensions/base/_serializer.py b/openml/extensions/base/_serializer.py
new file mode 100644
index 000000000..f2673d4c4
--- /dev/null
+++ b/openml/extensions/base/_serializer.py
@@ -0,0 +1,73 @@
+# License: BSD 3-Clause
+
+"""Base class for estimator serializers."""
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from typing import TYPE_CHECKING, Any
+
+if TYPE_CHECKING:
+    from openml.flows import OpenMLFlow
+
+
+class ModelSerializer(ABC):
+    """Handle the conversion between estimator instances and OpenML Flows."""
+
+    @classmethod
+    @abstractmethod
+    def can_handle_model(cls, model: Any) -> bool:
+        """Check whether a model can be handled by this extension.
+
+        This is typically done by checking the type of the model, or the package it belongs to.
+
+        Parameters
+        ----------
+        model : Any
+
+        Returns
+        -------
+        bool
+        """
+
+    @abstractmethod
+    def model_to_flow(self, model: Any) -> OpenMLFlow:
+        """Transform a model to a flow for uploading it to OpenML.
+
+        Parameters
+        ----------
+        model : Any
+
+        Returns
+        -------
+        OpenMLFlow
+        """
+
+    @abstractmethod
+    def flow_to_model(
+        self,
+        flow: OpenMLFlow,
+        initialize_with_defaults: bool = False,  # noqa: FBT001, FBT002
+        strict_version: bool = True,  # noqa: FBT002, FBT001
+    ) -> Any:
+        """Instantiate a model from the flow representation.
+
+        Parameters
+        ----------
+        flow : OpenMLFlow
+
+        initialize_with_defaults : bool, optional (default=False)
+            If this flag is set, the hyperparameter values of flows will be
+            ignored and a flow with its defaults is returned.
+
+        strict_version : bool, default=True
+            Whether to fail if version requirements are not fulfilled.
+
+        Returns
+        -------
+        Any
+        """
+
+    @abstractmethod
+    def get_version_information(self) -> list[str]:
+        """Return dependency and version information."""
diff --git a/openml/extensions/registry.py b/openml/extensions/registry.py
new file mode 100644
index 000000000..e3bc0788e
--- /dev/null
+++ b/openml/extensions/registry.py
@@ -0,0 +1,50 @@
+# License: BSD 3-Clause
+
+"""Extension registry."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+from openml.exceptions import PyOpenMLError
+from openml.extensions.sklearn import SklearnAPIConnector
+
+if TYPE_CHECKING:
+    from openml.extensions.base import OpenMLAPIConnector
+
+API_CONNECTOR_REGISTRY: list[type[OpenMLAPIConnector]] = [
+    SklearnAPIConnector,
+]
+
+
+def resolve_api_connector(estimator: Any) -> OpenMLAPIConnector:
+    """
+    Identify and return the appropriate OpenML API connector for a given estimator.
+
+    This function iterates through the global ``API_CONNECTOR_REGISTRY`` to find
+    a connector class that supports the provided estimator instance or OpenML flow.
+    If a matching connector is found, it is instantiated and returned.
+
+    Parameters
+    ----------
+    estimator : Any
+        The estimator instance (e.g., a scikit-learn estimator) or OpenML flow for
+        which an API connector is required.
+
+    Returns
+    -------
+    OpenMLAPIConnector
+        An instance of the matching API connector.
+
+    Raises
+    ------
+    PyOpenMLError
+        If no connector in the registry supports the provided estimator or flow.
+ """ + for connector_cls in API_CONNECTOR_REGISTRY: + if connector_cls.supports(estimator): + return connector_cls() + + raise PyOpenMLError("No OpenML API connector supports this estimator.") diff --git a/openml/extensions/sklearn/__init__.py b/openml/extensions/sklearn/__init__.py new file mode 100644 index 000000000..4e22407be --- /dev/null +++ b/openml/extensions/sklearn/__init__.py @@ -0,0 +1,13 @@ +# License: BSD 3-Clause + +"""OpenML extension for Scikit-learn.""" + +from openml.extensions.sklearn.connector import SklearnAPIConnector +from openml.extensions.sklearn.executor import SklearnExecutor +from openml.extensions.sklearn.serializer import SklearnSerializer + +__all__ = [ + "SklearnAPIConnector", + "SklearnExecutor", + "SklearnSerializer", +] diff --git a/openml/extensions/sklearn/connector.py b/openml/extensions/sklearn/connector.py new file mode 100644 index 000000000..795baf640 --- /dev/null +++ b/openml/extensions/sklearn/connector.py @@ -0,0 +1,68 @@ +# License: BSD 3-Clause + +"""Connector for the Scikit-learn extension.""" + +from __future__ import annotations + +from openml.extensions.base import OpenMLAPIConnector +from openml.extensions.sklearn.executor import SklearnExecutor +from openml.extensions.sklearn.serializer import SklearnSerializer +from openml.flows import OpenMLFlow + + +class SklearnAPIConnector(OpenMLAPIConnector): + """ + Connector for the Scikit-learn extension. + + This class provides the interface to connect Scikit-learn models and flows + to the OpenML API, handling both serialization and execution compatibility checks. + """ + + def serializer(self) -> SklearnSerializer: + """ + Return the serializer for Scikit-learn estimators. + + Returns + ------- + SklearnSerializer + The serializer instance capable of handling Scikit-learn estimator. + """ + return SklearnSerializer() + + def executor(self) -> SklearnExecutor: + """ + Return the executor for Scikit-learn estimators. + + Returns + ------- + SklearnExecutor + The executor instance capable of running Scikit-learn estimators. + """ + return SklearnExecutor() + + @classmethod + def supports(cls, estimator) -> bool: + """ + Check if this connector supports the given model or flow. + + Parameters + ---------- + estimator : Any or OpenMLFlow + The Scikit-learn estimator instance or OpenMLFlow object. + + Returns + ------- + bool + True if both the serializer and executor can handle the provided + estimator or flow, False otherwise. 
+ """ + serializer = SklearnSerializer() + SklearnExecutor() + + if isinstance(estimator, OpenMLFlow): + support = serializer.can_handle_flow(estimator) + + else: + support = serializer.can_handle_model(estimator) + + return support diff --git a/openml/extensions/sklearn/executor.py b/openml/extensions/sklearn/executor.py new file mode 100644 index 000000000..69d54a102 --- /dev/null +++ b/openml/extensions/sklearn/executor.py @@ -0,0 +1,784 @@ +# License: BSD 3-Clause +from __future__ import annotations + +import json +import logging +import time +import warnings +from collections import OrderedDict +from typing import TYPE_CHECKING, Any, List, cast + +import numpy as np +import pandas as pd +import sklearn.base +import sklearn.model_selection +import sklearn.pipeline + +import openml +from openml.exceptions import PyOpenMLError +from openml.extensions.base import ModelExecutor +from openml.flows import OpenMLFlow +from openml.runs.trace import PREFIX, OpenMLRunTrace, OpenMLTraceIteration +from openml.tasks import ( + OpenMLClassificationTask, + OpenMLClusteringTask, + OpenMLLearningCurveTask, + OpenMLRegressionTask, + OpenMLSupervisedTask, + OpenMLTask, +) + +if TYPE_CHECKING: + import scipy.sparse + +logger = logging.getLogger(__name__) + +SKLEARN_PIPELINE_STRING_COMPONENTS = ("drop", "passthrough") +COMPONENT_REFERENCE = "component_reference" +COMPOSITION_STEP_CONSTANT = "composition_step_constant" + + +class SklearnExecutor(ModelExecutor): + """Executor for Scikit-learn estimators.""" + + def seed_model(self, model: Any, seed: int | None = None) -> Any: # noqa: C901 + """Set the random state of all the unseeded components of a model and return the seeded + model. + + Required so that all seed information can be uploaded to OpenML for reproducible results. + + Models that are already seeded will maintain the seed. In this case, + only integer seeds are allowed (An exception is raised when a RandomState was used as + seed). + + Parameters + ---------- + model : sklearn model + The model to be seeded + seed : int + The seed to initialize the RandomState with. Unseeded subcomponents + will be seeded with a random number from the RandomState. + + Returns + ------- + Any + """ + + def _seed_current_object(current_value): + if isinstance(current_value, int): # acceptable behaviour + return False + + if isinstance(current_value, np.random.RandomState): + raise ValueError( + "Models initialized with a RandomState object are not " + "supported. Please seed with an integer. ", + ) + + if current_value is not None: + raise ValueError( + "Models should be seeded with int or None (this should never happen). ", + ) + + return True + + rs = np.random.RandomState(seed) + model_params = model.get_params() + random_states = {} + for param_name in sorted(model_params): + if "random_state" in param_name: + current_value = model_params[param_name] + # important to draw the value at this point (and not in the if + # statement) this way we guarantee that if a different set of + # subflows is seeded, the same number of the random generator is + # used + new_value = rs.randint(0, 2**16) + if _seed_current_object(current_value): + random_states[param_name] = new_value + + # Also seed CV objects! 
+ elif isinstance(model_params[param_name], sklearn.model_selection.BaseCrossValidator): + if not hasattr(model_params[param_name], "random_state"): + continue + + current_value = model_params[param_name].random_state + new_value = rs.randint(0, 2**16) + if _seed_current_object(current_value): + model_params[param_name].random_state = new_value + + model.set_params(**random_states) + return model + + def check_if_model_fitted(self, model: Any) -> bool: + """Returns True/False denoting if the model has already been fitted/trained + + Parameters + ---------- + model : Any + + Returns + ------- + bool + """ + from sklearn.exceptions import NotFittedError + from sklearn.utils.validation import check_is_fitted + + try: + # check if model is fitted + check_is_fitted(model) + + # Creating random dummy data of arbitrary size + dummy_data = np.random.uniform(size=(10, 3)) # noqa: NPY002 + # Using 'predict' instead of 'sklearn.utils.validation.check_is_fitted' for a more + # robust check that works across sklearn versions and models. Internally, 'predict' + # should call 'check_is_fitted' for every concerned attribute, thus offering a more + # assured check than explicit calls to 'check_is_fitted' + model.predict(dummy_data) + # Will reach here if the model was fit on a dataset with 3 features + return True + except NotFittedError: # needs to be the first exception to be caught + # Model is not fitted, as is required + return False + except ValueError: + # Will reach here if the model was fit on a dataset with more or less than 3 features + return True + + def _run_model_on_fold( # noqa: PLR0915, PLR0913, C901, PLR0912 + self, + model: Any, + task: OpenMLTask, + X_train: np.ndarray | scipy.sparse.spmatrix | pd.DataFrame, + rep_no: int, + fold_no: int, + y_train: np.ndarray | None = None, + X_test: np.ndarray | scipy.sparse.spmatrix | pd.DataFrame | None = None, + ) -> tuple[ + np.ndarray, + pd.DataFrame | None, + OrderedDict[str, float], + OpenMLRunTrace | None, + ]: + """Run a model on a repeat,fold,subsample triplet of the task and return prediction + information. + + Furthermore, it will measure run time measures in case multi-core behaviour allows this. + * exact user cpu time will be measured if the number of cores is set (recursive throughout + the model) exactly to 1 + * wall clock time will be measured if the number of cores is set (recursive throughout the + model) to any given number (but not when it is set to -1) + + Returns the data that is necessary to construct the OpenML Run object. Is used by + run_task_get_arff_content. Do not use this function unless you know what you are doing. + + Parameters + ---------- + model : Any + The UNTRAINED model to run. The model instance will be copied and not altered. + task : OpenMLTask + The task to run the model on. + X_train : array-like + Training data for the given repetition and fold. + rep_no : int + The repeat of the experiment (0-based; in case of 1 time CV, always 0) + fold_no : int + The fold nr of the experiment (0-based; in case of holdout, always 0) + y_train : Optional[np.ndarray] (default=None) + Target attributes for supervised tasks. In case of classification, these are integer + indices to the potential classes specified by dataset. + X_test : Optional, array-like (default=None) + Test attributes to test for generalization in supervised tasks. + + Returns + ------- + pred_y : np.ndarray + Predictions on the training/test set, depending on the task type. + For supervised tasks, predictions are on the test set. 
+ For unsupervised tasks, predictions are on the training set. + proba_y : pd.DataFrame, optional + Predicted probabilities for the test set. + None, if task is not Classification or Learning Curve prediction. + user_defined_measures : OrderedDict[str, float] + User defined measures that were generated on this fold + trace : OpenMLRunTrace, optional + arff trace object from a fitted model and the trace content obtained by + repeatedly calling ``run_model_on_task`` + """ + + def _prediction_to_probabilities( + y: np.ndarray | list, + model_classes: list[Any], + class_labels: list[str] | None, + ) -> pd.DataFrame: + """Transforms predicted probabilities to match with OpenML class indices. + + Parameters + ---------- + y : np.ndarray + Predicted probabilities (possibly omitting classes if they were not present in the + training data). + model_classes : list + List of classes known_predicted by the model, ordered by their index. + class_labels : list + List of classes as stored in the task object fetched from server. + + Returns + ------- + pd.DataFrame + """ + if class_labels is None: + raise ValueError("The task has no class labels") + + if isinstance(y_train, np.ndarray) and isinstance(class_labels[0], str): + # mapping (decoding) the predictions to the categories + # creating a separate copy to not change the expected pred_y type + y = [class_labels[pred] for pred in y] # list or numpy array of predictions + + # model_classes: sklearn classifier mapping from original array id to + # prediction index id + if not isinstance(model_classes, list): + raise ValueError("please convert model classes to list prior to calling this fn") + + # DataFrame allows more accurate mapping of classes as column names + result = pd.DataFrame( + 0, + index=np.arange(len(y)), + columns=model_classes, + dtype=np.float32, + ) + for obs, prediction in enumerate(y): + result.loc[obs, prediction] = 1.0 + return result + + if isinstance(task, OpenMLSupervisedTask): + if y_train is None: + raise TypeError("argument y_train must not be of type None") + if X_test is None: + raise TypeError("argument X_test must not be of type None") + + model_copy = sklearn.base.clone(model, safe=True) + # sanity check: prohibit users from optimizing n_jobs + self._prevent_optimize_n_jobs(model_copy) + # measures and stores runtimes + user_defined_measures = OrderedDict() # type: 'OrderedDict[str, float]' + try: + # for measuring runtime. 
Only available since Python 3.3 + modelfit_start_cputime = time.process_time() + modelfit_start_walltime = time.time() + + if isinstance(task, OpenMLSupervisedTask): + model_copy.fit(X_train, y_train) # type: ignore + elif isinstance(task, OpenMLClusteringTask): + model_copy.fit(X_train) # type: ignore + + modelfit_dur_cputime = (time.process_time() - modelfit_start_cputime) * 1000 + modelfit_dur_walltime = (time.time() - modelfit_start_walltime) * 1000 + + user_defined_measures["usercpu_time_millis_training"] = modelfit_dur_cputime + refit_time = model_copy.refit_time_ * 1000 if hasattr(model_copy, "refit_time_") else 0 # type: ignore + user_defined_measures["wall_clock_time_millis_training"] = modelfit_dur_walltime + + except AttributeError as e: + # typically happens when training a regressor on classification task + raise PyOpenMLError(str(e)) from e + + if isinstance(task, (OpenMLClassificationTask, OpenMLLearningCurveTask)): + # search for model classes_ (might differ depending on modeltype) + # first, pipelines are a special case (these don't have a classes_ + # object, but rather borrows it from the last step. We do this manually, + # because of the BaseSearch check) + if isinstance(model_copy, sklearn.pipeline.Pipeline): + used_estimator = model_copy.steps[-1][-1] + else: + used_estimator = model_copy + + if self._is_hpo_class(used_estimator): + model_classes = used_estimator.best_estimator_.classes_ + else: + model_classes = used_estimator.classes_ + + if not isinstance(model_classes, list): + model_classes = model_classes.tolist() + + # to handle the case when dataset is numpy and categories are encoded + # however the class labels stored in task are still categories + if isinstance(y_train, np.ndarray) and isinstance( + cast("List", task.class_labels)[0], + str, + ): + model_classes = [cast("List[str]", task.class_labels)[i] for i in model_classes] + + modelpredict_start_cputime = time.process_time() + modelpredict_start_walltime = time.time() + + # In supervised learning this returns the predictions for Y, in clustering + # it returns the clusters + if isinstance(task, OpenMLSupervisedTask): + pred_y = model_copy.predict(X_test) + elif isinstance(task, OpenMLClusteringTask): + pred_y = model_copy.predict(X_train) + else: + raise ValueError(task) + + modelpredict_duration_cputime = (time.process_time() - modelpredict_start_cputime) * 1000 + user_defined_measures["usercpu_time_millis_testing"] = modelpredict_duration_cputime + user_defined_measures["usercpu_time_millis"] = ( + modelfit_dur_cputime + modelpredict_duration_cputime + ) + modelpredict_duration_walltime = (time.time() - modelpredict_start_walltime) * 1000 + user_defined_measures["wall_clock_time_millis_testing"] = modelpredict_duration_walltime + user_defined_measures["wall_clock_time_millis"] = ( + modelfit_dur_walltime + modelpredict_duration_walltime + refit_time + ) + + if isinstance(task, (OpenMLClassificationTask, OpenMLLearningCurveTask)): + try: + proba_y = model_copy.predict_proba(X_test) + proba_y = pd.DataFrame(proba_y, columns=model_classes) # handles X_test as numpy + except AttributeError: # predict_proba is not available when probability=False + proba_y = _prediction_to_probabilities(pred_y, model_classes, task.class_labels) + + if task.class_labels is not None: + if proba_y.shape[1] != len(task.class_labels): + # Remap the probabilities in case there was a class missing + # at training time. By default, the classification targets + # are mapped to be zero-based indices to the actual classes. 
+ # Therefore, the model_classes contain the correct indices to + # the correct probability array. Example: + # classes in the dataset: 0, 1, 2, 3, 4, 5 + # classes in the training set: 0, 1, 2, 4, 5 + # then we need to add a column full of zeros into the probabilities + # for class 3 because the rest of the library expects that the + # probabilities are ordered the same way as the classes are ordered). + message = ( + f"Estimator only predicted for {proba_y.shape[1]}/{len(task.class_labels)}" + " classes!" + ) + warnings.warn(message, stacklevel=2) + openml.config.logger.warning(message) + + for _i, col in enumerate(task.class_labels): + # adding missing columns with 0 probability + if col not in model_classes: + proba_y[col] = 0 + # We re-order the columns to move possibly added missing columns into place. + proba_y = proba_y[task.class_labels] + else: + raise ValueError("The task has no class labels") + + if not np.all(set(proba_y.columns) == set(task.class_labels)): + missing_cols = list(set(task.class_labels) - set(proba_y.columns)) + raise ValueError("Predicted probabilities missing for the columns: ", missing_cols) + + elif isinstance(task, (OpenMLRegressionTask, OpenMLClusteringTask)): + proba_y = None + else: + raise TypeError(type(task)) + + if self._is_hpo_class(model_copy): + trace_data = self._extract_trace_data(model_copy, rep_no, fold_no) + trace: OpenMLRunTrace | None = self._obtain_arff_trace( + model_copy, + trace_data, + ) + else: + trace = None + + return pred_y, proba_y, user_defined_measures, trace + + def obtain_parameter_values( # noqa: C901, PLR0915 + self, + flow: OpenMLFlow, + model: Any = None, + ) -> list[dict[str, Any]]: + """Extracts all parameter settings required for the flow from the model. + + If no explicit model is provided, the parameters will be extracted from `flow.model` + instead. + + Parameters + ---------- + flow : OpenMLFlow + OpenMLFlow object (containing flow ids, i.e., it has to be downloaded from the server) + + model: Any, optional (default=None) + The model from which to obtain the parameter values. Must match the flow signature. + If None, use the model specified in ``OpenMLFlow.model``. + + Returns + ------- + list + A list of dicts, where each dict has the following entries: + - ``oml:name`` : str: The OpenML parameter name + - ``oml:value`` : mixed: A representation of the parameter value + - ``oml:component`` : int: flow id to which the parameter belongs + """ + openml.flows.functions._check_flow_for_server_id(flow) + + def get_flow_dict(_flow): + flow_map = {_flow.name: _flow.flow_id} + for subflow in _flow.components: + flow_map.update(get_flow_dict(_flow.components[subflow])) + return flow_map + + def extract_parameters( # noqa: PLR0915, PLR0912, C901 + _flow, + _flow_dict, + component_model, + _main_call=False, # noqa: FBT002 + main_id=None, + ): + def is_subcomponent_specification(values): + # checks whether the current value can be a specification of + # subcomponents, as for example the value for steps parameter + # (in Pipeline) or transformers parameter (in + # ColumnTransformer). + return ( + # Specification requires list/tuple of list/tuple with + # at least length 2. 
+ isinstance(values, (tuple, list)) + and all(isinstance(item, (tuple, list)) and len(item) > 1 for item in values) + # And each component needs to be a flow or interpretable string + and all( + isinstance(item[1], openml.flows.OpenMLFlow) + or ( + isinstance(item[1], str) + and item[1] in SKLEARN_PIPELINE_STRING_COMPONENTS + ) + for item in values + ) + ) + + # _flow is openml flow object, _param dict maps from flow name to flow + # id for the main call, the param dict can be overridden (useful for + # unit tests / sentinels) this way, for flows without subflows we do + # not have to rely on _flow_dict + exp_parameters = set(_flow.parameters) + if ( + isinstance(component_model, str) + and component_model in SKLEARN_PIPELINE_STRING_COMPONENTS + ): + model_parameters = set() + else: + model_parameters = set(component_model.get_params(deep=False)) + if len(exp_parameters.symmetric_difference(model_parameters)) != 0: + flow_params = sorted(exp_parameters) + model_params = sorted(model_parameters) + raise ValueError( + "Parameters of the model do not match the " + "parameters expected by the " + "flow:\nexpected flow parameters: " + f"{flow_params}\nmodel parameters: {model_params}", + ) + exp_components = set(_flow.components) + if ( + isinstance(component_model, str) + and component_model in SKLEARN_PIPELINE_STRING_COMPONENTS + ): + model_components = set() + else: + _ = set(component_model.get_params(deep=False)) + model_components = { + mp + for mp in component_model.get_params(deep=True) + if "__" not in mp and mp not in _ + } + if len(exp_components.symmetric_difference(model_components)) != 0: + is_problem = True + if len(exp_components - model_components) > 0: + # If an expected component is not returned as a component by get_params(), + # this means that it is also a parameter -> we need to check that this is + # actually the case + difference = exp_components - model_components + component_in_model_parameters = [] + for component in difference: + if component in model_parameters: + component_in_model_parameters.append(True) + else: + component_in_model_parameters.append(False) + is_problem = not all(component_in_model_parameters) + if is_problem: + flow_components = sorted(exp_components) + model_components = sorted(model_components) + raise ValueError( + "Subcomponents of the model do not match the " + "parameters expected by the " + "flow:\nexpected flow subcomponents: " + f"{flow_components}\nmodel subcomponents: {model_components}", + ) + + _params = [] + for _param_name in _flow.parameters: + _current = OrderedDict() + _current["oml:name"] = _param_name + + current_param_values = self.model_to_flow(component_model.get_params()[_param_name]) + + # Try to filter out components (a.k.a. subflows) which are + # handled further down in the code (by recursively calling + # this function)! + if isinstance(current_param_values, openml.flows.OpenMLFlow): + continue + + if is_subcomponent_specification(current_param_values): + # complex parameter value, with subcomponents + parsed_values = [] + for subcomponent in current_param_values: + # scikit-learn stores usually tuples in the form + # (name (str), subcomponent (mixed), argument + # (mixed)). OpenML replaces the subcomponent by an + # OpenMLFlow object. + if len(subcomponent) < 2 or len(subcomponent) > 3: + raise ValueError("Component reference should be size {2,3}. 
") + + subcomponent_identifier = subcomponent[0] + subcomponent_flow = subcomponent[1] + if not isinstance(subcomponent_identifier, str): + raise TypeError( + "Subcomponent identifier should be of type string, " + f"but is {type(subcomponent_identifier)}", + ) + if not isinstance(subcomponent_flow, (openml.flows.OpenMLFlow, str)): + if ( + isinstance(subcomponent_flow, str) + and subcomponent_flow in SKLEARN_PIPELINE_STRING_COMPONENTS + ): + pass + else: + raise TypeError( + "Subcomponent flow should be of type flow, but is" + f" {type(subcomponent_flow)}", + ) + + current = { + "oml-python:serialized_object": COMPONENT_REFERENCE, + "value": { + "key": subcomponent_identifier, + "step_name": subcomponent_identifier, + }, + } + if len(subcomponent) == 3: + if not isinstance(subcomponent[2], list) and not isinstance( + subcomponent[2], + OrderedDict, + ): + raise TypeError( + "Subcomponent argument should be list or OrderedDict", + ) + current["value"]["argument_1"] = subcomponent[2] + parsed_values.append(current) + parsed_values = json.dumps(parsed_values) + else: + # vanilla parameter value + parsed_values = json.dumps(current_param_values) + + _current["oml:value"] = parsed_values + if _main_call: + _current["oml:component"] = main_id + else: + _current["oml:component"] = _flow_dict[_flow.name] + _params.append(_current) + + for _identifier in _flow.components: + subcomponent_model = component_model.get_params()[_identifier] + _params.extend( + extract_parameters( + _flow.components[_identifier], + _flow_dict, + subcomponent_model, + ), + ) + return _params + + flow_dict = get_flow_dict(flow) + model = model if model is not None else flow.model + return extract_parameters(flow, flow_dict, model, _main_call=True, main_id=flow.flow_id) + + def _openml_param_name_to_sklearn( + self, + openml_parameter: openml.setups.OpenMLParameter, + flow: OpenMLFlow, + ) -> str: + """ + Converts the name of an OpenMLParameter into the sklean name, given a flow. + + Parameters + ---------- + openml_parameter: OpenMLParameter + The parameter under consideration + + flow: OpenMLFlow + The flow that provides context. + + Returns + ------- + sklearn_parameter_name: str + The name the parameter will have once used in scikit-learn + """ + if not isinstance(openml_parameter, openml.setups.OpenMLParameter): + raise ValueError("openml_parameter should be an instance of OpenMLParameter") + if not isinstance(flow, OpenMLFlow): + raise ValueError("flow should be an instance of OpenMLFlow") + + flow_structure = flow.get_structure("name") + if openml_parameter.flow_name not in flow_structure: + raise ValueError("Obtained OpenMLParameter and OpenMLFlow do not correspond. ") + name = openml_parameter.flow_name # for PEP8 + return "__".join(flow_structure[name] + [openml_parameter.parameter_name]) + + ################################################################################################ + # Methods for hyperparameter optimization + + def _is_hpo_class(self, model: Any) -> bool: + """Check whether the model performs hyperparameter optimization. + + Used to check whether an optimization trace can be extracted from the model after + running it. + + Parameters + ---------- + model : Any + + Returns + ------- + bool + """ + return isinstance(model, sklearn.model_selection._search.BaseSearchCV) + + def instantiate_model_from_hpo_class( + self, + model: Any, + trace_iteration: OpenMLTraceIteration, + ) -> Any: + """Instantiate a ``base_estimator`` which can be searched over by the hyperparameter + optimization model. 
+ + Parameters + ---------- + model : Any + A hyperparameter optimization model which defines the model to be instantiated. + trace_iteration : OpenMLTraceIteration + Describing the hyperparameter settings to instantiate. + + Returns + ------- + Any + """ + if not self._is_hpo_class(model): + raise AssertionError( + f"Flow model {model} is not an instance of" + " sklearn.model_selection._search.BaseSearchCV", + ) + base_estimator = model.estimator + base_estimator.set_params(**trace_iteration.get_parameters()) + return base_estimator + + def _extract_trace_data(self, model, rep_no, fold_no): + """Extracts data from a machine learning model's cross-validation results + and creates an ARFF (Attribute-Relation File Format) trace. + + Parameters + ---------- + model : Any + A fitted hyperparameter optimization model. + rep_no : int + The repetition number. + fold_no : int + The fold number. + + Returns + ------- + A list of ARFF tracecontent. + """ + arff_tracecontent = [] + for itt_no in range(len(model.cv_results_["mean_test_score"])): + # we use the string values for True and False, as it is defined in + # this way by the OpenML server + selected = "false" + if itt_no == model.best_index_: + selected = "true" + test_score = model.cv_results_["mean_test_score"][itt_no] + arff_line = [rep_no, fold_no, itt_no, test_score, selected] + for key in model.cv_results_: + if key.startswith("param_"): + value = model.cv_results_[key][itt_no] + # Built-in serializer does not convert all numpy types, + # these methods convert them to built-in types instead. + if isinstance(value, np.generic): + # For scalars it actually returns scalars, not a list + value = value.tolist() + serialized_value = json.dumps(value) if value is not np.ma.masked else np.nan + arff_line.append(serialized_value) + arff_tracecontent.append(arff_line) + return arff_tracecontent + + def _obtain_arff_trace( + self, + model: Any, + trace_content: list, + ) -> OpenMLRunTrace: + """Create arff trace object from a fitted model and the trace content obtained by + repeatedly calling ``run_model_on_task``. + + Parameters + ---------- + model : Any + A fitted hyperparameter optimization model. + + trace_content : List[List] + Trace content obtained by ``openml.runs.run_flow_on_task``. 
+ + Returns + ------- + OpenMLRunTrace + """ + if not self._is_hpo_class(model): + raise AssertionError( + f"Flow model {model} is not an instance of " + "sklearn.model_selection._search.BaseSearchCV", + ) + if not hasattr(model, "cv_results_"): + raise ValueError("model should contain `cv_results_`") + + # attributes that will be in trace arff, regardless of the model + trace_attributes = [ + ("repeat", "NUMERIC"), + ("fold", "NUMERIC"), + ("iteration", "NUMERIC"), + ("evaluation", "NUMERIC"), + ("selected", ["true", "false"]), + ] + + # model dependent attributes for trace arff + for key in model.cv_results_: + if key.startswith("param_"): + # supported types should include all types, including bool, + # int float + supported_basic_types = (bool, int, float, str) + for param_value in model.cv_results_[key]: + if isinstance(param_value, np.generic): + param_value = param_value.tolist() # noqa: PLW2901 + if ( + isinstance(param_value, supported_basic_types) + or param_value is None + or param_value is np.ma.masked + ): + # basic string values + type = "STRING" # noqa: A001 + elif isinstance(param_value, (list, tuple)) and all( + isinstance(i, int) for i in param_value + ): + # list of integers (usually for selecting features) + # hyperparameter layer_sizes of MLPClassifier + type = "STRING" # noqa: A001 + else: + raise TypeError(f"Unsupported param type in param grid: {key}") + + # renamed the attribute param to parameter, as this is a required + # OpenML convention - this also guards against name collisions + # with the required trace attributes + attribute = (PREFIX + key[6:], type) # type: ignore + trace_attributes.append(attribute) + + return OpenMLRunTrace.generate( + trace_attributes, + trace_content, + ) diff --git a/openml/extensions/sklearn/serializer.py b/openml/extensions/sklearn/serializer.py new file mode 100644 index 000000000..26413afec --- /dev/null +++ b/openml/extensions/sklearn/serializer.py @@ -0,0 +1,1508 @@ +# License: BSD 3-Clause + +"""Serializer for the Scikit-learn estimators.""" + +from __future__ import annotations + +import contextlib +import copy +import importlib +import inspect +import json +import logging +import re +import sys +import traceback +import warnings +from collections import OrderedDict +from json.decoder import JSONDecodeError +from re import IGNORECASE +from typing import Any, Callable, Sized + +import numpy as np +import scipy.stats +import sklearn.base +import sklearn.model_selection +import sklearn.pipeline +from packaging.version import Version + +import openml +from openml.exceptions import PyOpenMLError +from openml.extensions.base import ModelSerializer +from openml.flows import OpenMLFlow + +logger = logging.getLogger(__name__) + + +DEPENDENCIES_PATTERN = re.compile( + r"^(?P[\w\-]+)((?P==|>=|>)" + r"(?P(\d+\.)?(\d+\.)?(\d+)?(dev)?[0-9]*))?$", +) + +# NOTE(eddiebergman): This was imported before but became deprecated, +# as a result I just enumerated them manually by copy-ing and pasting, +# recommended solution in Numpy 2.0 guide was to explicitly list them. 
+SIMPLE_NUMPY_TYPES = [ + np.int8, + np.int16, + np.int32, + np.int64, + np.longlong, + np.uint8, + np.uint16, + np.uint32, + np.uint64, + np.ulonglong, + np.float16, + np.float32, + np.float64, + np.longdouble, + np.complex64, + np.complex128, + np.clongdouble, +] +SIMPLE_TYPES = (bool, int, float, str, *SIMPLE_NUMPY_TYPES) + + +SKLEARN_PIPELINE_STRING_COMPONENTS = ("drop", "passthrough") +COMPONENT_REFERENCE = "component_reference" +COMPOSITION_STEP_CONSTANT = "composition_step_constant" + + +class SklearnSerializer(ModelSerializer): + """Serializer for Scikit-learn estimators.""" + + @classmethod + def can_handle_flow(cls, flow: OpenMLFlow) -> bool: + """Check whether a given describes a scikit-learn estimator. + + This is done by parsing the ``external_version`` field. + + Parameters + ---------- + flow : OpenMLFlow + + Returns + ------- + bool + """ + return cls._is_sklearn_flow(flow) + + @classmethod + def can_handle_model(cls, model: Any) -> bool: + """Check whether a model is an instance of ``sklearn.base.BaseEstimator``. + + Parameters + ---------- + model : Any + + Returns + ------- + bool + """ + return isinstance(model, sklearn.base.BaseEstimator) + + def flow_to_model( + self, + flow: OpenMLFlow, + initialize_with_defaults: bool = False, # noqa: FBT001, FBT002 + strict_version: bool = True, # noqa: FBT001, FBT002 + ) -> Any: + """Initializes a sklearn model based on a flow. + + Parameters + ---------- + flow : mixed + the object to deserialize (can be flow object, or any serialized + parameter value that is accepted by) + + initialize_with_defaults : bool, optional (default=False) + If this flag is set, the hyperparameter values of flows will be + ignored and a flow with its defaults is returned. + + strict_version : bool, default=True + Whether to fail if version requirements are not fulfilled. + + Returns + ------- + mixed + """ + return self._deserialize_sklearn( + flow, + initialize_with_defaults=initialize_with_defaults, + strict_version=strict_version, + ) + + def model_to_flow(self, model: Any) -> OpenMLFlow: + """Transform a scikit-learn model to a flow for uploading it to OpenML. + + Parameters + ---------- + model : Any + + Returns + ------- + OpenMLFlow + """ + # Necessary to make pypy not complain about all the different possible return types + return self._serialize_sklearn(model) + + @classmethod + def trim_flow_name( # noqa: C901 + cls, + long_name: str, + extra_trim_length: int = 100, + _outer: bool = True, # noqa: FBT001, FBT002 + ) -> str: + """Shorten generated sklearn flow name to at most ``max_length`` characters. + + Flows are assumed to have the following naming structure: + ``(model_selection)? (pipeline)? (steps)+`` + and will be shortened to: + ``sklearn.(selection.)?(pipeline.)?(steps)+`` + e.g. (white spaces and newlines added for readability) + + .. code :: + + sklearn.pipeline.Pipeline( + columntransformer=sklearn.compose._column_transformer.ColumnTransformer( + numeric=sklearn.pipeline.Pipeline( + imputer=sklearn.preprocessing.imputation.Imputer, + standardscaler=sklearn.preprocessing.data.StandardScaler), + nominal=sklearn.pipeline.Pipeline( + simpleimputer=sklearn.impute.SimpleImputer, + onehotencoder=sklearn.preprocessing._encoders.OneHotEncoder)), + variancethreshold=sklearn.feature_selection.variance_threshold.VarianceThreshold, + svc=sklearn.svm.classes.SVC) + + -> + ``sklearn.Pipeline(ColumnTransformer,VarianceThreshold,SVC)`` + + Parameters + ---------- + long_name : str + The full flow name generated by the scikit-learn extension. 
+ extra_trim_length: int (default=100) + If the trimmed name would exceed `extra_trim_length` characters, additional trimming + of the short name is performed. This reduces the produced short name length. + There is no guarantee the end result will not exceed `extra_trim_length`. + _outer : bool (default=True) + For internal use only. Specifies if the function is called recursively. + + Returns + ------- + str + + """ + + def remove_all_in_parentheses(string: str) -> str: + string, removals = re.subn(r"\([^()]*\)", "", string) + while removals > 0: + string, removals = re.subn(r"\([^()]*\)", "", string) + return string + + # Generally, we want to trim all hyperparameters, the exception to that is for model + # selection, as the `estimator` hyperparameter is very indicative of what is in the flow. + # So we first trim name of the `estimator` specified in mode selection. For reference, in + # the example below, we want to trim `sklearn.tree.tree.DecisionTreeClassifier`, and + # keep it in the final trimmed flow name: + # sklearn.pipeline.Pipeline(Imputer=sklearn.preprocessing.imputation.Imputer, + # VarianceThreshold=sklearn.feature_selection.variance_threshold.VarianceThreshold, # noqa: ERA001, E501 + # Estimator=sklearn.model_selection._search.RandomizedSearchCV(estimator= + # sklearn.tree.tree.DecisionTreeClassifier)) + if "sklearn.model_selection" in long_name: + start_index = long_name.index("sklearn.model_selection") + estimator_start = ( + start_index + long_name[start_index:].index("estimator=") + len("estimator=") + ) + + model_select_boilerplate = long_name[start_index:estimator_start] + # above is .g. "sklearn.model_selection._search.RandomizedSearchCV(estimator=" + model_selection_class = model_select_boilerplate.split("(")[0].split(".")[-1] + + # Now we want to also find and parse the `estimator`, for this we find the closing + # parenthesis to the model selection technique: + closing_parenthesis_expected = 1 + for char in long_name[estimator_start:]: + if char == "(": + closing_parenthesis_expected += 1 + if char == ")": + closing_parenthesis_expected -= 1 + if closing_parenthesis_expected == 0: + break + + _end: int = estimator_start + len(long_name[estimator_start:]) - 1 + model_select_pipeline = long_name[estimator_start:_end] + + trimmed_pipeline = cls.trim_flow_name(model_select_pipeline, _outer=False) + _, trimmed_pipeline = trimmed_pipeline.split(".", maxsplit=1) # trim module prefix + model_select_short = f"sklearn.{model_selection_class}[{trimmed_pipeline}]" + name = long_name[:start_index] + model_select_short + long_name[_end + 1 :] + else: + name = long_name + + module_name = long_name.split(".")[0] + short_name = module_name + ".{}" + + if name.startswith("sklearn.pipeline"): + full_pipeline_class, pipeline = name[:-1].split("(", maxsplit=1) + pipeline_class = full_pipeline_class.split(".")[-1] + # We don't want nested pipelines in the short name, so we trim all complicated + # subcomponents, i.e. those with parentheses: + pipeline = remove_all_in_parentheses(pipeline) + + # then the pipeline steps are formatted e.g.: + # step1name=sklearn.submodule.ClassName,step2name... + components = [component.split(".")[-1] for component in pipeline.split(",")] + pipeline = f"{pipeline_class}({','.join(components)})" + if len(short_name.format(pipeline)) > extra_trim_length: + pipeline = f"{pipeline_class}(...,{components[-1]})" + else: + # Just a simple component: e.g. 
sklearn.tree.DecisionTreeClassifier + pipeline = remove_all_in_parentheses(name).split(".")[-1] + + if not _outer: + # Anything from parenthesis in inner calls should not be culled, so we use brackets + pipeline = pipeline.replace("(", "[").replace(")", "]") + else: + # Square brackets may be introduced with nested model_selection + pipeline = pipeline.replace("[", "(").replace("]", ")") + + return short_name.format(pipeline) + + @classmethod + def _min_dependency_str(cls, sklearn_version: str) -> str: + """Returns a string containing the minimum dependencies for the sklearn version passed. + + Parameters + ---------- + sklearn_version : str + A version string of the xx.xx.xx + + Returns + ------- + str + """ + # This explicit check is necessary to support existing entities on the OpenML servers + # that used the fixed dependency string (in the else block) + if Version(openml.__version__) > Version("0.11"): + # OpenML v0.11 onwards supports sklearn>=0.24 + # assumption: 0.24 onwards sklearn should contain a _min_dependencies.py file with + # variables declared for extracting minimum dependency for that version + if Version(sklearn_version) >= Version("0.24"): + from sklearn import _min_dependencies as _mindep + + dependency_list = { + "numpy": f"{_mindep.NUMPY_MIN_VERSION}", + "scipy": f"{_mindep.SCIPY_MIN_VERSION}", + "joblib": f"{_mindep.JOBLIB_MIN_VERSION}", + "threadpoolctl": f"{_mindep.THREADPOOLCTL_MIN_VERSION}", + } + elif Version(sklearn_version) >= Version("0.23"): + dependency_list = { + "numpy": "1.13.3", + "scipy": "0.19.1", + "joblib": "0.11", + "threadpoolctl": "2.0.0", + } + if Version(sklearn_version).micro == 0: + dependency_list.pop("threadpoolctl") + elif Version(sklearn_version) >= Version("0.21"): + dependency_list = {"numpy": "1.11.0", "scipy": "0.17.0", "joblib": "0.11"} + elif Version(sklearn_version) >= Version("0.19"): + dependency_list = {"numpy": "1.8.2", "scipy": "0.13.3"} + else: + dependency_list = {"numpy": "1.6.1", "scipy": "0.9"} + else: + # this is INCORRECT for sklearn versions >= 0.19 and < 0.24 + # given that OpenML has existing flows uploaded with such dependency information, + # we change no behaviour for older sklearn version, however from 0.24 onwards + # the dependency list will be accurately updated for any flow uploaded to OpenML + dependency_list = {"numpy": "1.6.1", "scipy": "0.9"} + + sklearn_dep = f"sklearn=={sklearn_version}" + dep_str = "\n".join([f"{k}>={v}" for k, v in dependency_list.items()]) + return "\n".join([sklearn_dep, dep_str]) + + def _deserialize_sklearn( # noqa: PLR0915, C901, PLR0912 + self, + o: Any, + components: dict | None = None, + initialize_with_defaults: bool = False, # noqa: FBT001, FBT002 + recursion_depth: int = 0, + strict_version: bool = True, # noqa: FBT002, FBT001 + ) -> Any: + """Recursive function to deserialize a scikit-learn flow. + + This function inspects an object to deserialize and decides how to do so. This function + delegates all work to the respective functions to deserialize special data structures etc. + This function works on everything that has been serialized to OpenML: OpenMLFlow, + components (which are flows themselves), functions, hyperparameter distributions (for + random search) and the actual hyperparameter values themselves. + + Parameters + ---------- + o : mixed + the object to deserialize (can be flow object, or any serialized + parameter value that is accepted by) + + components : Optional[dict] + Components of the current flow being de-serialized. 
These will not be used when + de-serializing the actual flow, but when de-serializing a component reference. + + initialize_with_defaults : bool, optional (default=False) + If this flag is set, the hyperparameter values of flows will be + ignored and a flow with its defaults is returned. + + recursion_depth : int + The depth at which this flow is called, mostly for debugging + purposes + + strict_version : bool, default=True + Whether to fail if version requirements are not fulfilled. + + Returns + ------- + mixed + """ + logger.info( + "-{} flow_to_sklearn START o={}, components={}, init_defaults={}".format( + "-" * recursion_depth, o, components, initialize_with_defaults + ), + ) + depth_pp = recursion_depth + 1 # shortcut var, depth plus plus + + # First, we need to check whether the presented object is a json string. + # JSON strings are used to encoder parameter values. By passing around + # json strings for parameters, we make sure that we can flow_to_sklearn + # the parameter values to the correct type. + + if isinstance(o, str): + with contextlib.suppress(JSONDecodeError): + o = json.loads(o) + + if isinstance(o, dict): + # Check if the dict encodes a 'special' object, which could not + # easily converted into a string, but rather the information to + # re-create the object were stored in a dictionary. + if "oml-python:serialized_object" in o: + serialized_type = o["oml-python:serialized_object"] + value = o["value"] + if serialized_type == "type": + rval = self._deserialize_type(value) + elif serialized_type == "rv_frozen": + rval = self._deserialize_rv_frozen(value) + elif serialized_type == "function": + rval = self._deserialize_function(value) + elif serialized_type in (COMPOSITION_STEP_CONSTANT, COMPONENT_REFERENCE): + if serialized_type == COMPOSITION_STEP_CONSTANT: + pass + elif serialized_type == COMPONENT_REFERENCE: + value = self._deserialize_sklearn( + value, + recursion_depth=depth_pp, + strict_version=strict_version, + ) + else: + raise NotImplementedError(serialized_type) + assert components is not None # Necessary for mypy + step_name = value["step_name"] + key = value["key"] + component = self._deserialize_sklearn( + components[key], + initialize_with_defaults=initialize_with_defaults, + recursion_depth=depth_pp, + strict_version=strict_version, + ) + # The component is now added to where it should be used + # later. It should not be passed to the constructor of the + # main flow object. 
+ del components[key] + if step_name is None: + rval = component + elif "argument_1" not in value: + rval = (step_name, component) + else: + rval = (step_name, component, value["argument_1"]) + elif serialized_type == "cv_object": + rval = self._deserialize_cross_validator( + value, + recursion_depth=recursion_depth, + strict_version=strict_version, + ) + else: + raise ValueError(f"Cannot flow_to_sklearn {serialized_type}") + + else: + rval = OrderedDict( + ( + self._deserialize_sklearn( + o=key, + components=components, + initialize_with_defaults=initialize_with_defaults, + recursion_depth=depth_pp, + strict_version=strict_version, + ), + self._deserialize_sklearn( + o=value, + components=components, + initialize_with_defaults=initialize_with_defaults, + recursion_depth=depth_pp, + strict_version=strict_version, + ), + ) + for key, value in sorted(o.items()) + ) + elif isinstance(o, (list, tuple)): + rval = [ + self._deserialize_sklearn( + o=element, + components=components, + initialize_with_defaults=initialize_with_defaults, + recursion_depth=depth_pp, + strict_version=strict_version, + ) + for element in o + ] + if isinstance(o, tuple): + rval = tuple(rval) + elif isinstance(o, (bool, int, float, str)) or o is None: + rval = o + elif isinstance(o, OpenMLFlow): + if not self._is_sklearn_flow(o): + raise ValueError("Only sklearn flows can be reinstantiated") + rval = self._deserialize_model( + flow=o, + keep_defaults=initialize_with_defaults, + recursion_depth=recursion_depth, + strict_version=strict_version, + ) + else: + raise TypeError(o) + logger.info(f"-{'-' * recursion_depth} flow_to_sklearn END o={o}, rval={rval}") + return rval + + def _serialize_sklearn(self, o: Any, parent_model: Any | None = None) -> Any: # noqa: PLR0912, C901 + rval = None # type: Any + + # TODO: assert that only on first recursion lvl `parent_model` can be None + if self.is_estimator(o): + # is the main model or a submodel + rval = self._serialize_model(o) + elif ( + isinstance(o, (list, tuple)) + and len(o) == 2 + and o[1] in SKLEARN_PIPELINE_STRING_COMPONENTS + and isinstance(parent_model, sklearn.pipeline._BaseComposition) + ): + rval = o + elif isinstance(o, (list, tuple)): + # TODO: explain what type of parameter is here + rval = [self._serialize_sklearn(element, parent_model) for element in o] + if isinstance(o, tuple): + rval = tuple(rval) + elif isinstance(o, SIMPLE_TYPES) or o is None: + if isinstance(o, tuple(SIMPLE_NUMPY_TYPES)): + o = o.item() # type: ignore + # base parameter values + rval = o + elif isinstance(o, dict): + # TODO: explain what type of parameter is here + if not isinstance(o, OrderedDict): + o = OrderedDict(sorted(o.items())) + + rval = OrderedDict() + for key, value in o.items(): + if not isinstance(key, str): + raise TypeError( + "Can only use string as keys, you passed " + f"type {type(key)} for value {key!s}.", + ) + _key = self._serialize_sklearn(key, parent_model) + rval[_key] = self._serialize_sklearn(value, parent_model) + elif isinstance(o, type): + # TODO: explain what type of parameter is here + rval = self._serialize_type(o) + elif isinstance(o, scipy.stats.distributions.rv_frozen): + rval = self._serialize_rv_frozen(o) + # This only works for user-defined functions (and not even partial). 
+ # I think this is exactly what we want here as there shouldn't be any + # built-in or functool.partials in a pipeline + elif inspect.isfunction(o): + # TODO: explain what type of parameter is here + rval = self._serialize_function(o) + elif self._is_cross_validator(o): + # TODO: explain what type of parameter is here + rval = self._serialize_cross_validator(o) + else: + raise TypeError(o, type(o)) + + return rval + + def is_estimator(self, model: Any) -> bool: + """Check whether the given model is a scikit-learn estimator. + + This function is only required for backwards compatibility and will be removed in the + near future. + + Parameters + ---------- + model : Any + + Returns + ------- + bool + """ + o = model + return hasattr(o, "fit") and hasattr(o, "get_params") and hasattr(o, "set_params") + + def get_version_information(self) -> list[str]: + """List versions of libraries required by the flow. + + Libraries listed are ``Python``, ``scikit-learn``, ``numpy`` and ``scipy``. + + Returns + ------- + List + """ + # This can possibly be done by a package such as pyxb, but I could not get + # it to work properly. + import numpy + import scipy + import sklearn + + major, minor, micro, _, _ = sys.version_info + python_version = f"Python_{'.'.join([str(major), str(minor), str(micro)])}." + sklearn_version = f"Sklearn_{sklearn.__version__}." + numpy_version = f"NumPy_{numpy.__version__}." # type: ignore + scipy_version = f"SciPy_{scipy.__version__}." + + return [python_version, sklearn_version, numpy_version, scipy_version] + + def create_setup_string(self, model: Any) -> str: # noqa: ARG002 + """Create a string which can be used to reinstantiate the given model. + + Parameters + ---------- + model : Any + + Returns + ------- + str + """ + return " ".join(self.get_version_information()) + + def _is_cross_validator(self, o: Any) -> bool: + return isinstance(o, sklearn.model_selection.BaseCrossValidator) + + @classmethod + def _is_sklearn_flow(cls, flow: OpenMLFlow) -> bool: + sklearn_dependency = isinstance(flow.dependencies, str) and "sklearn" in flow.dependencies + sklearn_as_external = isinstance(flow.external_version, str) and ( + flow.external_version.startswith("sklearn==") or ",sklearn==" in flow.external_version + ) + return sklearn_dependency or sklearn_as_external + + def _get_sklearn_description(self, model: Any, char_lim: int = 1024) -> str: + r"""Fetches the sklearn function docstring for the flow description + + Retrieves the sklearn docstring available and does the following: + * If length of docstring <= char_lim, then returns the complete docstring + * Else, trims the docstring till it encounters a 'Read more in the :ref:' + * Or till it encounters a 'Parameters\n----------\n' + The final string returned is at most of length char_lim with leading and + trailing whitespaces removed. + + Parameters + ---------- + model : sklearn model + char_lim : int + Specifying the max length of the returned string. + OpenML servers have a constraint of 1024 characters for the 'description' field. + + Returns + ------- + str + """ + + def match_format(s): + return f"{s}\n{len(s) * '-'}\n" + + s = inspect.getdoc(model) + if s is None: + return "" + try: + # trim till 'Read more' + pattern = "Read more in the :ref:" + index = s.index(pattern) + s = s[:index] + # trimming docstring to be within char_lim + if len(s) > char_lim: + s = f"{s[: char_lim - 3]}..." + return s.strip() + except ValueError: + logger.warning( + "'Read more' not found in descriptions. 
" + "Trying to trim till 'Parameters' if available in docstring.", + ) + try: + # if 'Read more' doesn't exist, trim till 'Parameters' + pattern = "Parameters" + index = s.index(match_format(pattern)) + except ValueError: + # returning full docstring + logger.warning("'Parameters' not found in docstring. Omitting docstring trimming.") + index = len(s) + s = s[:index] + # trimming docstring to be within char_lim + if len(s) > char_lim: + s = f"{s[: char_lim - 3]}..." + return s.strip() + + def _extract_sklearn_parameter_docstring(self, model) -> None | str: + """Extracts the part of sklearn docstring containing parameter information + + Fetches the entire docstring and trims just the Parameter section. + The assumption is that 'Parameters' is the first section in sklearn docstrings, + followed by other sections titled 'Attributes', 'See also', 'Note', 'References', + appearing in that order if defined. + Returns a None if no section with 'Parameters' can be found in the docstring. + + Parameters + ---------- + model : sklearn model + + Returns + ------- + str, or None + """ + + def match_format(s): + return f"{s}\n{len(s) * '-'}\n" + + s = inspect.getdoc(model) + if s is None: + return None + try: + index1 = s.index(match_format("Parameters")) + except ValueError as e: + # when sklearn docstring has no 'Parameters' section + logger.warning(f"{match_format('Parameters')} {e}") + return None + + headings = ["Attributes", "Notes", "See also", "Note", "References"] + for h in headings: + try: + # to find end of Parameters section + index2 = s.index(match_format(h)) + break + except ValueError: + logger.warning(f"{h} not available in docstring") + continue + else: + # in the case only 'Parameters' exist, trim till end of docstring + index2 = len(s) + s = s[index1:index2] + return s.strip() + + def _extract_sklearn_param_info(self, model, char_lim=1024) -> None | dict: + """Parses parameter type and description from sklearn dosctring + + Parameters + ---------- + model : sklearn model + char_lim : int + Specifying the max length of the returned string. + OpenML servers have a constraint of 1024 characters string fields. 
+ + Returns + ------- + Dict, or None + """ + docstring = self._extract_sklearn_parameter_docstring(model) + if docstring is None: + # when sklearn docstring has no 'Parameters' section + return None + + n = re.compile("[.]*\n", flags=IGNORECASE) + lines = n.split(docstring) + p = re.compile("[a-z0-9_ ]+ : [a-z0-9_']+[a-z0-9_ ]*", flags=IGNORECASE) + # The above regular expression is designed to detect sklearn parameter names and type + # in the format of [variable_name][space]:[space][type] + # The expectation is that the parameter description for this detected parameter will + # be all the lines in the docstring till the regex finds another parameter match + + # collecting parameters and their descriptions + description = [] # type: List + for s in lines: + param = p.findall(s) + if param != []: + # a parameter definition is found by regex + # creating placeholder when parameter found which will be a list of strings + # string descriptions will be appended in subsequent iterations + # till another parameter is found and a new placeholder is created + placeholder = [""] # type: List[str] + description.append(placeholder) + elif len(description) > 0: # description=[] means no parameters found yet + # appending strings to the placeholder created when parameter found + description[-1].append(s) + for i in range(len(description)): + # concatenating parameter description strings + description[i] = "\n".join(description[i]).strip() + # limiting all parameter descriptions to accepted OpenML string length + if len(description[i]) > char_lim: + description[i] = f"{description[i][: char_lim - 3]}..." + + # collecting parameters and their types + parameter_docs = OrderedDict() + matches = p.findall(docstring) + for i, param in enumerate(matches): + key, value = str(param).split(":") + parameter_docs[key.strip()] = [value.strip(), description[i]] + + # to avoid KeyError for missing parameters + param_list_true = list(model.get_params().keys()) + param_list_found = list(parameter_docs.keys()) + for param in list(set(param_list_true) - set(param_list_found)): + parameter_docs[param] = [None, None] + + return parameter_docs + + def _serialize_model(self, model: Any) -> OpenMLFlow: + """Create an OpenMLFlow. + + Calls `sklearn_to_flow` recursively to properly serialize the + parameters to strings and the components (other models) to OpenMLFlows. + + Parameters + ---------- + model : sklearn estimator + + Returns + ------- + OpenMLFlow + + """ + # Get all necessary information about the model objects itself + ( + parameters, + parameters_meta_info, + subcomponents, + subcomponents_explicit, + ) = self._extract_information_from_model(model) + + # Check that a component does not occur multiple times in a flow as this + # is not supported by OpenML + self._check_multiple_occurence_of_component_in_flow(model, subcomponents) + + # Create a flow name, which contains all components in brackets, e.g.: + # RandomizedSearchCV(Pipeline(StandardScaler,AdaBoostClassifier(DecisionTreeClassifier)), + # StandardScaler,AdaBoostClassifier(DecisionTreeClassifier)) + class_name = model.__module__ + "." 
+ model.__class__.__name__ + + # will be part of the name (in brackets) + sub_components_names = "" + for key in subcomponents: + name_thing = subcomponents[key] + if isinstance(name_thing, OpenMLFlow): + name = name_thing.name + elif ( + isinstance(name_thing, str) + and subcomponents[key] in SKLEARN_PIPELINE_STRING_COMPONENTS + ): + name = name_thing + else: + raise TypeError(type(subcomponents[key])) + + if key in subcomponents_explicit: + sub_components_names += "," + key + "=" + name + else: + sub_components_names += "," + name + + # slice operation on string in order to get rid of leading comma + name = f"{class_name}({sub_components_names[1:]})" if sub_components_names else class_name + short_name = self.trim_flow_name(name) + + # Get the external versions of all sub-components + external_version = self._get_external_version_string(model, subcomponents) + dependencies = self._get_dependencies() + tags = self._get_tags() + + sklearn_description = self._get_sklearn_description(model) + return OpenMLFlow( + name=name, + class_name=class_name, + custom_name=short_name, + description=sklearn_description, + model=model, + components=subcomponents, + parameters=parameters, + parameters_meta_info=parameters_meta_info, + external_version=external_version, + tags=tags, + extension=self, + language="English", + dependencies=dependencies, + ) + + def _get_dependencies(self) -> str: + return self._min_dependency_str(sklearn.__version__) # type: ignore + + def _get_tags(self) -> list[str]: + sklearn_version = self._format_external_version("sklearn", sklearn.__version__) # type: ignore + sklearn_version_formatted = sklearn_version.replace("==", "_") + return [ + "openml-python", + "sklearn", + "scikit-learn", + "python", + sklearn_version_formatted, + # TODO: add more tags based on the scikit-learn + # module a flow is in? For example automatically + # annotate a class of sklearn.svm.SVC() with the + # tag svm? + ] + + def _get_external_version_string( + self, + model: Any, + sub_components: dict[str, OpenMLFlow], + ) -> str: + # Create external version string for a flow, given the model and the + # already parsed dictionary of sub_components. Retrieves the external + # version of all subcomponents, which themselves already contain all + # requirements for their subcomponents. The external version string is a + # sorted concatenation of all modules which are present in this run. 
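+ # Illustrative only (the version numbers here are hypothetical): for a plain
+ # sklearn estimator without sub-components this yields a string such as
+ # "openml==0.14.0,sklearn==1.3.2", i.e. one _format_external_version entry
+ # per package involved, sorted alphabetically and joined with commas.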
+ + external_versions = set() + + # The model is None if the flow is a placeholder flow such as 'passthrough' or 'drop' + if model is not None: + model_package_name = model.__module__.split(".")[0] + module = importlib.import_module(model_package_name) + model_package_version_number = module.__version__ # type: ignore + external_version = self._format_external_version( + model_package_name, + model_package_version_number, + ) + external_versions.add(external_version) + + openml_version = self._format_external_version("openml", openml.__version__) + sklearn_version = self._format_external_version("sklearn", sklearn.__version__) # type: ignore + external_versions.add(openml_version) + external_versions.add(sklearn_version) + for visitee in sub_components.values(): + if isinstance(visitee, str) and visitee in SKLEARN_PIPELINE_STRING_COMPONENTS: + continue + for external_version in visitee.external_version.split(","): + external_versions.add(external_version) + return ",".join(sorted(external_versions)) + + def _check_multiple_occurence_of_component_in_flow( + self, + model: Any, + sub_components: dict[str, OpenMLFlow], + ) -> None: + to_visit_stack: list[OpenMLFlow] = [] + to_visit_stack.extend(sub_components.values()) + known_sub_components: set[str] = set() + + while len(to_visit_stack) > 0: + visitee = to_visit_stack.pop() + if isinstance(visitee, str) and visitee in SKLEARN_PIPELINE_STRING_COMPONENTS: + known_sub_components.add(visitee) + elif visitee.name in known_sub_components: + raise ValueError( + f"Found a second occurence of component {visitee.name} when " + f"trying to serialize {model}.", + ) + else: + known_sub_components.add(visitee.name) + to_visit_stack.extend(visitee.components.values()) + + def _extract_information_from_model( # noqa: PLR0915, C901, PLR0912 + self, + model: Any, + ) -> tuple[ + OrderedDict[str, str | None], + OrderedDict[str, dict | None], + OrderedDict[str, OpenMLFlow], + set, + ]: + # This function contains four "global" states and is quite long and + # complicated. If it gets to complicated to ensure it's correctness, + # it would be best to make it a class with the four "global" states being + # the class attributes and the if/elif/else in the for-loop calls to + # separate class methods + + # stores all entities that should become subcomponents + sub_components = OrderedDict() # type: OrderedDict[str, OpenMLFlow] + # stores the keys of all subcomponents that should become + sub_components_explicit = set() + parameters: OrderedDict[str, str | None] = OrderedDict() + parameters_meta_info: OrderedDict[str, dict | None] = OrderedDict() + parameters_docs = self._extract_sklearn_param_info(model) + + model_parameters = model.get_params(deep=False) + for k, v in sorted(model_parameters.items(), key=lambda t: t[0]): + rval = self._serialize_sklearn(v, model) + + def flatten_all(list_): + """Flattens arbitrary depth lists of lists (e.g. [[1,2],[3,[1]]] -> [1,2,3,1]).""" + for el in list_: + if isinstance(el, (list, tuple)) and len(el) > 0: + yield from flatten_all(el) + else: + yield el + + # In case rval is a list of lists (or tuples), we need to identify two situations: + # - sklearn pipeline steps, feature union or base classifiers in voting classifier. + # They look like e.g. [("imputer", Imputer()), ("classifier", SVC())] + # - a list of lists with simple types (e.g. 
int or str), such as for an OrdinalEncoder + # where all possible values for each feature are described: [[0,1,2], [1,2,5]] + is_non_empty_list_of_lists_with_same_type = ( + isinstance(rval, (list, tuple)) + and len(rval) > 0 + and isinstance(rval[0], (list, tuple)) + and all(isinstance(rval_i, type(rval[0])) for rval_i in rval) + ) + + # Check that all list elements are of simple types. + nested_list_of_simple_types = ( + is_non_empty_list_of_lists_with_same_type + and all(isinstance(el, SIMPLE_TYPES) for el in flatten_all(rval)) + and all( + len(rv) in (2, 3) and rv[1] not in SKLEARN_PIPELINE_STRING_COMPONENTS + for rv in rval + ) + ) + + if is_non_empty_list_of_lists_with_same_type and not nested_list_of_simple_types: + # If a list of lists is identified that include 'non-simple' types (e.g. objects), + # we assume they are steps in a pipeline, feature union, or base classifiers in + # a voting classifier. + parameter_value = [] # type: List + reserved_keywords = set(model.get_params(deep=False).keys()) + + for sub_component_tuple in rval: + identifier = sub_component_tuple[0] + sub_component = sub_component_tuple[1] + sub_component_type = type(sub_component_tuple) + if not 2 <= len(sub_component_tuple) <= 3: + # length 2 is for {VotingClassifier.estimators, + # Pipeline.steps, FeatureUnion.transformer_list} + # length 3 is for ColumnTransformer + raise ValueError( + f"Length of tuple of type {sub_component_type}" + " does not match assumptions" + ) + + if isinstance(sub_component, str): + if sub_component not in SKLEARN_PIPELINE_STRING_COMPONENTS: + msg = ( + "Second item of tuple does not match assumptions. " + "If string, can be only 'drop' or 'passthrough' but" + f"got {sub_component}" + ) + raise ValueError(msg) + elif sub_component is None: + msg = ( + "Cannot serialize objects of None type. Please use a valid " + "placeholder for None. Note that empty sklearn estimators can be " + "replaced with 'drop' or 'passthrough'." + ) + raise ValueError(msg) + elif not isinstance(sub_component, OpenMLFlow): + msg = ( + "Second item of tuple does not match assumptions. 
" + f"Expected OpenMLFlow, got {type(sub_component)}" + ) + raise TypeError(msg) + + if identifier in reserved_keywords: + parent_model = f"{model.__module__}.{model.__class__.__name__}" + msg = ( + "Found element shadowing official " + f"parameter for {parent_model}: {identifier}" + ) + raise PyOpenMLError(msg) + + # when deserializing the parameter + sub_components_explicit.add(identifier) + if isinstance(sub_component, str): + external_version = self._get_external_version_string(None, {}) + dependencies = self._get_dependencies() + tags = self._get_tags() + + sub_components[identifier] = OpenMLFlow( + name=sub_component, + description="Placeholder flow for scikit-learn's string pipeline " + "members", + components=OrderedDict(), + parameters=OrderedDict(), + parameters_meta_info=OrderedDict(), + external_version=external_version, + tags=tags, + language="English", + dependencies=dependencies, + model=None, + ) + component_reference: OrderedDict[str, str | dict] = OrderedDict() + component_reference["oml-python:serialized_object"] = ( + COMPOSITION_STEP_CONSTANT + ) + cr_value: dict[str, Any] = OrderedDict() + cr_value["key"] = identifier + cr_value["step_name"] = identifier + if len(sub_component_tuple) == 3: + cr_value["argument_1"] = sub_component_tuple[2] + component_reference["value"] = cr_value + else: + sub_components[identifier] = sub_component + component_reference = OrderedDict() + component_reference["oml-python:serialized_object"] = COMPONENT_REFERENCE + cr_value = OrderedDict() + cr_value["key"] = identifier + cr_value["step_name"] = identifier + if len(sub_component_tuple) == 3: + cr_value["argument_1"] = sub_component_tuple[2] + component_reference["value"] = cr_value + parameter_value.append(component_reference) + + # Here (and in the elif and else branch below) are the only + # places where we encode a value as json to make sure that all + # parameter values still have the same type after + # deserialization + if isinstance(rval, tuple): + parameter_json = json.dumps(tuple(parameter_value)) + else: + parameter_json = json.dumps(parameter_value) + parameters[k] = parameter_json + + elif isinstance(rval, OpenMLFlow): + # A subcomponent, for example the base model in + # AdaBoostClassifier + sub_components[k] = rval + sub_components_explicit.add(k) + component_reference = OrderedDict() + component_reference["oml-python:serialized_object"] = COMPONENT_REFERENCE + cr_value = OrderedDict() + cr_value["key"] = k + cr_value["step_name"] = None + component_reference["value"] = cr_value + cr = self._serialize_sklearn(component_reference, model) + parameters[k] = json.dumps(cr) + + elif not (hasattr(rval, "__len__") and len(rval) == 0): + rval = json.dumps(rval) + parameters[k] = rval + # a regular hyperparameter + else: + parameters[k] = None + + if parameters_docs is not None: + data_type, description = parameters_docs[k] + parameters_meta_info[k] = OrderedDict( + (("description", description), ("data_type", data_type)), + ) + else: + parameters_meta_info[k] = OrderedDict((("description", None), ("data_type", None))) + + return parameters, parameters_meta_info, sub_components, sub_components_explicit + + def _get_fn_arguments_with_defaults(self, fn_name: Callable) -> tuple[dict, set]: + """ + Returns + ------- + i) a dict with all parameter names that have a default value, and + ii) a set with all parameter names that do not have a default + + Parameters + ---------- + fn_name : callable + The function of which we want to obtain the defaults + + Returns + ------- + 
params_with_defaults: dict + a dict mapping parameter name to the default value + params_without_defaults: set + a set with all parameters that do not have a default value + """ + # parameters with defaults are optional, all others are required. + parameters = inspect.signature(fn_name).parameters + required_params = set() + optional_params = {} + for param in parameters: + parameter = parameters.get(param) + default_val = parameter.default # type: ignore + if default_val is inspect.Signature.empty: + required_params.add(param) + else: + optional_params[param] = default_val + return optional_params, required_params + + def _deserialize_model( # noqa: C901 + self, + flow: OpenMLFlow, + keep_defaults: bool, # noqa: FBT001 + recursion_depth: int, + strict_version: bool = True, # noqa: FBT002, FBT001 + ) -> Any: + logger.info(f"-{'-' * recursion_depth} deserialize {flow.name}") + model_name = flow.class_name + self._check_dependencies(flow.dependencies, strict_version=strict_version) + + parameters = flow.parameters + components = flow.components + parameter_dict: dict[str, Any] = OrderedDict() + + # Do a shallow copy of the components dictionary so we can remove the + # components from this copy once we added them into the pipeline. This + # allows us to not consider them any more when looping over the + # components, but keeping the dictionary of components untouched in the + # original components dictionary. + components_ = copy.copy(components) + + for name in parameters: + value = parameters.get(name) + logger.info(f"--{'-' * recursion_depth} flow_parameter={name}, value={value}") + rval = self._deserialize_sklearn( + value, + components=components_, + initialize_with_defaults=keep_defaults, + recursion_depth=recursion_depth + 1, + strict_version=strict_version, + ) + parameter_dict[name] = rval + + for name in components: + if name in parameter_dict: + continue + if name not in components_: + continue + value = components[name] + logger.info(f"--{'-' * recursion_depth} flow_component={name}, value={value}") + rval = self._deserialize_sklearn( + value, + recursion_depth=recursion_depth + 1, + strict_version=strict_version, + ) + parameter_dict[name] = rval + + if model_name is None and flow.name in SKLEARN_PIPELINE_STRING_COMPONENTS: + return flow.name + + assert model_name is not None + module_name = model_name.rsplit(".", 1) + model_class = getattr(importlib.import_module(module_name[0]), module_name[1]) + + if keep_defaults: + # obtain all params with a default + param_defaults, _ = self._get_fn_arguments_with_defaults(model_class.__init__) + + # delete the params that have a default from the dict, + # so they get initialized with their default value + # except [...] + for param in param_defaults: + # [...] the ones that also have a key in the components dict. + # As OpenML stores different flows for ensembles with different + # (base-)components, in OpenML terms, these are not considered + # hyperparameters but rather constants (i.e., changing them would + # result in a different flow) + if param not in components: + del parameter_dict[param] + + if not strict_version: + # Ignore incompatible parameters + allowed_parameter = list(inspect.signature(model_class.__init__).parameters) + for p in list(parameter_dict.keys()): + if p not in allowed_parameter: + warnings.warn( + f"While deserializing in a non-strict way, parameter {p} is not " + f"allowed for {model_class.__name__} likely due to a version mismatch. 
" + "We ignore the parameter.", + UserWarning, + stacklevel=2, + ) + del parameter_dict[p] + + return model_class(**parameter_dict) + + def _check_dependencies( + self, + dependencies: str, + strict_version: bool = True, # noqa: FBT001, FBT002 + ) -> None: + if not dependencies: + return + + dependencies_list = dependencies.split("\n") + for dependency_string in dependencies_list: + match = DEPENDENCIES_PATTERN.match(dependency_string) + if not match: + raise ValueError(f"Cannot parse dependency {dependency_string}") + + dependency_name = match.group("name") + operation = match.group("operation") + version = match.group("version") + + module = importlib.import_module(dependency_name) + required_version = Version(version) + installed_version = Version(module.__version__) # type: ignore + + if operation == "==": + check = required_version == installed_version + elif operation == ">": + check = installed_version > required_version + elif operation == ">=": + check = ( + installed_version > required_version or installed_version == required_version + ) + else: + raise NotImplementedError(f"operation '{operation}' is not supported") + message = ( + f"Trying to deserialize a model with dependency {dependency_string} not satisfied." + ) + if not check: + if strict_version: + raise ValueError(message) + + warnings.warn(message, category=UserWarning, stacklevel=2) + + def _serialize_type(self, o: Any) -> OrderedDict[str, str]: + mapping = { + float: "float", + np.float32: "np.float32", + np.float64: "np.float64", + int: "int", + np.int32: "np.int32", + np.int64: "np.int64", + } + if Version(np.__version__) < Version("1.24"): + mapping[float] = "np.float" + mapping[int] = "np.int" + + ret = OrderedDict() # type: 'OrderedDict[str, str]' + ret["oml-python:serialized_object"] = "type" + ret["value"] = mapping[o] + return ret + + def _deserialize_type(self, o: str) -> Any: + mapping = { + "float": float, + "np.float32": np.float32, + "np.float64": np.float64, + "int": int, + "np.int32": np.int32, + "np.int64": np.int64, + } + + # TODO(eddiebergman): Might be able to remove this + if Version(np.__version__) < Version("1.24"): + mapping["np.float"] = np.float # type: ignore # noqa: NPY001 + mapping["np.int"] = np.int # type: ignore # noqa: NPY001 + + return mapping[o] + + def _serialize_rv_frozen(self, o: Any) -> OrderedDict[str, str | dict]: + args = o.args + kwds = o.kwds + a = o.a + b = o.b + dist = o.dist.__class__.__module__ + "." + o.dist.__class__.__name__ + ret: OrderedDict[str, str | dict] = OrderedDict() + ret["oml-python:serialized_object"] = "rv_frozen" + ret["value"] = OrderedDict( + (("dist", dist), ("a", a), ("b", b), ("args", args), ("kwds", kwds)), + ) + return ret + + def _deserialize_rv_frozen(self, o: OrderedDict[str, str]) -> Any: + args = o["args"] + kwds = o["kwds"] + a = o["a"] + b = o["b"] + dist_name = o["dist"] + + module_name = dist_name.rsplit(".", 1) + try: + rv_class = getattr(importlib.import_module(module_name[0]), module_name[1]) + except AttributeError as e: + _tb = traceback.format_exc() + warnings.warn( + f"Cannot create model {dist_name} for flow. Reason is from error {type(e)}:{e}" + f"\nTraceback: {_tb}", + RuntimeWarning, + stacklevel=2, + ) + return None + + dist = scipy.stats.distributions.rv_frozen(rv_class(), *args, **kwds) # type: ignore + dist.a = a + dist.b = b + + return dist + + def _serialize_function(self, o: Callable) -> OrderedDict[str, str]: + name = o.__module__ + "." 
+ o.__name__ + ret = OrderedDict() # type: 'OrderedDict[str, str]' + ret["oml-python:serialized_object"] = "function" + ret["value"] = name + return ret + + def _deserialize_function(self, name: str) -> Callable: + module_name = name.rsplit(".", 1) + return getattr(importlib.import_module(module_name[0]), module_name[1]) + + def _serialize_cross_validator(self, o: Any) -> OrderedDict[str, str | dict]: + ret: OrderedDict[str, str | dict] = OrderedDict() + + parameters = OrderedDict() # type: 'OrderedDict[str, Any]' + + # XXX this is copied from sklearn.model_selection._split + cls = o.__class__ + init = getattr(cls.__init__, "deprecated_original", cls.__init__) + # Ignore varargs, kw and default values and pop self + init_signature = inspect.signature(init) # type: ignore + # Consider the constructor parameters excluding 'self' + if init is object.__init__: + args = [] # type: List + else: + args = sorted( + [ + p.name + for p in init_signature.parameters.values() + if p.name != "self" and p.kind != p.VAR_KEYWORD + ], + ) + + for key in args: + # We need deprecation warnings to always be on in order to + # catch deprecated param values. + # This is set in utils/__init__.py but it gets overwritten + # when running under python3 somehow. + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always", DeprecationWarning) + value = getattr(o, key, None) + if w is not None and len(w) and w[0].category is DeprecationWarning: + # if the parameter is deprecated, don't show it + continue + + if not (isinstance(value, Sized) and len(value) == 0): + value = json.dumps(value) + parameters[key] = value + else: + parameters[key] = None + + ret["oml-python:serialized_object"] = "cv_object" + name = o.__module__ + "." + o.__class__.__name__ + value = OrderedDict([("name", name), ("parameters", parameters)]) + ret["value"] = value + + return ret + + def _deserialize_cross_validator( + self, + value: OrderedDict[str, Any], + recursion_depth: int, + strict_version: bool = True, # noqa: FBT002, FBT001 + ) -> Any: + model_name = value["name"] + parameters = value["parameters"] + + module_name = model_name.rsplit(".", 1) + model_class = getattr(importlib.import_module(module_name[0]), module_name[1]) + for parameter in parameters: + parameters[parameter] = self._deserialize_sklearn( + parameters[parameter], + recursion_depth=recursion_depth + 1, + strict_version=strict_version, + ) + return model_class(**parameters) + + def _format_external_version( + self, + model_package_name: str, + model_package_version_number: str, + ) -> str: + return f"{model_package_name}=={model_package_version_number}" + + @staticmethod + def _get_parameter_values_recursive( + param_grid: dict | list[dict], + parameter_name: str, + ) -> list[Any]: + """ + Returns a list of values for a given hyperparameter, encountered + recursively throughout the flow. 
(e.g., n_jobs can be defined + for various flows) + + Parameters + ---------- + param_grid: Union[Dict, List[Dict]] + Dict mapping from hyperparameter list to value, to a list of + such dicts + + parameter_name: str + The hyperparameter that needs to be inspected + + Returns + ------- + List + A list of all values of hyperparameters with this name + """ + if isinstance(param_grid, dict): + return [ + value + for param, value in param_grid.items() + if param.split("__")[-1] == parameter_name + ] + + if isinstance(param_grid, list): + result = [] + for sub_grid in param_grid: + result.extend( + SklearnSerializer._get_parameter_values_recursive(sub_grid, parameter_name), + ) + return result + + raise ValueError("Param_grid should either be a dict or list of dicts") + + def _prevent_optimize_n_jobs(self, model): + """ + Ensures that HPO classes will not optimize the n_jobs hyperparameter + + Parameters + ---------- + model: + The model that will be fitted + """ + if self._is_hpo_class(model): + if isinstance(model, sklearn.model_selection.GridSearchCV): + param_distributions = model.param_grid + elif isinstance(model, sklearn.model_selection.RandomizedSearchCV): + param_distributions = model.param_distributions + else: + if hasattr(model, "param_distributions"): + param_distributions = model.param_distributions + else: + raise AttributeError( + "Using subclass BaseSearchCV other than " + "{GridSearchCV, RandomizedSearchCV}. " + "Could not find attribute " + "param_distributions.", + ) + logger.warning( + "Warning! Using subclass BaseSearchCV other than " + "{GridSearchCV, RandomizedSearchCV}. " + "Should implement param check. ", + ) + n_jobs_vals = self._get_parameter_values_recursive( + param_distributions, + "n_jobs", + ) + if len(n_jobs_vals) > 0: + raise PyOpenMLError( + "openml-python should not be used to optimize the n_jobs parameter.", + )
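+
+ # Illustrative sketch of the helper above (parameter names and values are
+ # hypothetical): given param_grid = {"n_jobs": [1, 2], "estimator__n_jobs": [4]},
+ # _get_parameter_values_recursive(param_grid, "n_jobs") returns [[1, 2], [4]],
+ # because only the suffix after the last "__" is compared. Any non-empty result
+ # is what makes _prevent_optimize_n_jobs raise a PyOpenMLError.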