Expand source code
import numpy as np
from collections import defaultdict

from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.decomposition import PCA
from sklearn.ensemble import BaseEnsemble


class LocalDecisionStump:
    """
    An object that implements a callable local decision stump function and that also includes some meta-data that
    allows for an API to interact with other methods in the package.

    A local decision stump is a tri-valued function that is zero outside of a rectangular region, and on that region,
    takes either a positive or negative value, depending on whether a single designated feature is above or below a
    threshold. For more information on what a local decision stump is, refer to our paper.

    :param feature: int
        Feature used in the decision stump
    :param threshold: float
        Threshold used in the decision stump
    :param left_val: float
        The value taken when x_k <= threshold
    :param right_val: float
        The value taken when x_k > threshold
    :param a_features: list of ints
        List of ancestor feature indices (ordered from highest ancestor to lowest)
    :param a_thresholds: list of floats
        List of ancestor thresholds (ordered from highest ancestor to lowest)
    :param a_signs: list of bools
        List of signs indicating whether the current node is in the left child (False) or right child (True) of the
        ancestor nodes (ordered from highest ancestor to lowest)
    """

    def __init__(self, feature, threshold, left_val, right_val, a_features, a_thresholds, a_signs):
        self.feature = feature
        self.threshold = threshold
        self.left_val = left_val
        self.right_val = right_val
        self.a_features = a_features
        self.a_thresholds = a_thresholds
        self.a_signs = a_signs

    def __call__(self, data):
        """
        Return values of the local decision stump function on an input data matrix with samples as rows

        :param data: array-like of shape (n_samples, n_features)
            Data matrix to feed into the local decision stump function
        :return: array-like of shape (n_samples,)
            Function values on the data
        """

        root_to_stump_path_indicators = _compare_all(data, self.a_features, np.array(self.a_thresholds),
                                                     np.array(self.a_signs))
        in_node = np.all(root_to_stump_path_indicators, axis=1).astype(int)
        is_right = _compare(data, self.feature, self.threshold).astype(int)
        result = in_node * (is_right * self.right_val + (1 - is_right) * self.left_val)

        return result

    def __repr__(self):
        return f"LocalDecisionStump(feature={self.feature}, threshold={self.threshold}, left_val={self.left_val}, " \
               f"right_val={self.right_val}, a_features={self.a_features}, a_thresholds={self.a_thresholds}, " \
               f"a_signs={self.a_signs})"


def make_stump(node_no, tree_struct, parent_stump, is_right_child, normalize=False):
    """
    Create a single local decision stump corresponding to a node in a scikit-learn tree structure object.
    The nonzero values of the stump are chosen so that the vector of local decision stump values over the training
    set (used to fit the tree) is orthogonal to those of all ancestor nodes.

    :param node_no: int
        The index of the node
    :param tree_struct: object
        The scikit-learn tree object
    :param parent_stump: LocalDecisionStump object
        The local decision stump corresponding to the parent of the node in question
    :param is_right_child: bool
        True if the new node is the right child of the parent node, False otherwise
    :param normalize: bool
        Flag. If set to True, then divide the nonzero function values by sqrt(n_samples in node) so that the
        vector of function values on the training set has unit norm. If False, then do not divide, so that the
        vector of function values on the training set has norm equal to n_samples in node.
    :return: LocalDecisionStump object
        The local decision stump corresponding to the node in question
    """
    # Get features, thresholds and signs for ancestors
    if parent_stump is None:  # If root node
        a_features = []
        a_thresholds = []
        a_signs = []
    else:
        a_features = parent_stump.a_features + [parent_stump.feature]
        a_thresholds = parent_stump.a_thresholds + [parent_stump.threshold]
        a_signs = parent_stump.a_signs + [is_right_child]
    # Get indices for left and right children of the node in question
    left_child = tree_struct.children_left[node_no]
    right_child = tree_struct.children_right[node_no]
    # Get quantities relevant to the node in question
    feature = tree_struct.feature[node_no]
    threshold = tree_struct.threshold[node_no]
    left_size = tree_struct.n_node_samples[left_child]
    right_size = tree_struct.n_node_samples[right_child]
    parent_size = tree_struct.n_node_samples[node_no]
    normalization = parent_size if normalize else 1
    left_val = - np.sqrt(right_size / (left_size * normalization))
    right_val = np.sqrt(left_size / (right_size * normalization))

    return LocalDecisionStump(feature, threshold, left_val, right_val, a_features, a_thresholds, a_signs)


def make_stumps(tree_struct, normalize=False):
    """
    Create a collection of local decision stumps corresponding to all internal nodes in a scikit-learn tree structure
    object.

    :param tree_struct: object
        The scikit-learn tree object
    :param normalize: bool
        Flag. If set to True, then divide the nonzero function values by sqrt(n_samples in node) so that the
        vector of function values on the training set has unit norm. If False, then do not divide, so that the
        vector of function values on the training set has norm equal to n_samples in node.
    :return:
        stumps: list of LocalDecisionStump objects
            The local decision stumps corresponding to all internal node in the tree structure
        num_splits_per_feature: array-like of shape (n_features,)
            The number of splits in the tree on each original feature
    """
    stumps = []
    num_splits_per_feature = [0] * tree_struct.n_features

    def make_stump_iter(node_no, tree_struct, parent_stump, is_right_child, normalize, stumps, num_splits_per_feature):
        """
        Helper function for iteratively making local decision stump objects and appending them to the list stumps.
        """
        new_stump = make_stump(node_no, tree_struct, parent_stump, is_right_child, normalize)
        stumps.append(new_stump)
        num_splits_per_feature[new_stump.feature] += 1
        left_child = tree_struct.children_left[node_no]
        right_child = tree_struct.children_right[node_no]
        if tree_struct.feature[left_child] != -2:  # is not leaf
            make_stump_iter(left_child, tree_struct, new_stump, False, normalize, stumps, num_splits_per_feature)
        if tree_struct.feature[right_child] != -2:  # is not leaf
            make_stump_iter(right_child, tree_struct, new_stump, True, normalize, stumps, num_splits_per_feature)

    make_stump_iter(0, tree_struct, None, None, normalize, stumps, num_splits_per_feature)
    return stumps, num_splits_per_feature


def tree_feature_transform(stumps, X):
    """
    Transform the data matrix X using a mapping derived from a collection of local decision stump functions.

    :param stumps: list of LocalDecisionStump objects
        List of stump functions to use to transform data
    :param X: array-like of shape (n_samples, n_features)
        Original data matrix
    :return: X_transformed: array-like of shape (n_samples, n_stumps)
        Transformed data matrix
    """
    transformed_feature_vectors = []
    for stump in stumps:
        transformed_feature_vec = stump(X)
        transformed_feature_vectors.append(transformed_feature_vec)
    X_transformed = np.vstack(transformed_feature_vectors).T

    return X_transformed


class TreeTransformer(TransformerMixin, BaseEstimator):
    """
    A transformer that transforms data using a representation built from local decision stumps from a tree or tree
    ensemble. The transformer also comes with meta data on the local decision stumps and methods that allow
    for transformations using sub-representations corresponding to each of the original features.

    :param estimator: scikit-learn estimator
        The scikit-learn tree or tree ensemble estimator object
    :param pca: bool
        Flag, if False, the sub-representation for each original feature is just the concatenation of the local
        decision stumps splitting on that feature. If true, the sub-representation are the principal components of the
        set of local decision stump vectors
    :param max_components_type: {"median_splits", "max_splits", "nsamples", "nstumps", "min_nsamples_nstumps",
        "min_fracnsamples_nstumps"} or int
        Method for choosing the max number of components for PCA transformer for each sub-representation corresponding
        to an original feature:
            - If "median_splits", then max_components is alpha * median number of splits on the original feature
              among trees in the estimator
            - If "max_splits", then max_components is alpha * maximum number of splits on the original feature among
              trees in the estimator
            - If "nsamples", then max_components is alpha * n_samples
            - If "nstumps", then max_components is alpha * n_stumps
            - If "min_nsamples_nstumps", then max_components is alpha * min(n_samples, n_stumps), where n_stumps is
              total number of local decision stumps splitting on that feature in the ensemble
            - If "min_fracnsamples_nstumps", then max_components is min(alpha * n_samples, n_stumps), where n_stumps is
              total number of local decision stumps splitting on that feature in the ensemble
            - If int, then max_components is the given integer
    :param alpha: float
        Parameter for adjusting the max number of components for PCA.
    :param normalize: bool
        Flag. If set to True, then divide the nonzero function values for each local decision stump by
        sqrt(n_samples in node) so that the vector of function values on the training set has unit norm. If False,
        then do not divide, so that the vector of function values on the training set has norm equal to n_samples
        in node.
    """

    def __init__(self, estimator, pca=True, max_components_type="min_fracnsamples_nstumps", alpha=0.5, normalize=False):
        self.estimator = estimator
        self.pca = pca
        self.max_components_type = max_components_type
        self.alpha = alpha
        self.normalize = normalize
        # Check if single tree or tree ensemble
        tree_models = estimator.estimators_ if isinstance(estimator, BaseEnsemble) else [estimator]
        # Make stumps for each tree
        num_splits_per_feature_all = []
        self.all_stumps = []
        for tree_model in tree_models:
            tree_stumps, num_splits_per_feature = make_stumps(tree_model.tree_, normalize)
            self.all_stumps += tree_stumps
            num_splits_per_feature_all.append(num_splits_per_feature)
        # Identify the stumps that split on feature k, for each k
        self._original_feat_to_stump_mapping = defaultdict(list)
        for idx, stump in enumerate(self.all_stumps):
            self._original_feat_to_stump_mapping[stump.feature].append(idx)
        # Obtain the median and max number of splits on each feature across trees
        self.median_splits = np.median(num_splits_per_feature_all, axis=0)
        self.max_splits = np.max(num_splits_per_feature_all, axis=0)
        # Initialize list of PCA transformers, one for each set of stumps corresponding to each original feature
        self.pca_transformers = defaultdict(lambda: None)

    def fit(self, X, y=None):

        def pca_on_stumps(k):
            """
            Helper function to fit PCA transformer on stumps corresponding to original feature k
            """
            restricted_stumps = self.get_stumps_for_feature(k)
            n_stumps = len(restricted_stumps)
            n_samples = X.shape[0]
            # Get the number of components to use for PCA
            if self.max_components_type == 'median_splits':
                max_components = int(self.median_splits[k] * self.alpha)
            elif self.max_components_type == "max_splits":
                max_components = int(self.max_splits[k] * self.alpha)
            elif self.max_components_type == "nsamples":
                max_components = int(n_samples * self.alpha)
            elif self.max_components_type == "nstumps":
                max_components = int(n_stumps * self.alpha)
            elif self.max_components_type == "min_nsamples_nstumps":
                max_components = int(min(n_samples, n_stumps) * self.alpha)
            elif self.max_components_type == "min_fracnsamples_nstumps":
                max_components = int(min(n_samples * self.alpha, n_stumps))
            elif isinstance(self.max_components_type, int):
                max_components = self.max_components_type
            else:
                raise ValueError("Invalid max components type")
            n_components = min(max_components, n_stumps, n_samples)
            if n_components == 0:
                pca_transformer = None
            else:
                X_transformed = tree_feature_transform(restricted_stumps, X)
                pca_transformer = PCA(n_components=n_components)
                pca_transformer.fit(X_transformed)

            return pca_transformer

        if self.pca:
            n_features = X.shape[1]
            for k in np.arange(n_features):
                self.pca_transformers[k] = pca_on_stumps(k)
        else:
            pass

    def transform(self, X):
        """
        Obtain all engineered features.

        :param X: array-like of shape (n_samples, n_features)
            Original data matrix
        :return: X_transformed: array-like of shape (n_samples, n_new_features)
            Transformed data matrix
        """
        X_transformed = []
        n_features = X.shape[1]
        for k in range(n_features):
            X_transformed_k = self.transform_one_feature(X, k)
            if X_transformed_k is not None:
                X_transformed.append(X_transformed_k)
        X_transformed = np.hstack(X_transformed)

        return X_transformed

    def transform_one_feature(self, X, k):
        """
        Obtain the engineered features corresponding to a given original feature X_k

        :param X: array-like of shape (n_samples, n_features)
            Original data matrix
        :param k: int
            Index of original feature
        :return: X_transformed: array-like of shape (n_samples, n_new_features)
            Transformed data matrix
        """
        restricted_stumps = self.get_stumps_for_feature(k)
        if len(restricted_stumps) == 0:
            return None
        else:
            X_transformed = tree_feature_transform(restricted_stumps, X)
            if self.pca_transformers[k] is not None:
                X_transformed = self.pca_transformers[k].transform(X_transformed)
        return X_transformed

    def get_stumps_for_feature(self, k):
        """
        Get the list of local decision stumps that split on feature k

        :param k: int
            Index of original feature
        :return: restricted_stumps: list of LocalDecisionStump objects
        """
        restricted_stump_indices = self._original_feat_to_stump_mapping[k]
        restricted_stumps = [self.all_stumps[idx] for idx in restricted_stump_indices]

        return restricted_stumps


def _compare(data, k, threshold, sign=True):
    """
    Obtain indicator vector for the samples with k-th feature > threshold

    :param data: array-like of shape (n_sample, n_feat)
    :param k: int
        Index of feature in question
    :param threshold: float
        Threshold for the comparison
    :param sign: bool
        Flag, if False, return indicator of the complement
    :return: array-like of shape (n_samples,)
    """
    if sign:
        return data[:, k] > threshold
    else:
        return data[:, k] <= threshold


def _compare_all(data, ks, thresholds, signs):
    """
    Obtain indicator vector for the samples with k-th feature > threshold or <= threshold (depending on sign)
    for all k in ks

    :param data: array-like of shape (n_sample, n_feat)
    :param ks: list of ints
        Indices of feature in question
    :param thresholds: list of floats
        Threshold for the comparison
    :param signs: list of bools
        Flags, if k-th element if True, then add the condition k-th feature > threshold, otherwise add the
        condition k-th feature <= threshold
    :return:
    """
    return ~np.logical_xor(data[:, ks] > thresholds, signs)

Functions

def make_stump(node_no, tree_struct, parent_stump, is_right_child, normalize=False)

Create a single local decision stump corresponding to a node in a scikit-learn tree structure object. The nonzero values of the stump are chosen so that the vector of local decision stump values over the training set (used to fit the tree) is orthogonal to those of all ancestor nodes.

:param node_no: int The index of the node :param tree_struct: object The scikit-learn tree object :param parent_stump: LocalDecisionStump object The local decision stump corresponding to the parent of the node in question :param is_right_child: bool True if the new node is the right child of the parent node, False otherwise :param normalize: bool Flag. If set to True, then divide the nonzero function values by sqrt(n_samples in node) so that the vector of function values on the training set has unit norm. If False, then do not divide, so that the vector of function values on the training set has norm equal to n_samples in node. :return: LocalDecisionStump object The local decision stump corresponding to the node in question

Expand source code
def make_stump(node_no, tree_struct, parent_stump, is_right_child, normalize=False):
    """
    Create a single local decision stump corresponding to a node in a scikit-learn tree structure object.
    The nonzero values of the stump are chosen so that the vector of local decision stump values over the training
    set (used to fit the tree) is orthogonal to those of all ancestor nodes.

    :param node_no: int
        The index of the node
    :param tree_struct: object
        The scikit-learn tree object
    :param parent_stump: LocalDecisionStump object
        The local decision stump corresponding to the parent of the node in question
    :param is_right_child: bool
        True if the new node is the right child of the parent node, False otherwise
    :param normalize: bool
        Flag. If set to True, then divide the nonzero function values by sqrt(n_samples in node) so that the
        vector of function values on the training set has unit norm. If False, then do not divide, so that the
        vector of function values on the training set has norm equal to n_samples in node.
    :return: LocalDecisionStump object
        The local decision stump corresponding to the node in question
    """
    # Get features, thresholds and signs for ancestors
    if parent_stump is None:  # If root node
        a_features = []
        a_thresholds = []
        a_signs = []
    else:
        a_features = parent_stump.a_features + [parent_stump.feature]
        a_thresholds = parent_stump.a_thresholds + [parent_stump.threshold]
        a_signs = parent_stump.a_signs + [is_right_child]
    # Get indices for left and right children of the node in question
    left_child = tree_struct.children_left[node_no]
    right_child = tree_struct.children_right[node_no]
    # Get quantities relevant to the node in question
    feature = tree_struct.feature[node_no]
    threshold = tree_struct.threshold[node_no]
    left_size = tree_struct.n_node_samples[left_child]
    right_size = tree_struct.n_node_samples[right_child]
    parent_size = tree_struct.n_node_samples[node_no]
    normalization = parent_size if normalize else 1
    left_val = - np.sqrt(right_size / (left_size * normalization))
    right_val = np.sqrt(left_size / (right_size * normalization))

    return LocalDecisionStump(feature, threshold, left_val, right_val, a_features, a_thresholds, a_signs)
def make_stumps(tree_struct, normalize=False)

Create a collection of local decision stumps corresponding to all internal nodes in a scikit-learn tree structure object.

:param tree_struct: object The scikit-learn tree object :param normalize: bool Flag. If set to True, then divide the nonzero function values by sqrt(n_samples in node) so that the vector of function values on the training set has unit norm. If False, then do not divide, so that the vector of function values on the training set has norm equal to n_samples in node. :return: stumps: list of LocalDecisionStump objects The local decision stumps corresponding to all internal node in the tree structure num_splits_per_feature: array-like of shape (n_features,) The number of splits in the tree on each original feature

Expand source code
def make_stumps(tree_struct, normalize=False):
    """
    Create a collection of local decision stumps corresponding to all internal nodes in a scikit-learn tree structure
    object.

    :param tree_struct: object
        The scikit-learn tree object
    :param normalize: bool
        Flag. If set to True, then divide the nonzero function values by sqrt(n_samples in node) so that the
        vector of function values on the training set has unit norm. If False, then do not divide, so that the
        vector of function values on the training set has norm equal to n_samples in node.
    :return:
        stumps: list of LocalDecisionStump objects
            The local decision stumps corresponding to all internal node in the tree structure
        num_splits_per_feature: array-like of shape (n_features,)
            The number of splits in the tree on each original feature
    """
    stumps = []
    num_splits_per_feature = [0] * tree_struct.n_features

    def make_stump_iter(node_no, tree_struct, parent_stump, is_right_child, normalize, stumps, num_splits_per_feature):
        """
        Helper function for iteratively making local decision stump objects and appending them to the list stumps.
        """
        new_stump = make_stump(node_no, tree_struct, parent_stump, is_right_child, normalize)
        stumps.append(new_stump)
        num_splits_per_feature[new_stump.feature] += 1
        left_child = tree_struct.children_left[node_no]
        right_child = tree_struct.children_right[node_no]
        if tree_struct.feature[left_child] != -2:  # is not leaf
            make_stump_iter(left_child, tree_struct, new_stump, False, normalize, stumps, num_splits_per_feature)
        if tree_struct.feature[right_child] != -2:  # is not leaf
            make_stump_iter(right_child, tree_struct, new_stump, True, normalize, stumps, num_splits_per_feature)

    make_stump_iter(0, tree_struct, None, None, normalize, stumps, num_splits_per_feature)
    return stumps, num_splits_per_feature
def tree_feature_transform(stumps, X)

Transform the data matrix X using a mapping derived from a collection of local decision stump functions.

:param stumps: list of LocalDecisionStump objects List of stump functions to use to transform data :param X: array-like of shape (n_samples, n_features) Original data matrix :return: X_transformed: array-like of shape (n_samples, n_stumps) Transformed data matrix

Expand source code
def tree_feature_transform(stumps, X):
    """
    Transform the data matrix X using a mapping derived from a collection of local decision stump functions.

    :param stumps: list of LocalDecisionStump objects
        List of stump functions to use to transform data
    :param X: array-like of shape (n_samples, n_features)
        Original data matrix
    :return: X_transformed: array-like of shape (n_samples, n_stumps)
        Transformed data matrix
    """
    transformed_feature_vectors = []
    for stump in stumps:
        transformed_feature_vec = stump(X)
        transformed_feature_vectors.append(transformed_feature_vec)
    X_transformed = np.vstack(transformed_feature_vectors).T

    return X_transformed

Classes

class LocalDecisionStump (feature, threshold, left_val, right_val, a_features, a_thresholds, a_signs)

An object that implements a callable local decision stump function and that also includes some meta-data that allows for an API to interact with other methods in the package.

A local decision stump is a tri-valued function that is zero outside of a rectangular region, and on that region, takes either a positive or negative value, depending on whether a single designated feature is above or below a threshold. For more information on what a local decision stump is, refer to our paper.

:param feature: int Feature used in the decision stump :param threshold: float Threshold used in the decision stump :param left_val: float The value taken when x_k <= threshold :param right_val: float The value taken when x_k > threshold :param a_features: list of ints List of ancestor feature indices (ordered from highest ancestor to lowest) :param a_thresholds: list of floats List of ancestor thresholds (ordered from highest ancestor to lowest) :param a_signs: list of bools List of signs indicating whether the current node is in the left child (False) or right child (True) of the ancestor nodes (ordered from highest ancestor to lowest)

Expand source code
class LocalDecisionStump:
    """
    An object that implements a callable local decision stump function and that also includes some meta-data that
    allows for an API to interact with other methods in the package.

    A local decision stump is a tri-valued function that is zero outside of a rectangular region, and on that region,
    takes either a positive or negative value, depending on whether a single designated feature is above or below a
    threshold. For more information on what a local decision stump is, refer to our paper.

    :param feature: int
        Feature used in the decision stump
    :param threshold: float
        Threshold used in the decision stump
    :param left_val: float
        The value taken when x_k <= threshold
    :param right_val: float
        The value taken when x_k > threshold
    :param a_features: list of ints
        List of ancestor feature indices (ordered from highest ancestor to lowest)
    :param a_thresholds: list of floats
        List of ancestor thresholds (ordered from highest ancestor to lowest)
    :param a_signs: list of bools
        List of signs indicating whether the current node is in the left child (False) or right child (True) of the
        ancestor nodes (ordered from highest ancestor to lowest)
    """

    def __init__(self, feature, threshold, left_val, right_val, a_features, a_thresholds, a_signs):
        self.feature = feature
        self.threshold = threshold
        self.left_val = left_val
        self.right_val = right_val
        self.a_features = a_features
        self.a_thresholds = a_thresholds
        self.a_signs = a_signs

    def __call__(self, data):
        """
        Return values of the local decision stump function on an input data matrix with samples as rows

        :param data: array-like of shape (n_samples, n_features)
            Data matrix to feed into the local decision stump function
        :return: array-like of shape (n_samples,)
            Function values on the data
        """

        root_to_stump_path_indicators = _compare_all(data, self.a_features, np.array(self.a_thresholds),
                                                     np.array(self.a_signs))
        in_node = np.all(root_to_stump_path_indicators, axis=1).astype(int)
        is_right = _compare(data, self.feature, self.threshold).astype(int)
        result = in_node * (is_right * self.right_val + (1 - is_right) * self.left_val)

        return result

    def __repr__(self):
        return f"LocalDecisionStump(feature={self.feature}, threshold={self.threshold}, left_val={self.left_val}, " \
               f"right_val={self.right_val}, a_features={self.a_features}, a_thresholds={self.a_thresholds}, " \
               f"a_signs={self.a_signs})"
class TreeTransformer (estimator, pca=True, max_components_type='min_fracnsamples_nstumps', alpha=0.5, normalize=False)

A transformer that transforms data using a representation built from local decision stumps from a tree or tree ensemble. The transformer also comes with meta data on the local decision stumps and methods that allow for transformations using sub-representations corresponding to each of the original features.

:param estimator: scikit-learn estimator The scikit-learn tree or tree ensemble estimator object :param pca: bool Flag, if False, the sub-representation for each original feature is just the concatenation of the local decision stumps splitting on that feature. If true, the sub-representation are the principal components of the set of local decision stump vectors :param max_components_type: {"median_splits", "max_splits", "nsamples", "nstumps", "min_nsamples_nstumps", "min_fracnsamples_nstumps"} or int Method for choosing the max number of components for PCA transformer for each sub-representation corresponding to an original feature: - If "median_splits", then max_components is alpha * median number of splits on the original feature among trees in the estimator - If "max_splits", then max_components is alpha * maximum number of splits on the original feature among trees in the estimator - If "nsamples", then max_components is alpha * n_samples - If "nstumps", then max_components is alpha * n_stumps - If "min_nsamples_nstumps", then max_components is alpha * min(n_samples, n_stumps), where n_stumps is total number of local decision stumps splitting on that feature in the ensemble - If "min_fracnsamples_nstumps", then max_components is min(alpha * n_samples, n_stumps), where n_stumps is total number of local decision stumps splitting on that feature in the ensemble - If int, then max_components is the given integer :param alpha: float Parameter for adjusting the max number of components for PCA. :param normalize: bool Flag. If set to True, then divide the nonzero function values for each local decision stump by sqrt(n_samples in node) so that the vector of function values on the training set has unit norm. If False, then do not divide, so that the vector of function values on the training set has norm equal to n_samples in node.

Expand source code
class TreeTransformer(TransformerMixin, BaseEstimator):
    """
    A transformer that transforms data using a representation built from local decision stumps from a tree or tree
    ensemble. The transformer also comes with meta data on the local decision stumps and methods that allow
    for transformations using sub-representations corresponding to each of the original features.

    :param estimator: scikit-learn estimator
        The scikit-learn tree or tree ensemble estimator object
    :param pca: bool
        Flag, if False, the sub-representation for each original feature is just the concatenation of the local
        decision stumps splitting on that feature. If true, the sub-representation are the principal components of the
        set of local decision stump vectors
    :param max_components_type: {"median_splits", "max_splits", "nsamples", "nstumps", "min_nsamples_nstumps",
        "min_fracnsamples_nstumps"} or int
        Method for choosing the max number of components for PCA transformer for each sub-representation corresponding
        to an original feature:
            - If "median_splits", then max_components is alpha * median number of splits on the original feature
              among trees in the estimator
            - If "max_splits", then max_components is alpha * maximum number of splits on the original feature among
              trees in the estimator
            - If "nsamples", then max_components is alpha * n_samples
            - If "nstumps", then max_components is alpha * n_stumps
            - If "min_nsamples_nstumps", then max_components is alpha * min(n_samples, n_stumps), where n_stumps is
              total number of local decision stumps splitting on that feature in the ensemble
            - If "min_fracnsamples_nstumps", then max_components is min(alpha * n_samples, n_stumps), where n_stumps is
              total number of local decision stumps splitting on that feature in the ensemble
            - If int, then max_components is the given integer
    :param alpha: float
        Parameter for adjusting the max number of components for PCA.
    :param normalize: bool
        Flag. If set to True, then divide the nonzero function values for each local decision stump by
        sqrt(n_samples in node) so that the vector of function values on the training set has unit norm. If False,
        then do not divide, so that the vector of function values on the training set has norm equal to n_samples
        in node.
    """

    def __init__(self, estimator, pca=True, max_components_type="min_fracnsamples_nstumps", alpha=0.5, normalize=False):
        self.estimator = estimator
        self.pca = pca
        self.max_components_type = max_components_type
        self.alpha = alpha
        self.normalize = normalize
        # Check if single tree or tree ensemble
        tree_models = estimator.estimators_ if isinstance(estimator, BaseEnsemble) else [estimator]
        # Make stumps for each tree
        num_splits_per_feature_all = []
        self.all_stumps = []
        for tree_model in tree_models:
            tree_stumps, num_splits_per_feature = make_stumps(tree_model.tree_, normalize)
            self.all_stumps += tree_stumps
            num_splits_per_feature_all.append(num_splits_per_feature)
        # Identify the stumps that split on feature k, for each k
        self._original_feat_to_stump_mapping = defaultdict(list)
        for idx, stump in enumerate(self.all_stumps):
            self._original_feat_to_stump_mapping[stump.feature].append(idx)
        # Obtain the median and max number of splits on each feature across trees
        self.median_splits = np.median(num_splits_per_feature_all, axis=0)
        self.max_splits = np.max(num_splits_per_feature_all, axis=0)
        # Initialize list of PCA transformers, one for each set of stumps corresponding to each original feature
        self.pca_transformers = defaultdict(lambda: None)

    def fit(self, X, y=None):

        def pca_on_stumps(k):
            """
            Helper function to fit PCA transformer on stumps corresponding to original feature k
            """
            restricted_stumps = self.get_stumps_for_feature(k)
            n_stumps = len(restricted_stumps)
            n_samples = X.shape[0]
            # Get the number of components to use for PCA
            if self.max_components_type == 'median_splits':
                max_components = int(self.median_splits[k] * self.alpha)
            elif self.max_components_type == "max_splits":
                max_components = int(self.max_splits[k] * self.alpha)
            elif self.max_components_type == "nsamples":
                max_components = int(n_samples * self.alpha)
            elif self.max_components_type == "nstumps":
                max_components = int(n_stumps * self.alpha)
            elif self.max_components_type == "min_nsamples_nstumps":
                max_components = int(min(n_samples, n_stumps) * self.alpha)
            elif self.max_components_type == "min_fracnsamples_nstumps":
                max_components = int(min(n_samples * self.alpha, n_stumps))
            elif isinstance(self.max_components_type, int):
                max_components = self.max_components_type
            else:
                raise ValueError("Invalid max components type")
            n_components = min(max_components, n_stumps, n_samples)
            if n_components == 0:
                pca_transformer = None
            else:
                X_transformed = tree_feature_transform(restricted_stumps, X)
                pca_transformer = PCA(n_components=n_components)
                pca_transformer.fit(X_transformed)

            return pca_transformer

        if self.pca:
            n_features = X.shape[1]
            for k in np.arange(n_features):
                self.pca_transformers[k] = pca_on_stumps(k)
        else:
            pass

    def transform(self, X):
        """
        Obtain all engineered features.

        :param X: array-like of shape (n_samples, n_features)
            Original data matrix
        :return: X_transformed: array-like of shape (n_samples, n_new_features)
            Transformed data matrix
        """
        X_transformed = []
        n_features = X.shape[1]
        for k in range(n_features):
            X_transformed_k = self.transform_one_feature(X, k)
            if X_transformed_k is not None:
                X_transformed.append(X_transformed_k)
        X_transformed = np.hstack(X_transformed)

        return X_transformed

    def transform_one_feature(self, X, k):
        """
        Obtain the engineered features corresponding to a given original feature X_k

        :param X: array-like of shape (n_samples, n_features)
            Original data matrix
        :param k: int
            Index of original feature
        :return: X_transformed: array-like of shape (n_samples, n_new_features)
            Transformed data matrix
        """
        restricted_stumps = self.get_stumps_for_feature(k)
        if len(restricted_stumps) == 0:
            return None
        else:
            X_transformed = tree_feature_transform(restricted_stumps, X)
            if self.pca_transformers[k] is not None:
                X_transformed = self.pca_transformers[k].transform(X_transformed)
        return X_transformed

    def get_stumps_for_feature(self, k):
        """
        Get the list of local decision stumps that split on feature k

        :param k: int
            Index of original feature
        :return: restricted_stumps: list of LocalDecisionStump objects
        """
        restricted_stump_indices = self._original_feat_to_stump_mapping[k]
        restricted_stumps = [self.all_stumps[idx] for idx in restricted_stump_indices]

        return restricted_stumps

Ancestors

  • sklearn.base.TransformerMixin
  • sklearn.base.BaseEstimator

Methods

def fit(self, X, y=None)
Expand source code
def fit(self, X, y=None):

    def pca_on_stumps(k):
        """
        Helper function to fit PCA transformer on stumps corresponding to original feature k
        """
        restricted_stumps = self.get_stumps_for_feature(k)
        n_stumps = len(restricted_stumps)
        n_samples = X.shape[0]
        # Get the number of components to use for PCA
        if self.max_components_type == 'median_splits':
            max_components = int(self.median_splits[k] * self.alpha)
        elif self.max_components_type == "max_splits":
            max_components = int(self.max_splits[k] * self.alpha)
        elif self.max_components_type == "nsamples":
            max_components = int(n_samples * self.alpha)
        elif self.max_components_type == "nstumps":
            max_components = int(n_stumps * self.alpha)
        elif self.max_components_type == "min_nsamples_nstumps":
            max_components = int(min(n_samples, n_stumps) * self.alpha)
        elif self.max_components_type == "min_fracnsamples_nstumps":
            max_components = int(min(n_samples * self.alpha, n_stumps))
        elif isinstance(self.max_components_type, int):
            max_components = self.max_components_type
        else:
            raise ValueError("Invalid max components type")
        n_components = min(max_components, n_stumps, n_samples)
        if n_components == 0:
            pca_transformer = None
        else:
            X_transformed = tree_feature_transform(restricted_stumps, X)
            pca_transformer = PCA(n_components=n_components)
            pca_transformer.fit(X_transformed)

        return pca_transformer

    if self.pca:
        n_features = X.shape[1]
        for k in np.arange(n_features):
            self.pca_transformers[k] = pca_on_stumps(k)
    else:
        pass
def get_stumps_for_feature(self, k)

Get the list of local decision stumps that split on feature k

:param k: int Index of original feature :return: restricted_stumps: list of LocalDecisionStump objects

Expand source code
def get_stumps_for_feature(self, k):
    """
    Get the list of local decision stumps that split on feature k

    :param k: int
        Index of original feature
    :return: restricted_stumps: list of LocalDecisionStump objects
    """
    restricted_stump_indices = self._original_feat_to_stump_mapping[k]
    restricted_stumps = [self.all_stumps[idx] for idx in restricted_stump_indices]

    return restricted_stumps
def transform(self, X)

Obtain all engineered features.

:param X: array-like of shape (n_samples, n_features) Original data matrix :return: X_transformed: array-like of shape (n_samples, n_new_features) Transformed data matrix

Expand source code
def transform(self, X):
    """
    Obtain all engineered features.

    :param X: array-like of shape (n_samples, n_features)
        Original data matrix
    :return: X_transformed: array-like of shape (n_samples, n_new_features)
        Transformed data matrix
    """
    X_transformed = []
    n_features = X.shape[1]
    for k in range(n_features):
        X_transformed_k = self.transform_one_feature(X, k)
        if X_transformed_k is not None:
            X_transformed.append(X_transformed_k)
    X_transformed = np.hstack(X_transformed)

    return X_transformed
def transform_one_feature(self, X, k)

Obtain the engineered features corresponding to a given original feature X_k

:param X: array-like of shape (n_samples, n_features) Original data matrix :param k: int Index of original feature :return: X_transformed: array-like of shape (n_samples, n_new_features) Transformed data matrix

Expand source code
def transform_one_feature(self, X, k):
    """
    Obtain the engineered features corresponding to a given original feature X_k

    :param X: array-like of shape (n_samples, n_features)
        Original data matrix
    :param k: int
        Index of original feature
    :return: X_transformed: array-like of shape (n_samples, n_new_features)
        Transformed data matrix
    """
    restricted_stumps = self.get_stumps_for_feature(k)
    if len(restricted_stumps) == 0:
        return None
    else:
        X_transformed = tree_feature_transform(restricted_stumps, X)
        if self.pca_transformers[k] is not None:
            X_transformed = self.pca_transformers[k].transform(X_transformed)
    return X_transformed