from abc import ABC, abstractmethod
from collections import defaultdict

import numpy as np
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.ensemble import BaseEnsemble
from sklearn.ensemble._forest import _generate_unsampled_indices, _generate_sample_indices
from sklearn.preprocessing import StandardScaler

from .local_stumps import make_stumps, tree_feature_transform

# NOTE: BlockTransformerBase, the abstract base class used by several
# transformers below, is assumed to be defined or imported elsewhere in this
# package; it does not appear in this file.
class BlockPartitionedData:
    """
    Abstraction for a feature matrix in which the columns are grouped into
    blocks.

    Parameters
    ----------
    data_blocks: list of ndarray
        Blocks of feature columns
    common_block: ndarray
        A set of feature columns that should be common to all blocks
    """

    def __init__(self, data_blocks, common_block=None):
        self.n_blocks = len(data_blocks)
        self.n_samples = data_blocks[0].shape[0]
        self._data_blocks = data_blocks
        self._common_block = common_block
        self._create_block_indices()
        self._means = [np.mean(data_block, axis=0) for data_block in
                       self._data_blocks]
    def get_all_data(self):
        """
        Returns
        -------
        all_data: ndarray
            The data matrix obtained by concatenating all feature blocks
            together
        """
        if self._common_block is None:
            all_data = np.hstack(self._data_blocks)
        else:
            # Common block is appended at the end
            all_data = np.hstack(self._data_blocks + [self._common_block])
        return all_data

    def _create_block_indices(self):
        self._block_indices_dict = {}
        start_index = 0
        for k in range(self.n_blocks):
            stop_index = start_index + self._data_blocks[k].shape[1]
            self._block_indices_dict[k] = list(range(start_index, stop_index))
            start_index = stop_index
        if self._common_block is None:
            self._common_block_indices = []
        else:
            stop_index = start_index + self._common_block.shape[1]
            self._common_block_indices = list(range(start_index, stop_index))
    def get_indices(self, k, all_except=False):
        """
        Parameters
        ----------
        k: int
            The index of the feature block desired
        all_except: bool
            If True, return the indices of the features in all blocks
            except block k

        Returns
        -------
        block_indices: list of int
            The indices of the features in the desired block(s), followed by
            the common block indices (if any)
        """
        if k not in self._block_indices_dict.keys():
            raise ValueError(f"{k} not a block index.")
        if all_except:
            indices = []
            for block_no, block_indices in self._block_indices_dict.items():
                if block_no != k:
                    indices += block_indices
        else:
            # Copy so that appending the common-block indices below does not
            # mutate the stored index list (the original code aliased it)
            indices = list(self._block_indices_dict[k])
        indices += self._common_block_indices
        return indices
    def get_blocks(self, k, all_except=False):
        """
        Parameters
        ----------
        k: int
            The index of the feature block desired
        all_except: bool
            If True, return all feature blocks except block k

        Returns
        -------
        stacked_blocks: ndarray
            The desired feature block(s), with the common block (if any)
            appended
        """
        if k not in self._block_indices_dict.keys():
            raise ValueError(f"{k} not a block index.")
        if all_except:
            blocks = []
            for block_no, block in enumerate(self._data_blocks):
                if block_no != k:
                    blocks.append(block)
        else:
            blocks = [self._data_blocks[k]]
        if self._common_block is not None:
            blocks.append(self._common_block)
        if len(blocks) > 1:
            # Stack all collected blocks (the original code mistakenly
            # stacked only the common block with block k here)
            stacked_blocks = np.hstack(blocks)
        else:
            stacked_blocks = blocks[0]
        return stacked_blocks
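    # Usage sketch (hypothetical shapes, for illustration only):
    #     blocked = BlockPartitionedData([X1, X2, X3])    # X1: (n, 2), X2: (n, 3), X3: (n, 1)
    #     blocked.get_indices(1)                   # -> [2, 3, 4]
    #     blocked.get_indices(1, all_except=True)  # -> [0, 1, 5]
    #     blocked.get_blocks(1).shape              # -> (n, 3)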
    def get_modified_data(self, k, mode="keep_k"):
        """
        Modify the data by either imputing the mean of each feature in block k
        ("keep_rest") or imputing the mean of each feature not in block k
        ("keep_k"). Return the full data matrix with the modified data.

        Parameters
        ----------
        k: int
            The index of the feature block to keep or modify
        mode: string in {"keep_k", "keep_rest"}
            Mode for the method. "keep_k" imputes the mean of each feature not
            in block k, "keep_rest" imputes the mean of each feature in block k

        Returns
        -------
        all_data: ndarray
            The data matrix obtained by concatenating all feature blocks
            together, with the chosen blocks mean-imputed
        """
        # Each modified block is a constant matrix holding the column means
        modified_blocks = [np.outer(np.ones(self.n_samples), self._means[i])
                           for i in range(self.n_blocks)]
        if mode == "keep_k":
            data_blocks = \
                [self._data_blocks[i] if i == k else modified_blocks[i] for
                 i in range(self.n_blocks)]
        elif mode == "keep_rest":
            data_blocks = \
                [modified_blocks[i] if i == k else self._data_blocks[i] for
                 i in range(self.n_blocks)]
        else:
            raise ValueError("Unsupported mode.")
        if self._common_block is None:
            all_data = np.hstack(data_blocks)
        else:
            all_data = np.hstack(data_blocks + [self._common_block])
        return all_data
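    # For example, with two blocks, mode="keep_k" and k=0, block 0 is left
    # intact while every column of block 1 is replaced by its mean:
    #     blocked = BlockPartitionedData([X1, X2])
    #     Xm = blocked.get_modified_data(0, mode="keep_k")
    #     # Xm[:, :X1.shape[1]] equals X1; the remaining columns are constant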
    def train_test_split(self, train_indices, test_indices):
        train_blocks = [self.get_blocks(k)[train_indices, :] for
                        k in range(self.n_blocks)]
        train_blocked_data = BlockPartitionedData(train_blocks)
        test_blocks = [self.get_blocks(k)[test_indices, :] for
                       k in range(self.n_blocks)]
        test_blocked_data = BlockPartitionedData(test_blocks)
        return train_blocked_data, test_blocked_data

    def __repr__(self):
        return self.get_all_data().__repr__()
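# Minimal end-to-end sketch for BlockPartitionedData (hypothetical data):
#     import numpy as np
#     rng = np.random.default_rng(0)
#     blocked = BlockPartitionedData([rng.normal(size=(10, 2)),
#                                     rng.normal(size=(10, 3))])
#     train, test = blocked.train_test_split(np.arange(7), np.arange(7, 10))
#     train.get_all_data().shape  # -> (7, 5)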
class RFPlusFeatureMapping(ABC):
    """
    An interface for block transformers, objects that transform a data matrix
    into a BlockPartitionedData object comprising one block of engineered
    features for each original feature.
    """

    def __init__(self, estimator=None, additional_transformer="default",
                 drop_features=True, center=True, rescale=False):
        self.n_blocks = None
        self.estimator = estimator
        self.additional_transformer = additional_transformer
        self.drop_features = drop_features
        self._make_stumps()
        self._base_transformers = {}
        self.is_fitted = False
        self.center = center
        self.rescale = rescale
    def _make_stumps(self):
        if isinstance(self.estimator, BaseEnsemble):
            tree_models = self.estimator.estimators_
        else:
            tree_models = [self.estimator]
        # Make stumps for each tree
        all_stumps = []
        for tree_model in tree_models:
            tree_stumps = make_stumps(tree_model.tree_)
            all_stumps += tree_stumps
        # Identify the stumps that split on feature k, for each k
        self.stumps = defaultdict(list)
        for stump in all_stumps:
            self.stumps[stump.feature].append(stump)
        self.n_splits = {k: len(stumps) for k, stumps in self.stumps.items()}
    def fit(self, X):
        self.n_blocks = X.shape[1]
        for k in range(self.n_blocks):
            self._make_base_transformer(X, k)
        self.is_fitted = True
        return self

    def _make_base_transformer(self, X, k):
        # NOTE: the body of this method is truncated in the source; the code
        # below is a plausible reconstruction. It completes the cut-off
        # assignment ("... = Stan") with a StandardScaler configured from the
        # center/rescale flags, mirroring _center_and_rescale below, and
        # stores a per-feature StumpTransformer so transform() can look it up.
        if self.drop_features and len(self.stumps[k]) == 0:
            self._base_transformers[k] = None
            return None
        if self.additional_transformer == "default":
            additional_transformer = StandardScaler(with_mean=self.center,
                                                    with_std=self.rescale)
        else:
            additional_transformer = self.additional_transformer
        self._base_transformers[k] = StumpTransformer(self.stumps[k])

    def check_is_fitted(self):
        if not self.is_fitted:
            raise AttributeError("Transformer has not yet been fitted.")
    def transform(self, X):
        """
        Transform a data matrix into a BlockPartitionedData object comprising
        one block for each original feature in X. Centering and rescaling are
        controlled by the center and rescale flags supplied at construction.

        Parameters
        ----------
        X: ndarray
            The data matrix to be transformed

        Returns
        -------
        blocked_data: BlockPartitionedData object
            The transformed data
        """
        self.check_is_fitted()
        data_blocks = [self._base_transformers[k].transform(X) for
                       k in range(self.n_blocks)]
        blocked_data = BlockPartitionedData(data_blocks)
        return blocked_data

    def fit_transform(self, X):
        self.fit(X)
        return self.transform(X)
    def _center_and_rescale(self, data_block):
        # The original body returned an undefined name; this reconstruction
        # applies the scaler to the given block, matching the method's name
        # and the center/rescale flags set at construction.
        std_scaler = StandardScaler(with_mean=self.center,
                                    with_std=self.rescale)
        return std_scaler.fit_transform(data_block)
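# Usage sketch for RFPlusFeatureMapping (assumes a fitted scikit-learn
# forest; variable names are illustrative):
#     from sklearn.ensemble import RandomForestRegressor
#     rf = RandomForestRegressor(n_estimators=10).fit(X, y)
#     mapping = RFPlusFeatureMapping(estimator=rf).fit(X)
#     blocked = mapping.transform(X)  # BlockPartitionedData, one block per feature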
class StumpTransformer(TransformerMixin, BaseEstimator):
    """Apply a fixed collection of local decision stumps to a data matrix."""

    def __init__(self, stumps):
        self.stumps = stumps

    def fit(self, X, y=None):
        # Stateless: the stumps are fixed at construction time. Return self
        # for scikit-learn compatibility (the original body was a bare pass).
        return self

    def transform(self, X):
        return tree_feature_transform(self.stumps, X)
class IdentityTransformer(BlockTransformerBase, ABC):
    """
    Block transformer that creates a block partitioned data object with each
    block k containing only the original feature k.
    """

    def _fit_one_feature(self, X, k):
        self._centers[k] = np.mean(X[:, [k]])
        self._scales[k] = np.std(X[:, [k]])

    def _transform_one_feature(self, X, k):
        return X[:, [k]]
class TreeTransformer(BlockTransformerBase, ABC):
    """
    A block transformer that transforms data using a representation built from
    local decision stumps from a tree or tree ensemble. The transformer also
    comes with metadata on the local decision stumps and methods that allow for
    transformations using sub-representations corresponding to each of the
    original features.

    Parameters
    ----------
    estimator: scikit-learn estimator
        The scikit-learn tree or tree ensemble estimator object.
    data: ndarray
        A data matrix that can be used to update the number of samples in each
        node of the tree(s) in the supplied estimator object. This affects
        the node values of the resulting engineered features.
    """

    def __init__(self, estimator, data=None):
        super().__init__()
        self.estimator = estimator
        self.oob_seed = self.estimator.random_state
        # Check if single tree or tree ensemble
        if isinstance(estimator, BaseEnsemble):
            tree_models = estimator.estimators_
            if data is not None:
                # If a data matrix is supplied, use it to update the number
                # of samples in each node
                for tree_model in tree_models:
                    _update_n_node_samples(tree_model, data)
        else:
            tree_models = [estimator]
        # Make stumps for each tree
        all_stumps = []
        for tree_model in tree_models:
            tree_stumps = make_stumps(tree_model.tree_)
            all_stumps += tree_stumps
        # Identify the stumps that split on feature k, for each k
        self.stumps = defaultdict(list)
        for stump in all_stumps:
            self.stumps[stump.feature].append(stump)
        self.n_splits = {k: len(stumps) for k, stumps in self.stumps.items()}

    def _fit_one_feature(self, X, k):
        stump_features = tree_feature_transform(self.stumps[k], X)
        self._centers[k] = np.mean(stump_features, axis=0)
        self._scales[k] = np.std(stump_features, axis=0)

    def _transform_one_feature(self, X, k):
        return tree_feature_transform(self.stumps[k], X)

    def _fit_transform_one_feature(self, X, k):
        stump_features = tree_feature_transform(self.stumps[k], X)
        self._centers[k] = np.mean(stump_features, axis=0)
        self._scales[k] = np.std(stump_features, axis=0)
        return stump_features
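# Usage sketch for TreeTransformer (assumes BlockTransformerBase supplies the
# public fit/transform wrappers; variable names are illustrative):
#     from sklearn.tree import DecisionTreeRegressor
#     tree = DecisionTreeRegressor(max_depth=3).fit(X, y)
#     tt = TreeTransformer(tree)
#     tt.n_splits  # maps feature index -> number of stumps splitting on it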
class CompositeTransformer(BlockTransformerBase, ABC):
    """
    A block transformer that is built by concatenating the blocks of the same
    index from a list of block transformers.

    Parameters
    ----------
    block_transformer_list: list of BlockTransformer objects
        The list of block transformers to combine
    rescale_mode: string in {"max", "mean", None}
        Flag for the type of rescaling to be done to the blocks from different
        base transformers. If "max", divide each block by the max std deviation
        of a column within the block. If "mean", divide each block by the mean
        std deviation of a column within the block. If None, do not rescale.
    drop_features: bool
        Flag for whether to return an empty block if the block from the first
        transformer in the list is trivial.
    """

    def __init__(self, block_transformer_list, rescale_mode=None, drop_features=True):
        super().__init__()
        self.block_transformer_list = block_transformer_list
        assert len(self.block_transformer_list) > 0, \
            "Need at least one base transformer."
        # Inherit the OOB seed from the first base transformer that has one
        for transformer in block_transformer_list:
            if hasattr(transformer, "oob_seed") and \
                    transformer.oob_seed is not None:
                self.oob_seed = transformer.oob_seed
                break
        self.rescale_mode = rescale_mode
        self.drop_features = drop_features
        self._rescale_factors = {}
        self._trivial_block_indices = {}
    def _fit_one_feature(self, X, k):
        data_blocks = []
        centers = []
        scales = []
        for block_transformer in self.block_transformer_list:
            data_block = block_transformer.fit_transform_one_feature(
                X, k, center=False, normalize=False)
            data_blocks.append(data_block)
            centers.append(block_transformer._centers[k])
            scales.append(block_transformer._scales[k])
        # Handle trivial blocks
        self._trivial_block_indices[k] = \
            [idx for idx, data_block in enumerate(data_blocks) if
             _empty_or_constant(data_block)]
        if (0 in self._trivial_block_indices[k] and self.drop_features) or \
                (len(self._trivial_block_indices[k]) == len(data_blocks)):
            # If the first block is trivial and self.drop_features is True,
            # or all blocks are trivial, store dummy centers and scales
            self._centers[k] = np.array([0])
            self._scales[k] = np.array([1])
            return
        else:
            # Remove trivial blocks
            for idx in reversed(self._trivial_block_indices[k]):
                data_blocks.pop(idx)
                centers.pop(idx)
                scales.pop(idx)
        self._centers[k] = np.hstack(centers)
        self._scales[k] = np.hstack(scales)
        self._rescale_factors[k] = _get_rescale_factors(data_blocks, self.rescale_mode)
    def _transform_one_feature(self, X, k):
        data_blocks = []
        for block_transformer in self.block_transformer_list:
            data_block = block_transformer.transform_one_feature(
                X, k, center=False, normalize=False)
            data_blocks.append(data_block)
        # Handle trivial blocks
        if (0 in self._trivial_block_indices[k] and self.drop_features) or \
                (len(self._trivial_block_indices[k]) == len(data_blocks)):
            # If the first block is trivial and self.drop_features is True,
            # or all blocks are trivial, return an empty block
            return np.empty((X.shape[0], 0))
        else:
            # Remove trivial blocks
            for idx in reversed(self._trivial_block_indices[k]):
                data_blocks.pop(idx)
        composite_block = np.hstack(
            [data_block / scale_factor for data_block, scale_factor in
             zip(data_blocks, self._rescale_factors[k])]
        )
        return composite_block
    def _fit_transform_one_feature(self, X, k):
        data_blocks = []
        centers = []
        scales = []
        for block_transformer in self.block_transformer_list:
            data_block = block_transformer.fit_transform_one_feature(
                X, k, center=False, normalize=False)
            data_blocks.append(data_block)
            centers.append(block_transformer._centers[k])
            scales.append(block_transformer._scales[k])
        # Handle trivial blocks
        self._trivial_block_indices[k] = \
            [idx for idx, data_block in enumerate(data_blocks) if
             _empty_or_constant(data_block)]
        if (0 in self._trivial_block_indices[k] and self.drop_features) or \
                (len(self._trivial_block_indices[k]) == len(data_blocks)):
            # If the first block is trivial and self.drop_features is True,
            # or all blocks are trivial, return an empty block
            self._centers[k] = np.array([0])
            self._scales[k] = np.array([1])
            return np.empty((X.shape[0], 0))
        else:
            # Remove trivial blocks
            for idx in reversed(self._trivial_block_indices[k]):
                data_blocks.pop(idx)
                centers.pop(idx)
                scales.pop(idx)
        self._centers[k] = np.hstack(centers)
        self._scales[k] = np.hstack(scales)
        self._rescale_factors[k] = _get_rescale_factors(data_blocks, self.rescale_mode)
        composite_block = np.hstack(
            [data_block / scale_factor for data_block, scale_factor in
             zip(data_blocks, self._rescale_factors[k])]
        )
        return composite_block
class GmdiDefaultTransformer(CompositeTransformer, ABC):
    """
    Default block transformer used in GMDI. For each original feature, this
    forms a block comprising the local decision stumps, from a single tree
    model, that split on the feature, and appends the original feature.

    Parameters
    ----------
    tree_model: scikit-learn estimator
        The scikit-learn tree estimator object.
    rescale_mode: string in {"max", "mean", None}
        Flag for the type of rescaling to be done to the blocks from different
        base transformers. If "max", divide each block by the max std deviation
        of a column within the block. If "mean", divide each block by the mean
        std deviation of a column within the block. If None, do not rescale.
    drop_features: bool
        Flag for whether to return an empty block if the block from the first
        transformer in the list is trivial.
    """

    def __init__(self, tree_model, rescale_mode="max", drop_features=True):
        super().__init__([TreeTransformer(tree_model), IdentityTransformer()],
                         rescale_mode, drop_features)
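# Usage sketch for GmdiDefaultTransformer (illustrative; assumes a fitted
# scikit-learn tree and the public fit_transform API from BlockTransformerBase):
#     from sklearn.tree import DecisionTreeRegressor
#     tree = DecisionTreeRegressor(max_depth=3, random_state=0).fit(X, y)
#     blocked = GmdiDefaultTransformer(tree).fit_transform(X)
#     # Block k = stumps splitting on feature k plus raw feature k, rescaled
#     # so each base block's max column std matches the first block's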
def _update_n_node_samples(tree, X):
    node_indicators = tree.decision_path(X)
    new_n_node_samples = node_indicators.getnnz(axis=0)
    for i in range(len(new_n_node_samples)):
        tree.tree_.n_node_samples[i] = new_n_node_samples[i]
def _get_rescale_factors(data_blocks, rescale_mode):
    if rescale_mode == "max":
        scale_factors = np.array([max(data_block.std(axis=0)) for
                                  data_block in data_blocks])
    elif rescale_mode == "mean":
        scale_factors = np.array([np.mean(data_block.std(axis=0)) for
                                  data_block in data_blocks])
    elif rescale_mode is None:
        scale_factors = np.ones(len(data_blocks))
    else:
        raise ValueError("Invalid rescale mode.")
    # Normalize so that the first block's scale factor is 1
    scale_factors = scale_factors / scale_factors[0]
    return scale_factors
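# Worked example (hypothetical numbers): with rescale_mode="max" and two
# blocks whose max column stds are 2.0 and 0.5, the factors are
# [2.0, 0.5] / 2.0 = [1.0, 0.25]; dividing the second block by 0.25
# (i.e. multiplying by 4) puts it on the first block's scale.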
def _empty_or_constant(data_block):
    return data_block.shape[1] == 0 or max(data_block.std(axis=0)) == 0
def _blocked_train_test_split(blocked_data, y, oob_seed):
    # Recreate the bootstrap split of the tree identified by oob_seed:
    # in-bag samples form the training set, out-of-bag samples the test set
    n_samples = len(y)
    train_indices = _generate_sample_indices(oob_seed, n_samples, n_samples)
    test_indices = _generate_unsampled_indices(oob_seed, n_samples, n_samples)
    train_blocked_data, test_blocked_data = \
        blocked_data.train_test_split(train_indices, test_indices)
    if y.ndim > 1:
        y_train = y[train_indices, :]
        y_test = y[test_indices, :]
    else:
        y_train = y[train_indices]
        y_test = y[test_indices]
    return train_blocked_data, test_blocked_data, y_train, y_test, train_indices, test_indices
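# Usage sketch (illustrative; assumes a fitted forest rf and the public
# fit_transform API from BlockTransformerBase): recover a tree's in-bag /
# out-of-bag split by reusing its random_state as the oob_seed:
#     tt = TreeTransformer(rf.estimators_[0])
#     blocked = GmdiDefaultTransformer(rf.estimators_[0]).fit_transform(X)
#     train_bd, test_bd, y_tr, y_te, tr_idx, te_idx = \
#         _blocked_train_test_split(blocked, y, tt.oob_seed)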