import copy
from copy import deepcopy
from typing import List, Callable, Mapping, Union, Optional

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.stats
from joblib import Parallel, delayed
from sklearn.base import RegressorMixin, BaseEstimator
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import cross_val_score
from sklearn.tree import DecisionTreeClassifier
from sklearn import datasets, model_selection

from .data import Data
from .initializers.initializer import Initializer
from .model import Model
from .node import LeafNode, DecisionNode
from .samplers.leafnode import LeafNodeSampler
from .samplers.modelsampler import ModelSampler, Chain
from .samplers.schedule import SampleSchedule
from .samplers.sigma import SigmaSampler
from .samplers.treemutation import TreeMutationSampler
from .samplers.unconstrainedtree.treemutation import get_tree_sampler
from .sigma import Sigma


def run_chain(model: 'SklearnModel', X: np.ndarray, y: np.ndarray):
    """
    Run a single chain for a model
    Primarily used as a building block for constructing a parallel run of multiple chains
    """

    # TODO: support for classification F^{-1} (y) ~ N(G(x), 1)

    # if model.classification:
    #     z = np.random.normal(loc=model.predict(X))
    #     z[y == 1] = np.maximum(z[y == 1], 0)
    #     z[y == 0] = np.minimum(z[y == 0], 0)
    #     y = z

    model.model = model._construct_model(X, y)

    return model.sampler.samples(model.model,
                                 model.n_samples,
                                 model.n_burn,
                                 model.thin,
                                 model.store_in_sample_predictions,
                                 model.store_acceptance_trace)


def delayed_run_chain():
    return run_chain


def get_nodes(root_node):
    decision_nodes = []
    leaf_nodes = []

    def _add_nodes(n):
        if type(n) == LeafNode:
            leaf_nodes.append(n)
        elif type(n) == DecisionNode:
            decision_nodes.append(n)
        if n.left_child:
            _add_nodes(n.left_child)
        if n.right_child:
            _add_nodes(n.right_child)

    _add_nodes(root_node)
    return decision_nodes, leaf_nodes


def get_root_node(tree):
    for n in tree.decision_nodes:
        if n.depth == 0:
            return n


def shrink_tree(tree, reg_param):
    root = get_root_node(tree)
    # shrink_node requires the full argument list; default here to the
    # "node_based" scheme (constant is only used by the "leaf_based" scheme)
    tree_d_node = shrink_node(root, reg_param, parent_val=None, parent_num=None,
                              cum_sum=0, scheme="node_based", constant=None)
    d, l = get_nodes(tree_d_node)
    tree._nodes = d + l
    return tree


def expand_node(node):
    mask_int = 1 - node.data.mask.astype(int)
    # y = node.data.y.values
    val = np.sum(node.data.y.values
                 * mask_int) / np.sum(mask_int)
    node.set_value(val)
    return node


def expand_tree(tree):
    decision, leaves = get_nodes(get_root_node(tree))
    leaves_new = [expand_node(n) for n in leaves]
    tree._nodes = decision + leaves_new
    return tree


def shrink_node(node, reg_param, parent_val, parent_num, cum_sum, scheme, constant):
    """Shrink the tree
    """

    # node.set_value(node.mean_response)

    left = node.left_child
    right = node.right_child
    is_leaf = type(node) == LeafNode
    # if self.prediction_task == 'regression':
    val = node.current_value
    is_root = parent_val is None and parent_num is None
    n_samples = node.n_obs if (scheme != "leaf_based" or is_root) else parent_num

    if is_root:
        val_new = val

    else:
        reg_term = reg_param if scheme == "constant" else reg_param / parent_num

        val_new = (val - parent_val) / (1 + reg_term)

    cum_sum += val_new

    if is_leaf:
        if scheme == "leaf_based":
            v = constant + (val - constant) / (1 + reg_param / node.n_obs)
            node.set_value(v)
        else:
            node.set_value(cum_sum)

    else:
        shrink_node(left, reg_param, val, parent_num=n_samples, cum_sum=cum_sum, scheme=scheme, constant=constant)
        shrink_node(right, reg_param, val, parent_num=n_samples, cum_sum=cum_sum, scheme=scheme, constant=constant)

    return node


class SklearnModel(BaseEstimator, RegressorMixin):
    """
    The main access point to building BART models in BartPy

    Parameters
    ----------
    n_trees: int
        the number of trees to use; more trees make a smoother fit but slow down training and prediction
    n_chains: int
        the number of independent chains to run
        more chains will improve the quality of the samples, but will require more computation
    sigma_a: float
        shape parameter of the prior on sigma
    sigma_b: float
        scale parameter of the prior on sigma
    n_samples: int
        how many recorded samples to take
    n_burn: int
        how many samples to run without recording to reach convergence
    thin: float
        proportion of samples to keep.
        Use this to save memory when running large models
    p_grow: float
        probability of choosing a grow mutation in tree mutation sampling
    p_prune: float
        probability of choosing a prune mutation in tree mutation sampling
    alpha: float
        prior parameter on tree structure
    beta: float
        prior parameter on tree structure
    store_in_sample_predictions: bool
        whether to store full prediction samples
        set to False if you don't need in sample results - saves a lot of memory
    store_acceptance_trace: bool
        whether to store acceptance rates of the Gibbs samples
        useful for diagnostics; unless you're very memory constrained,
        you will usually want to keep this enabled
    tree_sampler: TreeMutationSampler
        Method of sampling used on trees
        defaults to `bartpy.samplers.unconstrainedtree`
    initializer: Initializer
        Class that handles the initialization of tree structure and leaf values
    n_jobs: int
        how many cores to use when computing MCMC samples
        set to `-1` to use all cores
    """

    def __init__(self,
                 n_trees: int = 200,
                 n_chains: int = 4,
                 sigma_a: float = 0.001,
                 sigma_b: float = 0.001,
                 n_samples: int = 200,
                 n_burn: int = 200,
                 thin: float = 0.1,
                 alpha: float = 0.95,
                 beta: float = 2.,
                 store_in_sample_predictions: bool = False,
                 store_acceptance_trace: bool = False,
                 tree_sampler: TreeMutationSampler = get_tree_sampler(0.5, 0.5),
                 initializer: Optional[Initializer] = None,
                 n_jobs=-1,
                 classification: bool = False,
                 max_rules=None):
        self.n_trees = n_trees
        self.n_chains = n_chains
        self.sigma_a = sigma_a
        self.sigma_b = sigma_b
        self.n_burn = n_burn
        self.n_samples = n_samples
        self.p_grow = 0.5
        self.p_prune = 0.5
        self.alpha = alpha
        self.beta = beta
        self.thin = thin
        self.n_jobs = n_jobs
        self.store_in_sample_predictions = store_in_sample_predictions
        self.store_acceptance_trace = store_acceptance_trace
        self.columns = None
        self.tree_sampler = tree_sampler
        self.initializer = initializer
        self.schedule = SampleSchedule(self.tree_sampler, LeafNodeSampler(), SigmaSampler())
        self.sampler = ModelSampler(self.schedule)
        self.classification = classification
        self.max_rules = max_rules

        self.sigma, self.data, self.model, self._prediction_samples, self._model_samples, self.extract = [None] * 6

    def fit(self, X: Union[np.ndarray, pd.DataFrame], y: np.ndarray) -> 'SklearnModel':
        """
        Learn the model based on training data

        Parameters
        ----------
        X: pd.DataFrame
            training covariates
        y: np.ndarray
            training targets

        Returns
        -------
        SklearnModel
            self with trained parameter values
        """
        self.model = self._construct_model(X, y)
        self.extract = Parallel(n_jobs=self.n_jobs)(self.f_delayed_chains(X, y))
        self.combined_chains = self._combine_chains(self.extract)
        self._model_samples, self._prediction_samples = self.combined_chains["model"], self.combined_chains[
            "in_sample_predictions"]
        self._acceptance_trace = self.combined_chains["acceptance"]
        self._likelihood = self.combined_chains["likelihood"]
        self._probs = self.combined_chains["probs"]

        self.fitted_ = True
        return self

    @property
    def fitted(self):
        if hasattr(self, "fitted_"):
            return self.fitted_
        return False

    @property
    def complexity_(self):
        if hasattr(self.initializer, "_tree"):
            estimator = self.initializer._tree
            if hasattr(estimator, 'complexity_'):
                return estimator.complexity_

    @staticmethod
    def _combine_chains(extract: List[Chain]) -> Chain:
        keys = list(extract[0].keys())
        combined = {}
        for key in keys:
            combined[key] = np.concatenate([chain[key] for chain in extract], axis=0)
        return combined

    @staticmethod
    def _convert_covariates_to_data(X: np.ndarray, y: np.ndarray) -> Data:
        from copy import deepcopy
        if type(X) == pd.DataFrame:
            X = X.values
        return Data(deepcopy(X), deepcopy(y), normalize=True)

    def _construct_model(self, X: np.ndarray, y: np.ndarray) -> Model:
        if len(X) == 0 or X.shape[1] == 0:
            raise ValueError("Empty covariate matrix passed")
        self.data = self._convert_covariates_to_data(X, y)
        self.sigma = Sigma(self.sigma_a, self.sigma_b, self.data.y.normalizing_scale, self.classification)
        self.model = Model(self.data,
                           self.sigma,
                           n_trees=self.n_trees,
                           alpha=self.alpha,
                           beta=self.beta,
                           initializer=self.initializer,
                           classification=self.classification)
        n_trees = self.n_trees if self.initializer is None else self.initializer.n_trees
        self.n_trees = n_trees
        return self.model

    def f_delayed_chains(self, X: np.ndarray, y: np.ndarray):
        """
        Access point for the delayed methods used to run chains
        Useful for when you want to run multiple instances of the model in parallel
        e.g. when calculating a null distribution for feature importance

        Parameters
        ----------
        X: np.ndarray
            Covariate matrix
        y: np.ndarray
            Target array

        Returns
        -------
        List[Callable[[], ChainExtract]]
        """
        return [delayed(x)(self, X, y) for x in self.f_chains()]

    def f_chains(self) -> List[Callable[[], Chain]]:
        """
        List of methods to run MCMC chains
        Useful for running multiple models in parallel

        Returns
        -------
        List[Callable[[], Extract]]
            List of method to run individual chains
            Length of n_chains
        """
        return [delayed_run_chain() for _ in range(self.n_chains)]

    def predict(self, X: np.ndarray = None) -> np.ndarray:
        """
        Predict the target corresponding to the provided covariate matrix
        If X is None, will predict based on training covariates

        Prediction is based on the mean of all samples

        Parameters
        ----------
        X: pd.DataFrame
            covariates to predict from

        Returns
        -------
        np.ndarray
            predictions for the X covariates
        """
        if X is None and self.store_in_sample_predictions:
            return self.data.y.unnormalize_y(np.mean(self._prediction_samples, axis=0))
        elif X is None and not self.store_in_sample_predictions:
            raise ValueError(
                "In sample predictions only possible if model.store_in_sample_predictions is `True`.  Either set the parameter to True or pass a non-None X parameter")
        else:
            predictions = self._out_of_sample_predict(X)
            if self.classification:
                return np.round(predictions, 0)
            return predictions

    def predict_proba(self, X: np.ndarray = None) -> np.ndarray:
        preds = self._out_of_sample_predict(X)
        return np.stack([preds, 1 - preds], axis=1)

    def residuals(self, X=None, y=None) -> np.ndarray:
        """
        Array of error for each observation

        Parameters
        ----------
        X: np.ndarray
            Covariate matrix
        y: np.ndarray
            Target array

        Returns
        -------
        np.ndarray
            Error for each observation
        """
        if y is None:
            return self.model.data.y.unnormalized_y - self.predict(X)
        else:
            return y - self.predict(X)

    def l2_error(self, X=None, y=None) -> np.ndarray:
        """
        Calculate the squared errors for each row in the covariate matrix

        Parameters
        ----------
        X: np.ndarray
            Covariate matrix
        y: np.ndarray
            Target array
        Returns
        -------
        np.ndarray
            Squared error for each observation
        """
        return np.square(self.residuals(X, y))

    def rmse(self, X, y) -> float:
        """
        The total error of the model,
        computed as the square root of the sum of squared errors over all observations

        Parameters
        ----------
        X: np.ndarray
            Covariate matrix
        y: np.ndarray
            Target array

        Returns
        -------
        float
            The square root of the total summed L2 error for the model
        """
        return np.sqrt(np.sum(self.l2_error(X, y)))

    def _chain_pred_arr(self, X, chain_number):
        chain_len = int(self.n_samples)
        samples_chain = self._model_samples[chain_number * chain_len: (chain_number + 1) * chain_len]
        predictions_transformed = [x.predict(X) for x in samples_chain]
        return predictions_transformed

    def predict_chain(self, X, chain_number):
        predictions_transformed = self._chain_pred_arr(X, chain_number)
        predictions = self.data.y.unnormalize_y(np.mean(predictions_transformed, axis=0))
        if self.classification:
            predictions = scipy.stats.norm.cdf(predictions)
        return predictions

    def chain_mse_std(self, X, y, chain_number):
        predictions_transformed = self._chain_pred_arr(X, chain_number)
        predictions_std = np.std(
            [mean_squared_error(self.data.y.unnormalize_y(preds), y) for preds in predictions_transformed])
        return predictions_std

    def chain_predictions(self, X, chain_number):
        predictions_transformed = self._chain_pred_arr(X, chain_number)
        preds_arr = [self.data.y.unnormalize_y(preds) for preds in predictions_transformed]
        return preds_arr

    def between_chains_var(self, X):
        all_predictions = np.stack([self.data.y.unnormalize_y(x.predict(X)) for x in self._model_samples], axis=1)

        def _get_var(preds_arr):
            mean_pred = preds_arr.mean(axis=1)
            var = np.mean((preds_arr - np.expand_dims(mean_pred, 1)) ** 2)
            return var

        total_var = _get_var(all_predictions)
        within_chain_var = 0
        for c in range(self.n_chains):
            chain_preds = self._chain_pred_arr(X, c)
            within_chain_var += _get_var(np.stack(chain_preds, axis=1))
        return total_var - within_chain_var

    def _out_of_sample_predict(self, X):
        samples = self._model_samples
        predictions_transformed = [x.predict(X) for x in samples]
        predictions = self.data.y.unnormalize_y(np.mean(predictions_transformed, axis=0))
        if self.classification:
            predictions = scipy.stats.norm.cdf(predictions)
        return predictions

    def fit_predict(self, X, y):
        self.fit(X, y)
        if self.store_in_sample_predictions:
            return self.predict()
        else:
            return self.predict(X)

    @property
    def model_samples(self) -> List[Model]:
        """
        Array of the model as it was after each sample.
        Useful for:

         - examining the state of trees, nodes and sigma throughout the sampling
         - out of sample prediction

        Returns None if the model hasn't been fit

        Returns
        -------
        List[Model]
        """
        return self._model_samples

    @property
    def acceptance_trace(self) -> List[Mapping[str, float]]:
        """
        List of Mappings from variable name to acceptance rates

        Each entry is the acceptance rate of the variable in each iteration of the model

        Returns
        -------
        List[Mapping[str, float]]
        """
        return self._acceptance_trace

    @property
    def likelihood(self) -> List:
        """
        Likelihood values recorded at each iteration of the sampler

        Returns
        -------
        List
        """
        return self._likelihood

    @property
    def probs(self) -> List:
        """
        Probability values recorded at each iteration of the sampler

        Returns
        -------
        List
        """
        return self._probs

    @property
    def prediction_samples(self) -> np.ndarray:
        """
        Matrix of prediction samples at each point in sampling
        Useful for assessing convergence, calculating point estimates etc.

        Returns
        -------
        np.ndarray
            prediction samples with dimensionality n_samples * n_points
        """
        return self._prediction_samples

    def from_extract(self, extract: List[Chain], X: np.ndarray, y: np.ndarray) -> 'SklearnModel':
        """
        Create a copy of the model using an extract
        Useful for doing operations on extracts created in external processes like feature selection
        Parameters
        ----------
        extract: Extract
            samples produced by delayed chain methods
        X: np.ndarray
            Covariate matrix
        y: np.ndarray
            Target variable

        Returns
        -------
        SklearnModel
            Copy of the current model with samples
        """
        new_model = deepcopy(self)
        combined_chain = self._combine_chains(extract)
        new_model._model_samples, new_model._prediction_samples = combined_chain["model"], combined_chain["in_sample_predictions"]
        new_model._acceptance_trace = combined_chain["acceptance"]
        new_model.data = self._convert_covariates_to_data(X, y)
        return new_model


class BART(SklearnModel):

    @staticmethod
    def _get_n_nodes(trees):
        nodes = 0
        for tree in trees:
            nodes += len(tree.decision_nodes)
        return nodes

    @property
    def sample_complexity(self):
        # samples = self._model_samples
        # trees = [s.trees for s in samples]
        complexities = [self._get_n_nodes(t) for t in self.trees]
        return np.sum(complexities)

    @staticmethod
    def sub_forest(trees, n_nodes):
        nodes = 0
        for i, tree in enumerate(trees):
            nodes += len(tree.decision_nodes)
            if nodes >= n_nodes:
                return trees[0:i + 1]

    @property
    def trees(self):
        trs = [s.trees for s in self._model_samples]
        return trs

    def update_complexity(self, i):
        samples_complexity = [self._get_n_nodes(t) for t in self.trees]

        # complexity_sum = 0
        arg_sort_complexity = np.argsort(samples_complexity)
        self._model_samples = self._model_samples[arg_sort_complexity[:i + 1]]

        return self


class ImputedBART(BaseEstimator):
    def __init__(self, estimator_):
        # super(ShrunkBARTRegressor, self).__init__()
        self.estimator_ = estimator_

    def predict(self, *args, **kwargs):
        return self.estimator_.predict(*args, **kwargs)

    def predict_proba(self, *args, **kwargs):
        if hasattr(self.estimator_, 'predict_proba'):
            return self.estimator_.predict_proba(*args, **kwargs)
        else:
            return NotImplemented

    def score(self, *args, **kwargs):
        if hasattr(self.estimator_, 'score'):
            return self.estimator_.score(*args, **kwargs)
        else:
            return NotImplemented


class ShrunkBART(ImputedBART):

    def __init__(self, estimator_, reg_param, scheme):
        super(ShrunkBART, self).__init__(estimator_)
        self.reg_param = reg_param
        self.scheme = scheme

    def shrink_tree(self, tree):
        root = get_root_node(tree)
        tree_d_node = shrink_node(root, self.reg_param, parent_val=None, parent_num=None, cum_sum=0, scheme=self.scheme,
                                  constant=np.mean(self.estimator_.data.y.values))
        d, l = get_nodes(tree_d_node)
        tree._nodes = d + l
        return tree

    def fit(self, *args, **kwargs):
        if not self.estimator_.fitted:
            self.estimator_.fit(*args, **kwargs)
        samples = []
        for s in self.estimator_.model_samples:
            for i, tree in enumerate(s._trees):
                s_tree = self.shrink_tree(expand_tree(copy.deepcopy(tree)))
                s._trees[i] = s_tree
            samples.append(s)
        self.estimator_._model_samples = samples
        self.fitted_ = True


class ExpandedBART(ImputedBART):

    def fit(self, *args, **kwargs):
        if not self.estimator_.fitted:
            self.estimator_.fit(*args, **kwargs)
        samples = []
        for s in self.estimator_.model_samples:
            for i, tree in enumerate(s._trees):
                s_tree = expand_tree(copy.deepcopy(tree))
                s._trees[i] = s_tree
            samples.append(s)
        self.estimator_._model_samples = samples
        self.fitted_ = True


# class ExpandedBARTRegressor(ImputedBARTRegressor):
#
#     def fit(self, *args, **kwargs):
#         if not self.estimator_.fitted:
#             self.estimator_.fit(*args, **kwargs)
#         samples = []
#         for s in self.estimator_.model_samples:
#             for i, tree in enumerate(s._trees):
#                 s_tree = expend_tree(copy.deepcopy(tree), args[1])
#                 s._trees[i] = s_tree
#             samples.append(s)
#         self.estimator_._model_samples = samples
#         self.fitted_ = True


class ShrunkBARTCV(ShrunkBART):
    def __init__(self, estimator_: BaseEstimator, scheme: str,
                 reg_param_list: List[float] = [0.1, 1, 10, 50, 100, 500],
                 cv: int = 3, scoring=None):
        super(ShrunkBARTCV, self).__init__(estimator_, None, scheme)
        self.reg_param_list = np.array(reg_param_list)
        self.cv = cv
        self.scoring = scoring

    def fit(self, X, y, *args, **kwargs):
        self.scores_ = []
        for reg_param in self.reg_param_list:
            est = ShrunkBART(deepcopy(self.estimator_), reg_param, self.scheme)
            cv_scores = cross_val_score(est, X, y, cv=self.cv, scoring=self.scoring)
            self.scores_.append(np.mean(cv_scores))
        self.reg_param = self.reg_param_list[np.argmax(self.scores_)]
        super().fit(X=X, y=y)


def main():
    # iris = datasets.load_iris()
    # idx = np.logical_or(iris.target == 0, iris.target == 1)
    # X, y = iris.data[idx, ...], iris.target[idx]
    X, y = datasets.load_diabetes(return_X_y=True)

    X_train, X_test, y_train, y_test = model_selection.train_test_split(
        X, y, test_size=0.3, random_state=1)
    bart = BART(classification=False)
    bart.fit(X_train, y_train)
    preds_org = bart.predict(X_test)
    mse = np.linalg.norm(preds_org - y_test)
    print(mse)
    # tree = DecisionTreeClassifier()
    # tree.fit(X, y)
    # preds_tree = tree.predict_proba(X)
    # bart_s = ShrunkBARTCV(copy.deepcopy(bart), scheme="node_based")
    # bart_s.fit(X, y)
    #
    # # bart_s_c = ShrunkBART(copy.deepcopy(bart), reg_param=2, scheme="constant")
    # # bart_s_c.fit(X, y)
    # #
    # # bart_s_l = ShrunkBART(copy.deepcopy(bart), reg_param=2, scheme="leaf_based")
    # # bart_s_l.fit(X, y)
    # # bart_s_cv = ShrunkBARTRegressorCV(estimator_=copy.deepcopy(bart))
    # # bart_s_cv.fit(X, y)
    # e_bart = ExpandedBART(estimator_=copy.deepcopy(bart))
    # e_bart.fit(X, y)
    #
    # preds = bart_s.predict(X)
    #
    # # preds_c = bart_s_c.predict(X)
    # # preds_l = bart_s_l.predict(X)
    # # # preds_cv = bart_s_cv.predict(X)
    # preds_bart_e = e_bart.predict(X)
    # fig, ax = plt.subplots(1)
    #
    # ax.scatter(np.arange(len(y)), preds_org, c="orange", label="bart")
    # ax.scatter(np.arange(len(y)), preds, c="purple", alpha=0.3, label="shrunk node")
    # # ax.scatter(np.arange(len(y)), preds_c, c="blue", alpha=0.3, label="shrunk constant")
    # # ax.scatter(np.arange(len(y)), preds_l, c="red", alpha=0.3, label="shrunk leaf")
    # ax.scatter(np.arange(len(y)), preds_bart_e, c="green", alpha=0.3, label="average")
    # # preds_all = [preds_org, preds_c, preds_l, preds_bart_e, preds]
    # # shift = 0.5
    # # rng = (np.min([np.min(p) for p in preds_all]) - shift, np.max([np.max(p) for p in preds_all]) + shift)
    # # n_bins = 200
    # # alpha = 0.8
    # # ax.hist(preds_org, color="orange", alpha=alpha, label="bart", bins=n_bins, range=rng)
    # # ax.hist(preds, color="purple", alpha=alpha, label="shrunk node", bins=n_bins, range=rng)
    # # ax.hist(preds_c, color="blue", alpha=alpha, label="shrunk constant", bins=n_bins, range=rng)
    # # ax.hist(preds_l, color="red", alpha=alpha, label="shrunk leaf", bins=n_bins, range=rng)
    # # ax.hist(preds_bart_e, color="green", alpha=alpha, label="average", bins=n_bins, range=rng)
    # #
    # # ax.set_xlabel("Predicted Value")
    # # ax.set_ylabel("Count")
    #
    # plt.title(np.mean(y))
    #
    # plt.legend(loc="upper left")
    # plt.savefig("bart_shrink.png")
    # # plt.show()
    # #
    # # plt.close()
    #


if __name__ == '__main__':
    main()

Functions

def delayed_run_chain()
def delayed_run_chain():
    return run_chain
def expand_node(node)
def expand_node(node):
    mask_int = 1 - node.data.mask.astype(int)
    # y = node.data.y.values
    val = np.sum(node.data.y.values
                 * mask_int) / np.sum(mask_int)
    node.set_value(val)
    return node
def expand_tree(tree)
def expand_tree(tree):
    decision, leaves = get_nodes(get_root_node(tree))
    leaves_new = [expand_node(n) for n in leaves]
    tree._nodes = decision + leaves_new
    return tree
def get_nodes(root_node)
def get_nodes(root_node):
    decision_nodes = []
    leaf_nodes = []

    def _add_nodes(n):
        if type(n) == LeafNode:
            leaf_nodes.append(n)
        elif type(n) == DecisionNode:
            decision_nodes.append(n)
        if n.left_child:
            _add_nodes(n.left_child)
        if n.right_child:
            _add_nodes(n.right_child)

    _add_nodes(root_node)
    return decision_nodes, leaf_nodes
def get_root_node(tree)
def get_root_node(tree):
    for n in tree.decision_nodes:
        if n.depth == 0:
            return n
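
Example (a minimal sketch; assumes `bart` is a BART/SklearnModel instance that has already been fit, as in `main()` below): walk one sampled tree from its root and collect its nodes.

first_tree = bart.model_samples[0].trees[0]   # first tree of the first posterior sample
root = get_root_node(first_tree)              # the decision node at depth 0
decision_nodes, leaf_nodes = get_nodes(root)  # recursively collect all nodes
print(len(decision_nodes), len(leaf_nodes))
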
def main()
def main():
    # iris = datasets.load_iris()
    # idx = np.logical_or(iris.target == 0, iris.target == 1)
    # X, y = iris.data[idx, ...], iris.target[idx]
    X, y = datasets.load_diabetes(return_X_y=True)

    X_train, X_test, y_train, y_test = model_selection.train_test_split(
        X, y, test_size=0.3, random_state=1)
    bart = BART(classification=False)
    bart.fit(X_train, y_train)
    preds_org = bart.predict(X_test)
    mse = np.linalg.norm(preds_org - y_test)
    print(mse)
    # tree = DecisionTreeClassifier()
    # tree.fit(X, y)
    # preds_tree = tree.predict_proba(X)
    # bart_s = ShrunkBARTCV(copy.deepcopy(bart), scheme="node_based")
    # bart_s.fit(X, y)
    #
    # # bart_s_c = ShrunkBART(copy.deepcopy(bart), reg_param=2, scheme="constant")
    # # bart_s_c.fit(X, y)
    # #
    # # bart_s_l = ShrunkBART(copy.deepcopy(bart), reg_param=2, scheme="leaf_based")
    # # bart_s_l.fit(X, y)
    # # bart_s_cv = ShrunkBARTRegressorCV(estimator_=copy.deepcopy(bart))
    # # bart_s_cv.fit(X, y)
    # e_bart = ExpandedBART(estimator_=copy.deepcopy(bart))
    # e_bart.fit(X, y)
    #
    # preds = bart_s.predict(X)
    #
    # # preds_c = bart_s_c.predict(X)
    # # preds_l = bart_s_l.predict(X)
    # # # preds_cv = bart_s_cv.predict(X)
    # preds_bart_e = e_bart.predict(X)
    # fig, ax = plt.subplots(1)
    #
    # ax.scatter(np.arange(len(y)), preds_org, c="orange", label="bart")
    # ax.scatter(np.arange(len(y)), preds, c="purple", alpha=0.3, label="shrunk node")
    # # ax.scatter(np.arange(len(y)), preds_c, c="blue", alpha=0.3, label="shrunk constant")
    # # ax.scatter(np.arange(len(y)), preds_l, c="red", alpha=0.3, label="shrunk leaf")
    # ax.scatter(np.arange(len(y)), preds_bart_e, c="green", alpha=0.3, label="average")
    # # preds_all = [preds_org, preds_c, preds_l, preds_bart_e, preds]
    # # shift = 0.5
    # # rng = (np.min([np.min(p) for p in preds_all]) - shift, np.max([np.max(p) for p in preds_all]) + shift)
    # # n_bins = 200
    # # alpha = 0.8
    # # ax.hist(preds_org, color="orange", alpha=alpha, label="bart", bins=n_bins, range=rng)
    # # ax.hist(preds, color="purple", alpha=alpha, label="shrunk node", bins=n_bins, range=rng)
    # # ax.hist(preds_c, color="blue", alpha=alpha, label="shrunk constant", bins=n_bins, range=rng)
    # # ax.hist(preds_l, color="red", alpha=alpha, label="shrunk leaf", bins=n_bins, range=rng)
    # # ax.hist(preds_bart_e, color="green", alpha=alpha, label="average", bins=n_bins, range=rng)
    # #
    # # ax.set_xlabel("Predicted Value")
    # # ax.set_ylabel("Count")
    #
    # plt.title(np.mean(y))
    #
    # plt.legend(loc="upper left")
    # plt.savefig("bart_shrink.png")
    # # plt.show()
    # #
    # # plt.close()
    #
def run_chain(model: SklearnModel, X: numpy.ndarray, y: numpy.ndarray)

Run a single chain for a model. Primarily used as a building block for constructing a parallel run of multiple chains.

def run_chain(model: 'SklearnModel', X: np.ndarray, y: np.ndarray):
    """
    Run a single chain for a model
    Primarily used as a building block for constructing a parallel run of multiple chains
    """

    # TODO: support for classification F^{-1} (y) ~ N(G(x), 1)

    # if model.classification:
    #     z = np.random.normal(loc=model.predict(X))
    #     z[y == 1] = np.maximum(z[y == 1], 0)
    #     z[y == 0] = np.minimum(z[y == 0], 0)
    #     y = z

    model.model = model._construct_model(X, y)

    return model.sampler.samples(model.model,
                                 model.n_samples,
                                 model.n_burn,
                                 model.thin,
                                 model.store_in_sample_predictions,
                                 model.store_acceptance_trace)
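
`run_chain` is the unit of work that `SklearnModel.fit` dispatches through joblib. A hedged sketch of doing the same thing by hand (X and y are assumed to be numpy arrays):

from joblib import Parallel

model = SklearnModel(n_chains=2, n_samples=50, n_burn=50)
# one delayed run_chain call per chain, executed in parallel
extract = Parallel(n_jobs=model.n_jobs)(model.f_delayed_chains(X, y))
# rebuild a model carrying the combined chain extracts
fitted_copy = model.from_extract(extract, X, y)
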
def shrink_node(node, reg_param, parent_val, parent_num, cum_sum, scheme, constant)

Shrink the tree

def shrink_node(node, reg_param, parent_val, parent_num, cum_sum, scheme, constant):
    """Shrink the tree
    """

    # node.set_value(node.mean_response)

    left = node.left_child
    right = node.right_child
    is_leaf = type(node) == LeafNode
    # if self.prediction_task == 'regression':
    val = node.current_value
    is_root = parent_val is None and parent_num is None
    n_samples = node.n_obs if (scheme != "leaf_based" or is_root) else parent_num

    if is_root:
        val_new = val

    else:
        reg_term = reg_param if scheme == "constant" else reg_param / parent_num

        val_new = (val - parent_val) / (1 + reg_term)

    cum_sum += val_new

    if is_leaf:
        if scheme == "leaf_based":
            v = constant + (val - constant) / (1 + reg_param / node.n_obs)
            node.set_value(v)
        else:
            node.set_value(cum_sum)

    else:
        shrink_node(left, reg_param, val, parent_num=n_samples, cum_sum=cum_sum, scheme=scheme, constant=constant)
        shrink_node(right, reg_param, val, parent_num=n_samples, cum_sum=cum_sum, scheme=scheme, constant=constant)

    return node
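
A worked sketch of the node-based scheme (the branch taken when `scheme` is neither "constant" nor "leaf_based"): each node's deviation from its parent value is damped by a factor of 1 / (1 + reg_param / n_parent), and the damped deviations are accumulated from the root down to each leaf, which is assigned the running `cum_sum`.

root_val, n_root = 0.0, 100      # value and number of observations at the root
leaf_val, reg_param = 0.5, 10.0  # raw leaf value and regularization strength
shrunk_deviation = (leaf_val - root_val) / (1 + reg_param / n_root)  # ~0.4545
new_leaf_value = root_val + shrunk_deviation  # the cum_sum written to the leaf
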
def shrink_tree(tree, reg_param)
def shrink_tree(tree, reg_param):
    root = get_root_node(tree)
    # shrink_node requires the full argument list; default here to the
    # "node_based" scheme (constant is only used by the "leaf_based" scheme)
    tree_d_node = shrink_node(root, reg_param, parent_val=None, parent_num=None,
                              cum_sum=0, scheme="node_based", constant=None)
    d, l = get_nodes(tree_d_node)
    tree._nodes = d + l
    return tree

Classes

class BART (n_trees: int = 200, n_chains: int = 4, sigma_a: float = 0.001, sigma_b: float = 0.001, n_samples: int = 200, n_burn: int = 200, thin: float = 0.1, alpha: float = 0.95, beta: float = 2.0, store_in_sample_predictions: bool = False, store_acceptance_trace: bool = False, tree_sampler: TreeMutationSampler = <imodels.experimental.bartpy.samplers.unconstrainedtree.treemutation.UnconstrainedTreeMutationSampler object>, initializer: Optional[Initializer] = None, n_jobs=-1, classification: bool = False, max_rules=None)

The main access point to building BART models in BartPy

Parameters

n_trees : int
the number of trees to use; more trees make a smoother fit but slow down training and prediction
n_chains : int
the number of independent chains to run more chains will improve the quality of the samples, but will require more computation
sigma_a : float
shape parameter of the prior on sigma
sigma_b : float
scale parameter of the prior on sigma
n_samples : int
how many recorded samples to take
n_burn : int
how many samples to run without recording to reach convergence
thin : float
proportion of samples to keep. Use this to save memory when running large models
p_grow : float
probability of choosing a grow mutation in tree mutation sampling
p_prune : float
probability of choosing a prune mutation in tree mutation sampling
alpha : float
prior parameter on tree structure
beta : float
prior parameter on tree structure
store_in_sample_predictions : bool
whether to store full prediction samples set to False if you don't need in sample results - saves a lot of memory
store_acceptance_trace : bool
whether to store acceptance rates of the Gibbs samples; useful for diagnostics. Unless you're very memory constrained, you will usually want to keep this enabled
tree_sampler : TreeMutationSampler
Method of sampling used on trees defaults to bartpy.samplers.unconstrainedtree
initializer : Initializer
Class that handles the initialization of tree structure and leaf values
n_jobs : int
how many cores to use when computing MCMC samples set to -1 to use all cores
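
Example (a minimal usage sketch; the import path below is assumed from the package layout shown in the default `tree_sampler`):

from sklearn import datasets, model_selection
from imodels.experimental.bartpy.sklearnmodel import BART  # assumed import path

X, y = datasets.load_diabetes(return_X_y=True)
X_train, X_test, y_train, y_test = model_selection.train_test_split(
    X, y, test_size=0.3, random_state=1)
bart = BART(classification=False)
bart.fit(X_train, y_train)
preds = bart.predict(X_test)
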
class BART(SklearnModel):

    @staticmethod
    def _get_n_nodes(trees):
        nodes = 0
        for tree in trees:
            nodes += len(tree.decision_nodes)
        return nodes

    @property
    def sample_complexity(self):
        # samples = self._model_samples
        # trees = [s.trees for s in samples]
        complexities = [self._get_n_nodes(t) for t in self.trees]
        return np.sum(complexities)

    @staticmethod
    def sub_forest(trees, n_nodes):
        nodes = 0
        for i, tree in enumerate(trees):
            nodes += len(tree.decision_nodes)
            if nodes >= n_nodes:
                return trees[0:i + 1]

    @property
    def trees(self):
        trs = [s.trees for s in self._model_samples]
        return trs

    def update_complexity(self, i):
        samples_complexity = [self._get_n_nodes(t) for t in self.trees]

        # complexity_sum = 0
        arg_sort_complexity = np.argsort(samples_complexity)
        self._model_samples = self._model_samples[arg_sort_complexity[:i + 1]]

        return self

Ancestors

  • SklearnModel
  • sklearn.base.BaseEstimator
  • sklearn.utils._estimator_html_repr._HTMLDocumentationLinkMixin
  • sklearn.utils._metadata_requests._MetadataRequester
  • sklearn.base.RegressorMixin

Static methods

def sub_forest(trees, n_nodes)
@staticmethod
def sub_forest(trees, n_nodes):
    nodes = 0
    for i, tree in enumerate(trees):
        nodes += len(tree.decision_nodes)
        if nodes >= n_nodes:
            return trees[0:i + 1]

Instance variables

var sample_complexity
@property
def sample_complexity(self):
    # samples = self._model_samples
    # trees = [s.trees for s in samples]
    complexities = [self._get_n_nodes(t) for t in self.trees]
    return np.sum(complexities)
var trees
@property
def trees(self):
    trs = [s.trees for s in self._model_samples]
    return trs

Methods

def update_complexity(self, i)
def update_complexity(self, i):
    samples_complexity = [self._get_n_nodes(t) for t in self.trees]

    # complexity_sum = 0
    arg_sort_complexity = np.argsort(samples_complexity)
    self._model_samples = self._model_samples[arg_sort_complexity[:i + 1]]

    return self

Inherited members

class ExpandedBART (estimator_)

Base class for all estimators in scikit-learn.

Inheriting from this class provides default implementations of:

  • setting and getting parameters used by GridSearchCV and friends;
  • textual and HTML representation displayed in terminals and IDEs;
  • estimator serialization;
  • parameters validation;
  • data validation;
  • feature names validation.

Read more in the :ref:User Guide <rolling_your_own_estimator>.

Notes

All estimators should specify all the parameters that can be set at the class level in their __init__ as explicit keyword arguments (no *args or **kwargs).

Examples

>>> import numpy as np
>>> from sklearn.base import BaseEstimator
>>> class MyEstimator(BaseEstimator):
...     def __init__(self, *, param=1):
...         self.param = param
...     def fit(self, X, y=None):
...         self.is_fitted_ = True
...         return self
...     def predict(self, X):
...         return np.full(shape=X.shape[0], fill_value=self.param)
>>> estimator = MyEstimator(param=2)
>>> estimator.get_params()
{'param': 2}
>>> X = np.array([[1, 2], [2, 3], [3, 4]])
>>> y = np.array([1, 0, 1])
>>> estimator.fit(X, y).predict(X)
array([2, 2, 2])
>>> estimator.set_params(param=3).fit(X, y).predict(X)
array([3, 3, 3])
class ExpandedBART(ImputedBART):

    def fit(self, *args, **kwargs):
        if not self.estimator_.fitted:
            self.estimator_.fit(*args, **kwargs)
        samples = []
        for s in self.estimator_.model_samples:
            for i, tree in enumerate(s._trees):
                s_tree = expand_tree(copy.deepcopy(tree))
                s._trees[i] = s_tree
            samples.append(s)
        self.estimator_._model_samples = samples
        self.fitted_ = True

Ancestors

  • ImputedBART
  • sklearn.base.BaseEstimator
  • sklearn.utils._estimator_html_repr._HTMLDocumentationLinkMixin
  • sklearn.utils._metadata_requests._MetadataRequester

Methods

def fit(self, *args, **kwargs)
def fit(self, *args, **kwargs):
    if not self.estimator_.fitted:
        self.estimator_.fit(*args, **kwargs)
    samples = []
    for s in self.estimator_.model_samples:
        for i, tree in enumerate(s._trees):
            s_tree = expand_tree(copy.deepcopy(tree))
            s._trees[i] = s_tree
        samples.append(s)
    self.estimator_._model_samples = samples
    self.fitted_ = True
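
Example (a sketch mirroring the commented-out code in `main()`; assumes `bart` is an already-fitted BART model as in the example above): `fit` re-estimates each leaf value from the observations associated with that leaf (see `expand_node`).

import copy

e_bart = ExpandedBART(estimator_=copy.deepcopy(bart))
e_bart.fit(X_train, y_train)
preds_expanded = e_bart.predict(X_test)
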
class ImputedBART (estimator_)

Base class for all estimators in scikit-learn.

Inheriting from this class provides default implementations of:

  • setting and getting parameters used by GridSearchCV and friends;
  • textual and HTML representation displayed in terminals and IDEs;
  • estimator serialization;
  • parameters validation;
  • data validation;
  • feature names validation.

Read more in the :ref:User Guide <rolling_your_own_estimator>.

Notes

All estimators should specify all the parameters that can be set at the class level in their __init__ as explicit keyword arguments (no *args or **kwargs).

Examples

>>> import numpy as np
>>> from sklearn.base import BaseEstimator
>>> class MyEstimator(BaseEstimator):
...     def __init__(self, *, param=1):
...         self.param = param
...     def fit(self, X, y=None):
...         self.is_fitted_ = True
...         return self
...     def predict(self, X):
...         return np.full(shape=X.shape[0], fill_value=self.param)
>>> estimator = MyEstimator(param=2)
>>> estimator.get_params()
{'param': 2}
>>> X = np.array([[1, 2], [2, 3], [3, 4]])
>>> y = np.array([1, 0, 1])
>>> estimator.fit(X, y).predict(X)
array([2, 2, 2])
>>> estimator.set_params(param=3).fit(X, y).predict(X)
array([3, 3, 3])
class ImputedBART(BaseEstimator):
    def __init__(self, estimator_):
        # super(ShrunkBARTRegressor, self).__init__()
        self.estimator_ = estimator_

    def predict(self, *args, **kwargs):
        return self.estimator_.predict(*args, **kwargs)

    def predict_proba(self, *args, **kwargs):
        if hasattr(self.estimator_, 'predict_proba'):
            return self.estimator_.predict_proba(*args, **kwargs)
        else:
            return NotImplemented

    def score(self, *args, **kwargs):
        if hasattr(self.estimator_, 'score'):
            return self.estimator_.score(*args, **kwargs)
        else:
            return NotImplemented

Ancestors

  • sklearn.base.BaseEstimator
  • sklearn.utils._estimator_html_repr._HTMLDocumentationLinkMixin
  • sklearn.utils._metadata_requests._MetadataRequester

Subclasses

  • ExpandedBART
  • ShrunkBART

Methods

def predict(self, *args, **kwargs)
def predict(self, *args, **kwargs):
    return self.estimator_.predict(*args, **kwargs)
def predict_proba(self, *args, **kwargs)
def predict_proba(self, *args, **kwargs):
    if hasattr(self.estimator_, 'predict_proba'):
        return self.estimator_.predict_proba(*args, **kwargs)
    else:
        return NotImplemented
def score(self, *args, **kwargs)
def score(self, *args, **kwargs):
    if hasattr(self.estimator_, 'score'):
        return self.estimator_.score(*args, **kwargs)
    else:
        return NotImplemented
class ShrunkBART (estimator_, reg_param, scheme)

Base class for all estimators in scikit-learn.

Inheriting from this class provides default implementations of:

  • setting and getting parameters used by GridSearchCV and friends;
  • textual and HTML representation displayed in terminals and IDEs;
  • estimator serialization;
  • parameters validation;
  • data validation;
  • feature names validation.

Read more in the :ref:User Guide <rolling_your_own_estimator>.

Notes

All estimators should specify all the parameters that can be set at the class level in their __init__ as explicit keyword arguments (no *args or **kwargs).

Examples

>>> import numpy as np
>>> from sklearn.base import BaseEstimator
>>> class MyEstimator(BaseEstimator):
...     def __init__(self, *, param=1):
...         self.param = param
...     def fit(self, X, y=None):
...         self.is_fitted_ = True
...         return self
...     def predict(self, X):
...         return np.full(shape=X.shape[0], fill_value=self.param)
>>> estimator = MyEstimator(param=2)
>>> estimator.get_params()
{'param': 2}
>>> X = np.array([[1, 2], [2, 3], [3, 4]])
>>> y = np.array([1, 0, 1])
>>> estimator.fit(X, y).predict(X)
array([2, 2, 2])
>>> estimator.set_params(param=3).fit(X, y).predict(X)
array([3, 3, 3])
class ShrunkBART(ImputedBART):

    def __init__(self, estimator_, reg_param, scheme):
        super(ShrunkBART, self).__init__(estimator_)
        self.reg_param = reg_param
        self.scheme = scheme

    def shrink_tree(self, tree):
        root = get_root_node(tree)
        tree_d_node = shrink_node(root, self.reg_param, parent_val=None, parent_num=None, cum_sum=0, scheme=self.scheme,
                                  constant=np.mean(self.estimator_.data.y.values))
        d, l = get_nodes(tree_d_node)
        tree._nodes = d + l
        return tree

    def fit(self, *args, **kwargs):
        if not self.estimator_.fitted:
            self.estimator_.fit(*args, **kwargs)
        samples = []
        for s in self.estimator_.model_samples:
            for i, tree in enumerate(s._trees):
                s_tree = self.shrink_tree(expand_tree(copy.deepcopy(tree)))
                s._trees[i] = s_tree
            samples.append(s)
        self.estimator_._model_samples = samples
        self.fitted_ = True

Ancestors

  • ImputedBART
  • sklearn.base.BaseEstimator
  • sklearn.utils._estimator_html_repr._HTMLDocumentationLinkMixin
  • sklearn.utils._metadata_requests._MetadataRequester

Subclasses

  • ShrunkBARTCV

Methods

def fit(self, *args, **kwargs)
def fit(self, *args, **kwargs):
    if not self.estimator_.fitted:
        self.estimator_.fit(*args, **kwargs)
    samples = []
    for s in self.estimator_.model_samples:
        for i, tree in enumerate(s._trees):
            s_tree = self.shrink_tree(expand_tree(copy.deepcopy(tree)))
            s._trees[i] = s_tree
        samples.append(s)
    self.estimator_._model_samples = samples
    self.fitted_ = True
def shrink_tree(self, tree)
def shrink_tree(self, tree):
    root = get_root_node(tree)
    tree_d_node = shrink_node(root, self.reg_param, parent_val=None, parent_num=None, cum_sum=0, scheme=self.scheme,
                              constant=np.mean(self.estimator_.data.y.values))
    d, l = get_nodes(tree_d_node)
    tree._nodes = d + l
    return tree
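
Example (a sketch based on the commented-out code in `main()`; assumes `bart` is an already-fitted BART model as in the example above):

import copy

bart_shrunk = ShrunkBART(copy.deepcopy(bart), reg_param=2, scheme="constant")
bart_shrunk.fit(X_train, y_train)
preds_shrunk = bart_shrunk.predict(X_test)
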
class ShrunkBARTCV (estimator_: sklearn.base.BaseEstimator, scheme: str, reg_param_list: List[float] = [0.1, 1, 10, 50, 100, 500], cv: int = 3, scoring=None)

Base class for all estimators in scikit-learn.

Inheriting from this class provides default implementations of:

  • setting and getting parameters used by GridSearchCV and friends;
  • textual and HTML representation displayed in terminals and IDEs;
  • estimator serialization;
  • parameters validation;
  • data validation;
  • feature names validation.

Read more in the :ref:User Guide <rolling_your_own_estimator>.

Notes

All estimators should specify all the parameters that can be set at the class level in their __init__ as explicit keyword arguments (no *args or **kwargs).

Examples

>>> import numpy as np
>>> from sklearn.base import BaseEstimator
>>> class MyEstimator(BaseEstimator):
...     def __init__(self, *, param=1):
...         self.param = param
...     def fit(self, X, y=None):
...         self.is_fitted_ = True
...         return self
...     def predict(self, X):
...         return np.full(shape=X.shape[0], fill_value=self.param)
>>> estimator = MyEstimator(param=2)
>>> estimator.get_params()
{'param': 2}
>>> X = np.array([[1, 2], [2, 3], [3, 4]])
>>> y = np.array([1, 0, 1])
>>> estimator.fit(X, y).predict(X)
array([2, 2, 2])
>>> estimator.set_params(param=3).fit(X, y).predict(X)
array([3, 3, 3])
class ShrunkBARTCV(ShrunkBART):
    def __init__(self, estimator_: BaseEstimator, scheme: str,
                 reg_param_list: List[float] = [0.1, 1, 10, 50, 100, 500],
                 cv: int = 3, scoring=None):
        super(ShrunkBARTCV, self).__init__(estimator_, None, scheme)
        self.reg_param_list = np.array(reg_param_list)
        self.cv = cv
        self.scoring = scoring

    def fit(self, X, y, *args, **kwargs):
        self.scores_ = []
        for reg_param in self.reg_param_list:
            est = ShrunkBART(deepcopy(self.estimator_), reg_param, self.scheme)
            cv_scores = cross_val_score(est, X, y, cv=self.cv, scoring=self.scoring)
            self.scores_.append(np.mean(cv_scores))
        self.reg_param = self.reg_param_list[np.argmax(self.scores_)]
        super().fit(X=X, y=y)

Ancestors

  • ShrunkBART
  • ImputedBART
  • sklearn.base.BaseEstimator
  • sklearn.utils._estimator_html_repr._HTMLDocumentationLinkMixin
  • sklearn.utils._metadata_requests._MetadataRequester

Methods

def fit(self, X, y, *args, **kwargs)
def fit(self, X, y, *args, **kwargs):
    self.scores_ = []
    for reg_param in self.reg_param_list:
        est = ShrunkBART(deepcopy(self.estimator_), reg_param, self.scheme)
        cv_scores = cross_val_score(est, X, y, cv=self.cv, scoring=self.scoring)
        self.scores_.append(np.mean(cv_scores))
    self.reg_param = self.reg_param_list[np.argmax(self.scores_)]
    super().fit(X=X, y=y)
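
Example (a sketch based on the commented-out code in `main()`; assumes `bart` is an already-fitted BART model as in the example above): `reg_param` is selected by cross-validation over `reg_param_list` before the final shrinkage pass.

import copy

bart_cv = ShrunkBARTCV(copy.deepcopy(bart), scheme="node_based")
bart_cv.fit(X_train, y_train)
preds_cv = bart_cv.predict(X_test)
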
class SklearnModel (n_trees: int = 200, n_chains: int = 4, sigma_a: float = 0.001, sigma_b: float = 0.001, n_samples: int = 200, n_burn: int = 200, thin: float = 0.1, alpha: float = 0.95, beta: float = 2.0, store_in_sample_predictions: bool = False, store_acceptance_trace: bool = False, tree_sampler: TreeMutationSampler = <imodels.experimental.bartpy.samplers.unconstrainedtree.treemutation.UnconstrainedTreeMutationSampler object>, initializer: Optional[Initializer] = None, n_jobs=-1, classification: bool = False, max_rules=None)

The main access point to building BART models in BartPy

Parameters

n_trees : int
the number of trees to use; more trees make a smoother fit but slow down training and prediction
n_chains : int
the number of independent chains to run more chains will improve the quality of the samples, but will require more computation
sigma_a : float
shape parameter of the prior on sigma
sigma_b : float
scale parameter of the prior on sigma
n_samples : int
how many recorded samples to take
n_burn : int
how many samples to run without recording to reach convergence
thin : float
proportion of samples to keep. Use this to save memory when running large models
p_grow : float
probability of choosing a grow mutation in tree mutation sampling
p_prune : float
probability of choosing a prune mutation in tree mutation sampling
alpha : float
prior parameter on tree structure
beta : float
prior parameter on tree structure
store_in_sample_predictions : bool
whether to store full prediction samples set to False if you don't need in sample results - saves a lot of memory
store_acceptance_trace : bool
whether to store acceptance rates of the Gibbs samples; useful for diagnostics. Unless you're very memory constrained, you will usually want to keep this enabled
tree_sampler : TreeMutationSampler
Method of sampling used on trees defaults to bartpy.samplers.unconstrainedtree
initializer : Initializer
Class that handles the initialization of tree structure and leaf values
n_jobs : int
how many cores to use when computing MCMC samples set to -1 to use all cores
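
Example (a minimal usage sketch; import path assumed as in the BART example above):

from imodels.experimental.bartpy.sklearnmodel import SklearnModel  # assumed import path

model = SklearnModel(n_trees=50, n_chains=2, n_samples=100, n_burn=100,
                     store_in_sample_predictions=True)
model.fit(X_train, y_train)
in_sample_preds = model.predict()           # uses the stored in-sample prediction samples
out_of_sample_preds = model.predict(X_test)
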
class SklearnModel(BaseEstimator, RegressorMixin):
    """
    The main access point to building BART models in BartPy

    Parameters
    ----------
    n_trees: int
        the number of trees to use; more trees make a smoother fit but slow down training and prediction
    n_chains: int
        the number of independent chains to run
        more chains will improve the quality of the samples, but will require more computation
    sigma_a: float
        shape parameter of the prior on sigma
    sigma_b: float
        scale parameter of the prior on sigma
    n_samples: int
        how many recorded samples to take
    n_burn: int
        how many samples to run without recording to reach convergence
    thin: float
        proportion of samples to keep.
        Use this to save memory when running large models
    p_grow: float
        probability of choosing a grow mutation in tree mutation sampling
    p_prune: float
        probability of choosing a prune mutation in tree mutation sampling
    alpha: float
        prior parameter on tree structure
    beta: float
        prior parameter on tree structure
    store_in_sample_predictions: bool
        whether to store full prediction samples
        set to False if you don't need in sample results - saves a lot of memory
    store_acceptance_trace: bool
        whether to store acceptance rates of the Gibbs samples
        useful for diagnostics; unless you're very memory constrained,
        you will usually want to keep this enabled
    tree_sampler: TreeMutationSampler
        Method of sampling used on trees
        defaults to `bartpy.samplers.unconstrainedtree`
    initializer: Initializer
        Class that handles the initialization of tree structure and leaf values
    n_jobs: int
        how many cores to use when computing MCMC samples
        set to `-1` to use all cores
    """

    def __init__(self,
                 n_trees: int = 200,
                 n_chains: int = 4,
                 sigma_a: float = 0.001,
                 sigma_b: float = 0.001,
                 n_samples: int = 200,
                 n_burn: int = 200,
                 thin: float = 0.1,
                 alpha: float = 0.95,
                 beta: float = 2.,
                 store_in_sample_predictions: bool = False,
                 store_acceptance_trace: bool = False,
                 tree_sampler: TreeMutationSampler = get_tree_sampler(0.5, 0.5),
                 initializer: Optional[Initializer] = None,
                 n_jobs=-1,
                 classification: bool = False,
                 max_rules=None):
        self.n_trees = n_trees
        self.n_chains = n_chains
        self.sigma_a = sigma_a
        self.sigma_b = sigma_b
        self.n_burn = n_burn
        self.n_samples = n_samples
        self.p_grow = 0.5
        self.p_prune = 0.5
        self.alpha = alpha
        self.beta = beta
        self.thin = thin
        self.n_jobs = n_jobs
        self.store_in_sample_predictions = store_in_sample_predictions
        self.store_acceptance_trace = store_acceptance_trace
        self.columns = None
        self.tree_sampler = tree_sampler
        self.initializer = initializer
        self.schedule = SampleSchedule(self.tree_sampler, LeafNodeSampler(), SigmaSampler())
        self.sampler = ModelSampler(self.schedule)
        self.classification = classification
        self.max_rules = max_rules

        self.sigma, self.data, self.model, self._prediction_samples, self._model_samples, self.extract = [None] * 6

    def fit(self, X: Union[np.ndarray, pd.DataFrame], y: np.ndarray) -> 'SklearnModel':
        """
        Learn the model based on training data

        Parameters
        ----------
        X: pd.DataFrame
            training covariates
        y: np.ndarray
            training targets

        Returns
        -------
        SklearnModel
            self with trained parameter values
        """
        self.model = self._construct_model(X, y)
        self.extract = Parallel(n_jobs=self.n_jobs)(self.f_delayed_chains(X, y))
        self.combined_chains = self._combine_chains(self.extract)
        self._model_samples, self._prediction_samples = self.combined_chains["model"], self.combined_chains[
            "in_sample_predictions"]
        self._acceptance_trace = self.combined_chains["acceptance"]
        self._likelihood = self.combined_chains["likelihood"]
        self._probs = self.combined_chains["probs"]

        self.fitted_ = True
        return self

    @property
    def fitted(self):
        if hasattr(self, "fitted_"):
            return self.fitted_
        return False

    @property
    def complexity_(self):
        if hasattr(self.initializer, "_tree"):
            estimator = self.initializer._tree
            if hasattr(estimator, 'complexity_'):
                return estimator.complexity_

    @staticmethod
    def _combine_chains(extract: List[Chain]) -> Chain:
        keys = list(extract[0].keys())
        combined = {}
        for key in keys:
            combined[key] = np.concatenate([chain[key] for chain in extract], axis=0)
        return combined

    @staticmethod
    def _convert_covariates_to_data(X: np.ndarray, y: np.ndarray) -> Data:
        from copy import deepcopy
        if type(X) == pd.DataFrame:
            X = X.values
        return Data(deepcopy(X), deepcopy(y), normalize=True)

    def _construct_model(self, X: np.ndarray, y: np.ndarray) -> Model:
        if len(X) == 0 or X.shape[1] == 0:
            raise ValueError("Empty covariate matrix passed")
        self.data = self._convert_covariates_to_data(X, y)
        self.sigma = Sigma(self.sigma_a, self.sigma_b, self.data.y.normalizing_scale, self.classification)
        self.model = Model(self.data,
                           self.sigma,
                           n_trees=self.n_trees,
                           alpha=self.alpha,
                           beta=self.beta,
                           initializer=self.initializer,
                           classification=self.classification)
        n_trees = self.n_trees if self.initializer is None else self.initializer.n_trees
        self.n_trees = n_trees
        return self.model

    def f_delayed_chains(self, X: np.ndarray, y: np.ndarray):
        """
        Access point for the delayed chain-running methods
        Useful when you want to run multiple instances of the model in parallel,
        e.g. when calculating a null distribution for feature importance

        Parameters
        ----------
        X: np.ndarray
            Covariate matrix
        y: np.ndarray
            Target array

        Returns
        -------
        List[Callable[[], Chain]]
        """
        return [delayed(x)(self, X, y) for x in self.f_chains()]

    def f_chains(self) -> List[Callable[[], Chain]]:
        """
        List of methods to run MCMC chains
        Useful for running multiple models in parallel

        Returns
        -------
        List[Callable[[], Chain]]
            List of methods to run individual chains
            Length of n_chains
        """
        return [delayed_run_chain() for _ in range(self.n_chains)]

    def predict(self, X: np.ndarray = None) -> np.ndarray:
        """
        Predict the target corresponding to the provided covariate matrix
        If X is None, will predict based on training covariates

        Prediction is based on the mean of all samples

        Parameters
        ----------
        X: np.ndarray
            covariates to predict from

        Returns
        -------
        np.ndarray
            predictions for the X covariates
        """
        if X is None and self.store_in_sample_predictions:
            return self.data.y.unnormalize_y(np.mean(self._prediction_samples, axis=0))
        elif X is None and not self.store_in_sample_predictions:
            raise ValueError(
                "In sample predictions only possible if model.store_in_sample_predictions is `True`.  Either set the parameter to True or pass a non-None X parameter")
        else:
            predictions = self._out_of_sample_predict(X)
            if self.classification:
                return np.round(predictions, 0)
            return predictions

    def predict_proba(self, X: np.ndarray = None) -> np.ndarray:
        preds = self._out_of_sample_predict(X)
        # follow the scikit-learn convention: column 0 = P(y=0), column 1 = P(y=1)
        return np.stack([1 - preds, preds], axis=1)

    def residuals(self, X=None, y=None) -> np.ndarray:
        """
        Array of error for each observation

        Parameters
        ----------
        X: np.ndarray
            Covariate matrix
        y: np.ndarray
            Target array

        Returns
        -------
        np.ndarray
            Error for each observation
        """
        if y is None:
            return self.model.data.y.unnormalized_y - self.predict(X)
        else:
            return y - self.predict(X)

    def l2_error(self, X=None, y=None) -> np.ndarray:
        """
        Calculate the squared errors for each row in the covariate matrix

        Parameters
        ----------
        X: np.ndarray
            Covariate matrix
        y: np.ndarray
            Target array
        Returns
        -------
        np.ndarray
            Squared error for each observation
        """
        return np.square(self.residuals(X, y))

    def rmse(self, X, y) -> float:
        """
        Root of the total squared error of the model,
        i.e. the square root of the sum (not the mean) of squared errors over all observations

        Parameters
        ----------
        X: np.ndarray
            Covariate matrix
        y: np.ndarray
            Target array

        Returns
        -------
        float
            The square root of the total summed L2 error for the model
        """
        return np.sqrt(np.sum(self.l2_error(X, y)))

    def _chain_pred_arr(self, X, chain_number):
        chain_len = int(self.n_samples)
        samples_chain = self._model_samples[chain_number * chain_len: (chain_number + 1) * chain_len]
        predictions_transformed = [x.predict(X) for x in samples_chain]
        return predictions_transformed

    def predict_chain(self, X, chain_number):
        predictions_transformed = self._chain_pred_arr(X, chain_number)
        predictions = self.data.y.unnormalize_y(np.mean(predictions_transformed, axis=0))
        if self.classification:
            predictions = scipy.stats.norm.cdf(predictions)
        return predictions

    def chain_mse_std(self, X, y, chain_number):
        predictions_transformed = self._chain_pred_arr(X, chain_number)
        predictions_std = np.std(
            [mean_squared_error(self.data.y.unnormalize_y(preds), y) for preds in predictions_transformed])
        return predictions_std

    def chain_predictions(self, X, chain_number):
        predictions_transformed = self._chain_pred_arr(X, chain_number)
        preds_arr = [self.data.y.unnormalize_y(preds) for preds in predictions_transformed]
        return preds_arr

    def between_chains_var(self, X):
        all_predictions = np.stack([self.data.y.unnormalize_y(x.predict(X)) for x in self._model_samples], axis=1)

        def _get_var(preds_arr):
            mean_pred = preds_arr.mean(axis=1)
            var = np.mean((preds_arr - np.expand_dims(mean_pred, 1)) ** 2)
            return var

        total_var = _get_var(all_predictions)
        within_chain_var = 0
        for c in range(self.n_chains):
            chain_preds = self._chain_pred_arr(X, c)
            within_chain_var += _get_var(np.stack(chain_preds, axis=1))
        return total_var - within_chain_var

    def _out_of_sample_predict(self, X):
        samples = self._model_samples
        predictions_transformed = [x.predict(X) for x in samples]
        predictions = self.data.y.unnormalize_y(np.mean(predictions_transformed, axis=0))
        if self.classification:
            predictions = scipy.stats.norm.cdf(predictions)
        return predictions

    def fit_predict(self, X, y):
        self.fit(X, y)
        if self.store_in_sample_predictions:
            return self.predict()
        else:
            return self.predict(X)

    @property
    def model_samples(self) -> List[Model]:
        """
        Array of the model as it was after each sample.
        Useful for:

         - examining the state of trees, nodes and sigma throughout the sampling
         - out of sample prediction

        Returns None if the model hasn't been fit

        Returns
        -------
        List[Model]
        """
        return self._model_samples

    @property
    def acceptance_trace(self) -> List[Mapping[str, float]]:
        """
        List of Mappings from variable name to acceptance rates

        Each entry is the acceptance rate of the variable in each iteration of the model

        Returns
        -------
        List[Mapping[str, float]]
        """
        return self._acceptance_trace

    @property
    def likelihood(self) -> List:
        """
        Likelihood of the model, one entry per kept sampling iteration

        Returns
        -------
        List
        """
        return self._likelihood

    @property
    def probs(self) -> List:
        """
        Probabilities recorded at each kept sampling iteration

        Returns
        -------
        List
        """
        return self._probs

    @property
    def prediction_samples(self) -> np.ndarray:
        """
        Matrix of prediction samples at each point in sampling
        Useful for assessing convergence, calculating point estimates etc.

        Returns
        -------
        np.ndarray
            prediction samples with dimensionality n_samples * n_points
        """
        return self._prediction_samples

    def from_extract(self, extract: List[Chain], X: np.ndarray, y: np.ndarray) -> 'SklearnModel':
        """
        Create a copy of the model using an extract
        Useful for doing operations on extracts created in external processes like feature selection

        Parameters
        ----------
        extract: Extract
            samples produced by delayed chain methods
        X: np.ndarray
            Covariate matrix
        y: np.ndarray
            Target variable

        Returns
        -------
        SklearnModel
            Copy of the current model with samples
        """
        new_model = deepcopy(self)
        combined_chain = self._combine_chains(extract)
        # attach the combined samples to the copy rather than mutating the current model
        new_model._model_samples, new_model._prediction_samples = combined_chain["model"], combined_chain["in_sample_predictions"]
        new_model._acceptance_trace = combined_chain["acceptance"]
        new_model.data = self._convert_covariates_to_data(X, y)
        return new_model
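
For orientation, a minimal usage sketch follows. The import path is an assumption (it depends on how the package is installed); the keyword arguments shown are the ones visible in the constructor above, and the remaining arguments are assumed to keep their defaults.

import numpy as np
from bartpy.sklearnmodel import SklearnModel  # assumed import path; adjust to your installation

rng = np.random.default_rng(0)
X = rng.uniform(size=(200, 3))
y = 2 * X[:, 0] + rng.normal(scale=0.1, size=200)

model = SklearnModel(n_trees=20, n_chains=2, n_samples=100, n_burn=50,
                     n_jobs=1, store_in_sample_predictions=True)
model.fit(X, y)
y_new = model.predict(X)   # posterior-mean prediction for the supplied covariates
y_in = model.predict()     # in-sample prediction; requires store_in_sample_predictions=True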

Ancestors

  • sklearn.base.BaseEstimator
  • sklearn.utils._estimator_html_repr._HTMLDocumentationLinkMixin
  • sklearn.utils._metadata_requests._MetadataRequester
  • sklearn.base.RegressorMixin
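
Because the class mixes in sklearn.base.BaseEstimator and sklearn.base.RegressorMixin, it should plug into standard scikit-learn utilities such as cross-validation (the constructor arguments are stored under their own names, which is what cloning requires). A sketch, continuing from the fit/predict example above:

from sklearn.model_selection import cross_val_score

fresh_model = SklearnModel(n_trees=20, n_chains=2, n_samples=100, n_burn=50, n_jobs=1)
scores = cross_val_score(fresh_model, X, y, cv=3)   # R^2 by default, via RegressorMixin.score
print(scores.mean())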

Instance variables

var acceptance_trace : List[Mapping[str, float]]

List of Mappings from variable name to acceptance rates

Each entry is the acceptance rate of the variable in each iteration of the model

Returns

List[Mapping[str, float]]
 
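
The acceptance trace gives a quick view of sampler mixing; a sketch, assuming model has been fitted as in the example above:

import pandas as pd
import matplotlib.pyplot as plt

trace = pd.DataFrame(model.acceptance_trace)  # one row per kept iteration, one column per sampled variable
trace.rolling(20).mean().plot()               # smoothed acceptance rates per variable
plt.show()
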
var complexity_
var fitted
var likelihood : List

Likelihood of the model, one entry per kept sampling iteration

Returns

List
 
var model_samples : List[Model]

Array of the model as it was after each sample. Useful for:

  • examining the state of trees, nodes and sigma throughout the sampling
  • out of sample prediction

Returns None if the model hasn't been fit

Returns

List[Model]
 
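
Because every element of model_samples supports predict, posterior summaries other than the mean can be assembled by hand. Per-sample predictions come back on the normalized scale, so they are mapped through model.data.y.unnormalize_y here, mirroring what the class does internally. A sketch, assuming a fitted model and a covariate matrix X_new:

import numpy as np

draws = np.stack([model.data.y.unnormalize_y(s.predict(X_new)) for s in model.model_samples])
lower, upper = np.percentile(draws, [2.5, 97.5], axis=0)  # pointwise 95% posterior interval
posterior_mean = draws.mean(axis=0)                       # approximately model.predict(X_new)
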
var prediction_samples : numpy.ndarray

Matrix of prediction samples at each point in sampling. Useful for assessing convergence, calculating point estimates etc.

Returns

np.ndarray
prediction samples with dimensionality n_samples * n_points
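
The stored in-sample prediction samples are kept on the normalized scale, so a point estimate is recovered through model.data.y.unnormalize_y; a cumulative mean over iterations is a cheap convergence check. A sketch, assuming the model was fitted with store_in_sample_predictions=True:

import numpy as np

samples = np.asarray(model.prediction_samples)   # shape (n_kept_samples, n_points), normalized scale
running_mean = np.cumsum(samples, axis=0) / np.arange(1, len(samples) + 1)[:, None]
point_estimate = model.data.y.unnormalize_y(samples.mean(axis=0))   # the quantity model.predict() returns
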
var probs : List

Probabilities recorded at each kept sampling iteration

Returns

List
 

Methods

def between_chains_var(self, X)
def chain_mse_std(self, X, y, chain_number)
def chain_predictions(self, X, chain_number)
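
The per-chain helpers above make a simple between-chain comparison possible; a sketch, assuming a fitted model with at least two chains and held-out data X_val, y_val:

for c in range(model.n_chains):
    print("chain", c, "std of per-sample MSE:", model.chain_mse_std(X_val, y_val, c))
print("between-chain variance:", model.between_chains_var(X_val))
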
def f_chains(self) ‑> List[Callable[[], Mapping[str, Union[List[Any], numpy.ndarray]]]]

List of methods to run MCMC chains. Useful for running multiple models in parallel.

Returns

List[Callable[[], Chain]]
List of methods to run individual chains; length n_chains
def f_delayed_chains(self, X: numpy.ndarray, y: numpy.ndarray)

Access point for the delayed chain-running methods. Useful when you want to run multiple instances of the model in parallel, e.g. when calculating a null distribution for feature importance.

Parameters

X : np.ndarray
Covariate matrix
y : np.ndarray
Target array

Returns

List[Callable[[], Chain]]
 
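
fit itself just pushes these delayed chains through joblib and combines the resulting extracts, so the same pieces can be driven by hand, for example when several models should share one parallel pool. A sketch, assuming a SklearnModel instance and training data X, y:

from joblib import Parallel

extract = Parallel(n_jobs=2)(model.f_delayed_chains(X, y))  # one Chain extract per chain
fitted_copy = model.from_extract(extract, X, y)             # rebuild a model carrying those samples
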
def fit(self, X: Union[numpy.ndarray, pandas.core.frame.DataFrame], y: numpy.ndarray) ‑> SklearnModel

Learn the model based on training data

Parameters

X : np.ndarray or pd.DataFrame
training covariates
y : np.ndarray
training targets

Returns

SklearnModel
self with trained parameter values
def fit_predict(self, X, y)
def from_extract(self, extract: List[Mapping[str, Union[List[Any], numpy.ndarray]]], X: numpy.ndarray, y: numpy.ndarray) ‑> SklearnModel

Create a copy of the model using an extract. Useful for doing operations on extracts created in external processes like feature selection.

Parameters

extract : Extract
samples produced by delayed chain methods
X : np.ndarray
Covariate matrix
y : np.ndarray
Target variable

Returns

SklearnModel
Copy of the current model with samples
def l2_error(self, X=None, y=None) ‑> numpy.ndarray

Calculate the squared errors for each row in the covariate matrix

Parameters

X : np.ndarray
Covariate matrix
y : np.ndarray
Target array

Returns

np.ndarray
Squared error for each observation
def predict(self, X: numpy.ndarray = None) ‑> numpy.ndarray

Predict the target corresponding to the provided covariate matrix. If X is None, will predict based on training covariates.

Prediction is based on the mean of all samples

Parameters

X : np.ndarray
covariates to predict from

Returns

np.ndarray
predictions for the X covariates
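
The X=None branch only works when in-sample predictions were stored during fitting; otherwise pass covariates explicitly. A short sketch, assuming a fitted model and new covariates X_new:

y_in = model.predict()        # in-sample posterior mean; needs store_in_sample_predictions=True
y_new = model.predict(X_new)  # posterior-mean prediction for new covariates
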
def predict_chain(self, X, chain_number)
def predict_proba(self, X: numpy.ndarray = None) ‑> numpy.ndarray
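
For classification fits (classification=True) the probability matrix follows the usual scikit-learn column order, with column 1 holding P(y=1), and predict rounds that probability to a 0/1 label. A sketch, assuming binary 0/1 labels y_bin, the covariates X from the earlier example, and the same assumed import path as above:

clf = SklearnModel(n_trees=20, n_chains=2, n_samples=100, n_burn=50, n_jobs=1, classification=True)
clf.fit(X, y_bin)
proba = clf.predict_proba(X)   # columns: P(y=0), P(y=1)
labels = clf.predict(X)        # probabilities rounded to 0/1
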
def residuals(self, X=None, y=None) ‑> numpy.ndarray

Array of error for each observation

Parameters

X : np.ndarray
Covariate matrix
y : np.ndarray
Target array

Returns

np.ndarray
Error for each observation
def rmse(self, X, y) ‑> float

The root of the total squared error of the model, i.e. the square root of the sum (not the mean) of squared errors over all observations.

Parameters

X : np.ndarray
Covariate matrix
y : np.ndarray
Target array

Returns

float
The square root of the total summed L2 error for the model
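
Note that rmse returns the square root of the summed (not averaged) squared errors; a conventional root-mean-squared error can be recovered from l2_error. A sketch, assuming a fitted model and held-out X_val, y_val:

import numpy as np

errors = model.residuals(X_val, y_val)                    # per-observation errors
root_sse = model.rmse(X_val, y_val)                       # sqrt of the summed squared errors
rmse = np.sqrt(np.mean(model.l2_error(X_val, y_val)))     # conventional RMSE
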
def set_score_request(self: SklearnModel, *, sample_weight: Union[bool, ForwardRef(None), str] = '$UNCHANGED$') ‑> SklearnModel

Request metadata passed to the score method.

Note that this method is only relevant if enable_metadata_routing=True (see sklearn.set_config). Please see the User Guide on metadata routing for how the routing mechanism works.

The options for each parameter are:

  • True: metadata is requested, and passed to score if provided. The request is ignored if metadata is not provided.

  • False: metadata is not requested and the meta-estimator will not pass it to score.

  • None: metadata is not requested, and the meta-estimator will raise an error if the user provides it.

  • str: metadata should be passed to the meta-estimator with this given alias instead of the original name.

The default (sklearn.utils.metadata_routing.UNCHANGED) retains the existing request. This allows you to change the request for some parameters and not others.

Added in version: 1.3

Note

This method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. used inside a sklearn.pipeline.Pipeline. Otherwise it has no effect.

Parameters

sample_weight : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED
Metadata routing for sample_weight parameter in score.

Returns

self : object
The updated object.