Expand source code
from copy import deepcopy
import numpy as np
from matplotlib import pyplot as plt
from sklearn import datasets
from sklearn import tree
from sklearn.base import BaseEstimator
from sklearn.linear_model import RidgeCV, RidgeClassifierCV
from sklearn.model_selection import train_test_split
from sklearn.tree import plot_tree
from sklearn.utils import check_X_y
from imodels.tree.viz_utils import extract_sklearn_tree_from_figs
class Node:
def __init__(self, feature: int = None, threshold: int = None,
value=None, idxs=None, is_root: bool = False, left=None,
impurity_reduction: float = None, tree_num: int = None,
right=None, split_or_linear='split', n_samples=0):
"""Node class for splitting
"""
# split or linear
self.is_root = is_root
self.idxs = idxs
self.tree_num = tree_num
self.split_or_linear = split_or_linear
self.feature = feature
self.n_samples = n_samples
self.impurity_reduction = impurity_reduction
# different meanings
self.value = value # for split this is mean, for linear this is weight
# split-specific (for linear these should all be None)
self.threshold = threshold
self.left = left
self.right = right
self.left_temp = None
self.right_temp = None
def update_values(self, X, y):
self.value = y.mean()
if self.threshold is not None:
right_indicator = np.apply_along_axis(
lambda x: x[self.feature] > self.threshold, 1, X)
X_right = X[right_indicator, :]
X_left = X[~right_indicator, :]
y_right = y[right_indicator]
y_left = y[~right_indicator]
if self.left is not None:
self.left.update_values(X_left, y_left)
if self.right is not None:
self.right.update_values(X_right, y_right)
def shrink(self, reg_param, cum_sum=0):
if self.is_root:
cum_sum = self.value
if self.left is None: # if leaf node, change prediction
self.value = cum_sum
else:
shrunk_diff = (self.left.value - self.value) / \
(1 + reg_param / self.n_samples)
self.left.shrink(reg_param, cum_sum + shrunk_diff)
shrunk_diff = (self.right.value - self.value) / \
(1 + reg_param / self.n_samples)
self.right.shrink(reg_param, cum_sum + shrunk_diff)
def setattrs(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
def __str__(self):
if self.split_or_linear == 'linear':
if self.is_root:
return f'X_{self.feature} * {self.value:0.3f} (Tree #{self.tree_num} linear root)'
else:
return f'X_{self.feature} * {self.value:0.3f} (linear)'
else:
if self.is_root:
return f'X_{self.feature} <= {self.threshold:0.3f} (Tree #{self.tree_num} root)'
elif self.left is None and self.right is None:
return f'Val: {self.value[0][0]:0.3f} (leaf)'
else:
return f'X_{self.feature} <= {self.threshold:0.3f} (split)'
def __repr__(self):
return self.__str__()
class FIGSExt(BaseEstimator):
"""FIGSExt (sum of trees) classifier.
Fast Interpretable Greedy-Tree Sums (FIGS) is an algorithm for fitting concise rule-based models.
Specifically, FIGS generalizes CART to simultaneously grow a flexible number of trees in a summation.
The total number of splits across all the trees can be restricted by a pre-specified threshold, keeping the model interpretable.
Experiments across a wide array of real-world datasets show that FIGS achieves state-of-the-art prediction performance when restricted to just a few splits (e.g. less than 20).
https://arxiv.org/abs/2201.11931
"""
def __init__(self, max_rules: int = None, posthoc_ridge: bool = False,
include_linear: bool = False,
max_features=None, min_impurity_decrease: float = 0.0,
k1: int = 0, k2: int = 0):
"""
max_features
The number of features to consider when looking for the best split
k1: number of iterations of tree-prediction backfitting to do after making each split
k2: number of iterations of tree-prediction backfitting to do after the end of the entire
tree-growing phase
"""
super().__init__()
self.max_rules = max_rules
self.posthoc_ridge = posthoc_ridge
self.include_linear = include_linear
self.max_features = max_features
self.weighted_model_ = None # set if using posthoc_ridge
self.min_impurity_decrease = min_impurity_decrease
self.k1 = k1
self.k2 = k2
self._init_prediction_task() # decides between regressor and classifier
def _init_prediction_task(self):
"""
FIGSExtRegressor and FIGSExtClassifier override this method
to alter the prediction task. When using this class directly,
it is equivalent to FIGSExtRegressor
"""
self.prediction_task = 'regression'
def _init_decision_function(self):
"""Sets decision function based on prediction_task
"""
# used by sklearn GridSearchCV, BaggingClassifier
if self.prediction_task == 'classification':
def decision_function(x): return self.predict_proba(x)[:, 1]
elif self.prediction_task == 'regression':
decision_function = self.predict
def _construct_node_linear(self, X, y, idxs, tree_num=0, sample_weight=None):
"""This can be made a lot faster
Assumes there are at least 5 points in node
Doesn't currently support _sample_weight!
"""
y_target = y[idxs]
impurity_orig = np.mean(np.square(y_target)) * idxs.sum()
# find best linear split
best_impurity = impurity_orig
best_linear_coef = None
best_feature = None
for feature_num in range(X.shape[1]):
x = X[idxs, feature_num].reshape(-1, 1)
m = RidgeCV(fit_intercept=False)
m.fit(x, y_target)
impurity = np.min(-m.best_score_) * idxs.sum()
assert impurity >= 0, 'impurity should not be negative'
if impurity < best_impurity:
best_impurity = impurity
best_linear_coef = m.coef_[0]
best_feature = feature_num
impurity_reduction = impurity_orig - best_impurity
# no good linear fit found
if impurity_reduction == 0:
return Node(idxs=idxs, value=np.mean(y_target), tree_num=tree_num,
feature=None, threshold=None,
impurity_reduction=-1, split_or_linear='split') # leaf node that just returns its value
else:
assert isinstance(best_linear_coef,
float), 'coef should be a float'
return Node(idxs=idxs, value=best_linear_coef, tree_num=tree_num,
feature=best_feature, threshold=None,
impurity_reduction=impurity_reduction, split_or_linear='linear')
def _construct_node_with_stump(self, X, y, idxs, tree_num, sample_weight=None, max_features=None):
# array indices
SPLIT = 0
LEFT = 1
RIGHT = 2
# fit stump
stump = tree.DecisionTreeRegressor(
max_depth=1, max_features=max_features)
if sample_weight is not None:
sample_weight = sample_weight[idxs]
stump.fit(X[idxs], y[idxs], sample_weight=sample_weight)
# these are all arrays, arr[0] is split node
# note: -2 is dummy
feature = stump.tree_.feature
threshold = stump.tree_.threshold
impurity = stump.tree_.impurity
n_node_samples = stump.tree_.n_node_samples
value = stump.tree_.value
# no split
if len(feature) == 1:
# print('no split found!', idxs.sum(), impurity, feature)
return Node(idxs=idxs, value=value[SPLIT], tree_num=tree_num,
feature=feature[SPLIT], threshold=threshold[SPLIT],
impurity_reduction=-1, n_samples=n_node_samples)
# split node
impurity_reduction = (
impurity[SPLIT] -
impurity[LEFT] * n_node_samples[LEFT] / n_node_samples[SPLIT] -
impurity[RIGHT] * n_node_samples[RIGHT] / n_node_samples[SPLIT]
) * idxs.sum()
node_split = Node(idxs=idxs, value=value[SPLIT], tree_num=tree_num,
feature=feature[SPLIT], threshold=threshold[SPLIT],
impurity_reduction=impurity_reduction, n_samples=n_node_samples)
# print('\t>>>', node_split, 'impurity', impurity, 'num_pts', idxs.sum(), 'imp_reduc', impurity_reduction)
# manage children
idxs_split = X[:, feature[SPLIT]] <= threshold[SPLIT]
idxs_left = idxs_split & idxs
idxs_right = ~idxs_split & idxs
node_left = Node(idxs=idxs_left, value=value[LEFT], tree_num=tree_num)
node_right = Node(
idxs=idxs_right, value=value[RIGHT], tree_num=tree_num)
node_split.setattrs(left_temp=node_left, right_temp=node_right, )
return node_split
def fit(self, X, y=None, feature_names=None, verbose=False, sample_weight=None):
"""
Params
------
_sample_weight: array-like of shape (n_samples,), default=None
Sample weights. If None, then samples are equally weighted.
Splits that would create child nodes with net zero or negative weight
are ignored while searching for a split in each node.
"""
if self.prediction_task == 'classification':
self.classes_, y = np.unique(
y, return_inverse=True) # deals with str inputs
X, y = check_X_y(X, y)
y = y.astype(float)
if feature_names is not None:
self.feature_names_ = feature_names
self.trees_ = [] # list of the root nodes of added trees
self.complexity_ = 0 # tracks the number of rules in the model
y_predictions_per_tree = {} # predictions for each tree
y_residuals_per_tree = {} # based on predictions above
def _update_tree_preds(n_iter):
for k in range(n_iter):
for tree_num_, tree_ in enumerate(self.trees_):
y_residuals_per_tree[tree_num_] = deepcopy(y)
# subtract predictions of all other trees
for tree_num_2_ in range(len(self.trees_)):
if not tree_num_2_ == tree_num_:
y_residuals_per_tree[tree_num_] -= y_predictions_per_tree[tree_num_2_]
tree_.update_values(X, y_residuals_per_tree[tree_num_])
y_predictions_per_tree[tree_num_] = self._predict_tree(self.trees_[
tree_num_], X)
# set up initial potential_splits
# everything in potential_splits either is_root (so it can be added directly to self.trees_)
# or it is a child of a root node that has already been added
idxs = np.ones(X.shape[0], dtype=bool)
node_init = self._construct_node_with_stump(X=X, y=y, idxs=idxs, tree_num=-1,
sample_weight=sample_weight, max_features=self.max_features)
potential_splits = [node_init]
if self.include_linear and idxs.sum() >= 5:
node_init_linear = self._construct_node_linear(X=X, y=y, idxs=idxs, tree_num=-1,
sample_weight=sample_weight)
potential_splits.append(node_init_linear)
for node in potential_splits:
node.setattrs(is_root=True)
potential_splits = sorted(
potential_splits, key=lambda x: x.impurity_reduction)
# start the greedy fitting algorithm
finished = False
while len(potential_splits) > 0 and not finished:
# print('potential_splits', [str(s) for s in potential_splits])
# get node with max impurity_reduction (since it's sorted)
split_node = potential_splits.pop()
# don't split on node
if split_node.impurity_reduction < self.min_impurity_decrease:
finished = True
break
# split on node
if verbose:
print('\nadding ' + str(split_node))
self.complexity_ += 1
# if added a tree root
if split_node.is_root:
# start a new tree
self.trees_.append(split_node)
# update tree_num
for node_ in [split_node, split_node.left_temp, split_node.right_temp]:
if node_ is not None:
node_.tree_num = len(self.trees_) - 1
# add new root potential node
node_new_root = Node(is_root=True, idxs=np.ones(X.shape[0], dtype=bool),
tree_num=-1, split_or_linear=split_node.split_or_linear)
potential_splits.append(node_new_root)
# add children to potential splits (note this doesn't currently add linear potential splits)
if split_node.split_or_linear == 'split':
# assign left_temp, right_temp to be proper children
# (basically adds them to tree in predict method)
split_node.setattrs(left=split_node.left_temp,
right=split_node.right_temp)
# add children to potential_splits
potential_splits.append(split_node.left)
potential_splits.append(split_node.right)
# update predictions for altered tree
for tree_num_ in range(len(self.trees_)):
y_predictions_per_tree[tree_num_] = self._predict_tree(self.trees_[
tree_num_], X)
# dummy 0 preds for possible new trees
y_predictions_per_tree[-1] = np.zeros(X.shape[0])
# update residuals for each tree
# -1 is key for potential new tree
for tree_num_ in list(range(len(self.trees_))) + [-1]:
y_residuals_per_tree[tree_num_] = deepcopy(y)
# subtract predictions of all other trees
for tree_num_2_ in range(len(self.trees_)):
if not tree_num_2_ == tree_num_:
y_residuals_per_tree[tree_num_] -= y_predictions_per_tree[tree_num_2_]
_update_tree_preds(self.k1)
# recompute all impurities + update potential_split children
potential_splits_new = []
for potential_split in potential_splits:
y_target = y_residuals_per_tree[potential_split.tree_num]
if potential_split.split_or_linear == 'split':
# re-calculate the best split
potential_split_updated = self._construct_node_with_stump(X=X,
y=y_target,
idxs=potential_split.idxs,
tree_num=potential_split.tree_num,
sample_weight=sample_weight,
max_features=self.max_features)
# need to preserve certain attributes from before (value at this split + is_root)
# value may change because residuals may have changed, but we want it to store the value from before
potential_split.setattrs(
feature=potential_split_updated.feature,
threshold=potential_split_updated.threshold,
impurity_reduction=potential_split_updated.impurity_reduction,
left_temp=potential_split_updated.left_temp,
right_temp=potential_split_updated.right_temp,
)
elif potential_split.split_or_linear == 'linear':
assert potential_split.is_root, 'Currently, linear node only supported as root'
assert potential_split.idxs.sum(
) == X.shape[0], 'Currently, linear node only supported as root'
potential_split_updated = self._construct_node_linear(idxs=potential_split.idxs,
X=X,
y=y_target,
tree_num=potential_split.tree_num,
sample_weight=sample_weight)
# don't need to retain anything from before (besides maybe is_root)
potential_split.setattrs(
feature=potential_split_updated.feature,
impurity_reduction=potential_split_updated.impurity_reduction,
value=potential_split_updated.value,
)
# this is a valid split
if potential_split.impurity_reduction is not None:
potential_splits_new.append(potential_split)
# sort so largest impurity reduction comes last (should probs make this a heap later)
potential_splits = sorted(
potential_splits_new, key=lambda x: x.impurity_reduction)
if verbose:
print(self)
if self.max_rules is not None and self.complexity_ >= self.max_rules:
finished = True
break
_update_tree_preds(self.k2)
# potentially fit linear model on the tree preds
if self.posthoc_ridge:
if self.prediction_task == 'regression':
self.weighted_model_ = RidgeCV(
alphas=(0.01, 0.1, 0.5, 1.0, 5, 10))
elif self.prediction_task == 'classification':
self.weighted_model_ = RidgeClassifierCV(
alphas=(0.01, 0.1, 0.5, 1.0, 5, 10))
X_feats = self._extract_tree_predictions(X)
self.weighted_model_.fit(X_feats, y)
return self
def _tree_to_str(self, root: Node, prefix=''):
if root is None:
return ''
elif root.split_or_linear == 'linear':
return prefix + str(root)
elif root.threshold is None:
return ''
pprefix = prefix + '\t'
return prefix + str(root) + '\n' + self._tree_to_str(root.left, pprefix) + self._tree_to_str(root.right,
pprefix)
def __str__(self):
s = '------------\n' + \
'\n\t+\n'.join([self._tree_to_str(t) for t in self.trees_])
if hasattr(self, 'feature_names_') and self.feature_names_ is not None:
for i in range(len(self.feature_names_))[::-1]:
s = s.replace(f'X_{i}', self.feature_names_[i])
return s
def predict(self, X):
if self.posthoc_ridge and self.weighted_model_: # note, during fitting don't use the weighted moel
X_feats = self._extract_tree_predictions(X)
return self.weighted_model_.predict(X_feats)
preds = np.zeros(X.shape[0])
for tree in self.trees_:
preds += self._predict_tree(tree, X)
if self.prediction_task == 'regression':
return preds
elif self.prediction_task == 'classification':
return (preds > 0.5).astype(int)
def predict_proba(self, X):
if self.prediction_task == 'regression':
return NotImplemented
elif self.posthoc_ridge and self.weighted_model_: # note, during fitting don't use the weighted moel
X_feats = self._extract_tree_predictions(X)
d = self.weighted_model_.decision_function(
X_feats) # for 2 classes, this (n_samples,)
probs = np.exp(d) / (1 + np.exp(d))
return np.vstack((1 - probs, probs)).transpose()
else:
preds = np.zeros(X.shape[0])
for tree in self.trees_:
preds += self._predict_tree(tree, X)
# constrain to range of probabilities
preds = np.clip(preds, a_min=0., a_max=1.)
return np.vstack((1 - preds, preds)).transpose()
def _extract_tree_predictions(self, X):
"""Extract predictions for all trees
"""
X_feats = np.zeros((X.shape[0], len(self.trees_)))
for tree_num_ in range(len(self.trees_)):
preds_tree = self._predict_tree(self.trees_[tree_num_], X)
X_feats[:, tree_num_] = preds_tree
return X_feats
def _predict_tree(self, root: Node, X):
"""Predict for a single tree
This can be made way faster
"""
def _predict_tree_single_point(root: Node, x):
if root.split_or_linear == 'linear':
return x[root.feature] * root.value
elif root.left is None and root.right is None:
return root.value
left = x[root.feature] <= root.threshold
if left:
if root.left is None: # we don't actually have to worry about this case
return root.value
else:
return _predict_tree_single_point(root.left, x)
else:
if root.right is None: # we don't actually have to worry about this case
return root.value
else:
return _predict_tree_single_point(root.right, x)
preds = np.zeros(X.shape[0])
for i in range(X.shape[0]):
preds[i] = _predict_tree_single_point(root, X[i])
return preds
def plot(self, cols=2, feature_names=None, filename=None, label="all",
impurity=False, tree_number=None, dpi=150, fig_size=None):
is_single_tree = len(self.trees_) < 2 or tree_number is not None
n_cols = int(cols)
n_rows = int(np.ceil(len(self.trees_) / n_cols))
# if is_single_tree:
# fig, ax = plt.subplots(1)
# else:
# fig, axs = plt.subplots(n_rows, n_cols)
n_plots = int(len(self.trees_)) if tree_number is None else 1
fig, axs = plt.subplots(n_plots, dpi=dpi)
if fig_size is not None:
fig.set_size_inches(fig_size, fig_size)
criterion = "squared_error" if self.prediction_task == "regression" else "gini"
n_classes = 1 if self.prediction_task == 'regression' else 2
ax_size = int(len(self.trees_)) # n_cols * n_rows
for i in range(n_plots):
r = i // n_cols
c = i % n_cols
if not is_single_tree:
# ax = axs[r, c]
ax = axs[i]
else:
ax = axs
try:
dt = extract_sklearn_tree_from_figs(
self, i if tree_number is None else tree_number, n_classes)
plot_tree(dt, ax=ax, feature_names=feature_names,
label=label, impurity=impurity)
except IndexError:
ax.axis('off')
continue
ax.set_title(f"Tree {i}")
if filename is not None:
plt.savefig(filename)
return
plt.show()
class FIGSExtRegressor(FIGSExt):
def _init_prediction_task(self):
self.prediction_task = 'regression'
class FIGSExtClassifier(FIGSExt):
def _init_prediction_task(self):
self.prediction_task = 'classification'
if __name__ == '__main__':
np.random.seed(13)
# X, y = datasets.load_breast_cancer(return_X_y=True) # binary classification
X, y = datasets.load_diabetes(return_X_y=True) # regression
# X = np.random.randn(500, 10)
# y = (X[:, 0] > 0).astype(float) + (X[:, 1] > 1).astype(float)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.33, random_state=42
)
print('X.shape', X.shape)
print('ys', np.unique(y_train), '\n\n')
m = FIGSExtClassifier(max_rules=50)
m.fit(X_train, y_train)
print(m.predict_proba(X_train))
m.plot(2, tree_number=0)
Classes
class FIGSExt (max_rules: int = None, posthoc_ridge: bool = False, include_linear: bool = False, max_features=None, min_impurity_decrease: float = 0.0, k1: int = 0, k2: int = 0)
-
FIGSExt (sum of trees) classifier. Fast Interpretable Greedy-Tree Sums (FIGS) is an algorithm for fitting concise rule-based models. Specifically, FIGS generalizes CART to simultaneously grow a flexible number of trees in a summation. The total number of splits across all the trees can be restricted by a pre-specified threshold, keeping the model interpretable. Experiments across a wide array of real-world datasets show that FIGS achieves state-of-the-art prediction performance when restricted to just a few splits (e.g. less than 20). https://arxiv.org/abs/2201.11931
max_features The number of features to consider when looking for the best split k1: number of iterations of tree-prediction backfitting to do after making each split k2: number of iterations of tree-prediction backfitting to do after the end of the entire tree-growing phase
Expand source code
class FIGSExt(BaseEstimator): """FIGSExt (sum of trees) classifier. Fast Interpretable Greedy-Tree Sums (FIGS) is an algorithm for fitting concise rule-based models. Specifically, FIGS generalizes CART to simultaneously grow a flexible number of trees in a summation. The total number of splits across all the trees can be restricted by a pre-specified threshold, keeping the model interpretable. Experiments across a wide array of real-world datasets show that FIGS achieves state-of-the-art prediction performance when restricted to just a few splits (e.g. less than 20). https://arxiv.org/abs/2201.11931 """ def __init__(self, max_rules: int = None, posthoc_ridge: bool = False, include_linear: bool = False, max_features=None, min_impurity_decrease: float = 0.0, k1: int = 0, k2: int = 0): """ max_features The number of features to consider when looking for the best split k1: number of iterations of tree-prediction backfitting to do after making each split k2: number of iterations of tree-prediction backfitting to do after the end of the entire tree-growing phase """ super().__init__() self.max_rules = max_rules self.posthoc_ridge = posthoc_ridge self.include_linear = include_linear self.max_features = max_features self.weighted_model_ = None # set if using posthoc_ridge self.min_impurity_decrease = min_impurity_decrease self.k1 = k1 self.k2 = k2 self._init_prediction_task() # decides between regressor and classifier def _init_prediction_task(self): """ FIGSExtRegressor and FIGSExtClassifier override this method to alter the prediction task. When using this class directly, it is equivalent to FIGSExtRegressor """ self.prediction_task = 'regression' def _init_decision_function(self): """Sets decision function based on prediction_task """ # used by sklearn GridSearchCV, BaggingClassifier if self.prediction_task == 'classification': def decision_function(x): return self.predict_proba(x)[:, 1] elif self.prediction_task == 'regression': decision_function = self.predict def _construct_node_linear(self, X, y, idxs, tree_num=0, sample_weight=None): """This can be made a lot faster Assumes there are at least 5 points in node Doesn't currently support _sample_weight! """ y_target = y[idxs] impurity_orig = np.mean(np.square(y_target)) * idxs.sum() # find best linear split best_impurity = impurity_orig best_linear_coef = None best_feature = None for feature_num in range(X.shape[1]): x = X[idxs, feature_num].reshape(-1, 1) m = RidgeCV(fit_intercept=False) m.fit(x, y_target) impurity = np.min(-m.best_score_) * idxs.sum() assert impurity >= 0, 'impurity should not be negative' if impurity < best_impurity: best_impurity = impurity best_linear_coef = m.coef_[0] best_feature = feature_num impurity_reduction = impurity_orig - best_impurity # no good linear fit found if impurity_reduction == 0: return Node(idxs=idxs, value=np.mean(y_target), tree_num=tree_num, feature=None, threshold=None, impurity_reduction=-1, split_or_linear='split') # leaf node that just returns its value else: assert isinstance(best_linear_coef, float), 'coef should be a float' return Node(idxs=idxs, value=best_linear_coef, tree_num=tree_num, feature=best_feature, threshold=None, impurity_reduction=impurity_reduction, split_or_linear='linear') def _construct_node_with_stump(self, X, y, idxs, tree_num, sample_weight=None, max_features=None): # array indices SPLIT = 0 LEFT = 1 RIGHT = 2 # fit stump stump = tree.DecisionTreeRegressor( max_depth=1, max_features=max_features) if sample_weight is not None: sample_weight = sample_weight[idxs] stump.fit(X[idxs], y[idxs], sample_weight=sample_weight) # these are all arrays, arr[0] is split node # note: -2 is dummy feature = stump.tree_.feature threshold = stump.tree_.threshold impurity = stump.tree_.impurity n_node_samples = stump.tree_.n_node_samples value = stump.tree_.value # no split if len(feature) == 1: # print('no split found!', idxs.sum(), impurity, feature) return Node(idxs=idxs, value=value[SPLIT], tree_num=tree_num, feature=feature[SPLIT], threshold=threshold[SPLIT], impurity_reduction=-1, n_samples=n_node_samples) # split node impurity_reduction = ( impurity[SPLIT] - impurity[LEFT] * n_node_samples[LEFT] / n_node_samples[SPLIT] - impurity[RIGHT] * n_node_samples[RIGHT] / n_node_samples[SPLIT] ) * idxs.sum() node_split = Node(idxs=idxs, value=value[SPLIT], tree_num=tree_num, feature=feature[SPLIT], threshold=threshold[SPLIT], impurity_reduction=impurity_reduction, n_samples=n_node_samples) # print('\t>>>', node_split, 'impurity', impurity, 'num_pts', idxs.sum(), 'imp_reduc', impurity_reduction) # manage children idxs_split = X[:, feature[SPLIT]] <= threshold[SPLIT] idxs_left = idxs_split & idxs idxs_right = ~idxs_split & idxs node_left = Node(idxs=idxs_left, value=value[LEFT], tree_num=tree_num) node_right = Node( idxs=idxs_right, value=value[RIGHT], tree_num=tree_num) node_split.setattrs(left_temp=node_left, right_temp=node_right, ) return node_split def fit(self, X, y=None, feature_names=None, verbose=False, sample_weight=None): """ Params ------ _sample_weight: array-like of shape (n_samples,), default=None Sample weights. If None, then samples are equally weighted. Splits that would create child nodes with net zero or negative weight are ignored while searching for a split in each node. """ if self.prediction_task == 'classification': self.classes_, y = np.unique( y, return_inverse=True) # deals with str inputs X, y = check_X_y(X, y) y = y.astype(float) if feature_names is not None: self.feature_names_ = feature_names self.trees_ = [] # list of the root nodes of added trees self.complexity_ = 0 # tracks the number of rules in the model y_predictions_per_tree = {} # predictions for each tree y_residuals_per_tree = {} # based on predictions above def _update_tree_preds(n_iter): for k in range(n_iter): for tree_num_, tree_ in enumerate(self.trees_): y_residuals_per_tree[tree_num_] = deepcopy(y) # subtract predictions of all other trees for tree_num_2_ in range(len(self.trees_)): if not tree_num_2_ == tree_num_: y_residuals_per_tree[tree_num_] -= y_predictions_per_tree[tree_num_2_] tree_.update_values(X, y_residuals_per_tree[tree_num_]) y_predictions_per_tree[tree_num_] = self._predict_tree(self.trees_[ tree_num_], X) # set up initial potential_splits # everything in potential_splits either is_root (so it can be added directly to self.trees_) # or it is a child of a root node that has already been added idxs = np.ones(X.shape[0], dtype=bool) node_init = self._construct_node_with_stump(X=X, y=y, idxs=idxs, tree_num=-1, sample_weight=sample_weight, max_features=self.max_features) potential_splits = [node_init] if self.include_linear and idxs.sum() >= 5: node_init_linear = self._construct_node_linear(X=X, y=y, idxs=idxs, tree_num=-1, sample_weight=sample_weight) potential_splits.append(node_init_linear) for node in potential_splits: node.setattrs(is_root=True) potential_splits = sorted( potential_splits, key=lambda x: x.impurity_reduction) # start the greedy fitting algorithm finished = False while len(potential_splits) > 0 and not finished: # print('potential_splits', [str(s) for s in potential_splits]) # get node with max impurity_reduction (since it's sorted) split_node = potential_splits.pop() # don't split on node if split_node.impurity_reduction < self.min_impurity_decrease: finished = True break # split on node if verbose: print('\nadding ' + str(split_node)) self.complexity_ += 1 # if added a tree root if split_node.is_root: # start a new tree self.trees_.append(split_node) # update tree_num for node_ in [split_node, split_node.left_temp, split_node.right_temp]: if node_ is not None: node_.tree_num = len(self.trees_) - 1 # add new root potential node node_new_root = Node(is_root=True, idxs=np.ones(X.shape[0], dtype=bool), tree_num=-1, split_or_linear=split_node.split_or_linear) potential_splits.append(node_new_root) # add children to potential splits (note this doesn't currently add linear potential splits) if split_node.split_or_linear == 'split': # assign left_temp, right_temp to be proper children # (basically adds them to tree in predict method) split_node.setattrs(left=split_node.left_temp, right=split_node.right_temp) # add children to potential_splits potential_splits.append(split_node.left) potential_splits.append(split_node.right) # update predictions for altered tree for tree_num_ in range(len(self.trees_)): y_predictions_per_tree[tree_num_] = self._predict_tree(self.trees_[ tree_num_], X) # dummy 0 preds for possible new trees y_predictions_per_tree[-1] = np.zeros(X.shape[0]) # update residuals for each tree # -1 is key for potential new tree for tree_num_ in list(range(len(self.trees_))) + [-1]: y_residuals_per_tree[tree_num_] = deepcopy(y) # subtract predictions of all other trees for tree_num_2_ in range(len(self.trees_)): if not tree_num_2_ == tree_num_: y_residuals_per_tree[tree_num_] -= y_predictions_per_tree[tree_num_2_] _update_tree_preds(self.k1) # recompute all impurities + update potential_split children potential_splits_new = [] for potential_split in potential_splits: y_target = y_residuals_per_tree[potential_split.tree_num] if potential_split.split_or_linear == 'split': # re-calculate the best split potential_split_updated = self._construct_node_with_stump(X=X, y=y_target, idxs=potential_split.idxs, tree_num=potential_split.tree_num, sample_weight=sample_weight, max_features=self.max_features) # need to preserve certain attributes from before (value at this split + is_root) # value may change because residuals may have changed, but we want it to store the value from before potential_split.setattrs( feature=potential_split_updated.feature, threshold=potential_split_updated.threshold, impurity_reduction=potential_split_updated.impurity_reduction, left_temp=potential_split_updated.left_temp, right_temp=potential_split_updated.right_temp, ) elif potential_split.split_or_linear == 'linear': assert potential_split.is_root, 'Currently, linear node only supported as root' assert potential_split.idxs.sum( ) == X.shape[0], 'Currently, linear node only supported as root' potential_split_updated = self._construct_node_linear(idxs=potential_split.idxs, X=X, y=y_target, tree_num=potential_split.tree_num, sample_weight=sample_weight) # don't need to retain anything from before (besides maybe is_root) potential_split.setattrs( feature=potential_split_updated.feature, impurity_reduction=potential_split_updated.impurity_reduction, value=potential_split_updated.value, ) # this is a valid split if potential_split.impurity_reduction is not None: potential_splits_new.append(potential_split) # sort so largest impurity reduction comes last (should probs make this a heap later) potential_splits = sorted( potential_splits_new, key=lambda x: x.impurity_reduction) if verbose: print(self) if self.max_rules is not None and self.complexity_ >= self.max_rules: finished = True break _update_tree_preds(self.k2) # potentially fit linear model on the tree preds if self.posthoc_ridge: if self.prediction_task == 'regression': self.weighted_model_ = RidgeCV( alphas=(0.01, 0.1, 0.5, 1.0, 5, 10)) elif self.prediction_task == 'classification': self.weighted_model_ = RidgeClassifierCV( alphas=(0.01, 0.1, 0.5, 1.0, 5, 10)) X_feats = self._extract_tree_predictions(X) self.weighted_model_.fit(X_feats, y) return self def _tree_to_str(self, root: Node, prefix=''): if root is None: return '' elif root.split_or_linear == 'linear': return prefix + str(root) elif root.threshold is None: return '' pprefix = prefix + '\t' return prefix + str(root) + '\n' + self._tree_to_str(root.left, pprefix) + self._tree_to_str(root.right, pprefix) def __str__(self): s = '------------\n' + \ '\n\t+\n'.join([self._tree_to_str(t) for t in self.trees_]) if hasattr(self, 'feature_names_') and self.feature_names_ is not None: for i in range(len(self.feature_names_))[::-1]: s = s.replace(f'X_{i}', self.feature_names_[i]) return s def predict(self, X): if self.posthoc_ridge and self.weighted_model_: # note, during fitting don't use the weighted moel X_feats = self._extract_tree_predictions(X) return self.weighted_model_.predict(X_feats) preds = np.zeros(X.shape[0]) for tree in self.trees_: preds += self._predict_tree(tree, X) if self.prediction_task == 'regression': return preds elif self.prediction_task == 'classification': return (preds > 0.5).astype(int) def predict_proba(self, X): if self.prediction_task == 'regression': return NotImplemented elif self.posthoc_ridge and self.weighted_model_: # note, during fitting don't use the weighted moel X_feats = self._extract_tree_predictions(X) d = self.weighted_model_.decision_function( X_feats) # for 2 classes, this (n_samples,) probs = np.exp(d) / (1 + np.exp(d)) return np.vstack((1 - probs, probs)).transpose() else: preds = np.zeros(X.shape[0]) for tree in self.trees_: preds += self._predict_tree(tree, X) # constrain to range of probabilities preds = np.clip(preds, a_min=0., a_max=1.) return np.vstack((1 - preds, preds)).transpose() def _extract_tree_predictions(self, X): """Extract predictions for all trees """ X_feats = np.zeros((X.shape[0], len(self.trees_))) for tree_num_ in range(len(self.trees_)): preds_tree = self._predict_tree(self.trees_[tree_num_], X) X_feats[:, tree_num_] = preds_tree return X_feats def _predict_tree(self, root: Node, X): """Predict for a single tree This can be made way faster """ def _predict_tree_single_point(root: Node, x): if root.split_or_linear == 'linear': return x[root.feature] * root.value elif root.left is None and root.right is None: return root.value left = x[root.feature] <= root.threshold if left: if root.left is None: # we don't actually have to worry about this case return root.value else: return _predict_tree_single_point(root.left, x) else: if root.right is None: # we don't actually have to worry about this case return root.value else: return _predict_tree_single_point(root.right, x) preds = np.zeros(X.shape[0]) for i in range(X.shape[0]): preds[i] = _predict_tree_single_point(root, X[i]) return preds def plot(self, cols=2, feature_names=None, filename=None, label="all", impurity=False, tree_number=None, dpi=150, fig_size=None): is_single_tree = len(self.trees_) < 2 or tree_number is not None n_cols = int(cols) n_rows = int(np.ceil(len(self.trees_) / n_cols)) # if is_single_tree: # fig, ax = plt.subplots(1) # else: # fig, axs = plt.subplots(n_rows, n_cols) n_plots = int(len(self.trees_)) if tree_number is None else 1 fig, axs = plt.subplots(n_plots, dpi=dpi) if fig_size is not None: fig.set_size_inches(fig_size, fig_size) criterion = "squared_error" if self.prediction_task == "regression" else "gini" n_classes = 1 if self.prediction_task == 'regression' else 2 ax_size = int(len(self.trees_)) # n_cols * n_rows for i in range(n_plots): r = i // n_cols c = i % n_cols if not is_single_tree: # ax = axs[r, c] ax = axs[i] else: ax = axs try: dt = extract_sklearn_tree_from_figs( self, i if tree_number is None else tree_number, n_classes) plot_tree(dt, ax=ax, feature_names=feature_names, label=label, impurity=impurity) except IndexError: ax.axis('off') continue ax.set_title(f"Tree {i}") if filename is not None: plt.savefig(filename) return plt.show()
Ancestors
- sklearn.base.BaseEstimator
- sklearn.utils._estimator_html_repr._HTMLDocumentationLinkMixin
- sklearn.utils._metadata_requests._MetadataRequester
Subclasses
Methods
def fit(self, X, y=None, feature_names=None, verbose=False, sample_weight=None)
-
Params
_sample_weight: array-like of shape (n_samples,), default=None Sample weights. If None, then samples are equally weighted. Splits that would create child nodes with net zero or negative weight are ignored while searching for a split in each node.
Expand source code
def fit(self, X, y=None, feature_names=None, verbose=False, sample_weight=None): """ Params ------ _sample_weight: array-like of shape (n_samples,), default=None Sample weights. If None, then samples are equally weighted. Splits that would create child nodes with net zero or negative weight are ignored while searching for a split in each node. """ if self.prediction_task == 'classification': self.classes_, y = np.unique( y, return_inverse=True) # deals with str inputs X, y = check_X_y(X, y) y = y.astype(float) if feature_names is not None: self.feature_names_ = feature_names self.trees_ = [] # list of the root nodes of added trees self.complexity_ = 0 # tracks the number of rules in the model y_predictions_per_tree = {} # predictions for each tree y_residuals_per_tree = {} # based on predictions above def _update_tree_preds(n_iter): for k in range(n_iter): for tree_num_, tree_ in enumerate(self.trees_): y_residuals_per_tree[tree_num_] = deepcopy(y) # subtract predictions of all other trees for tree_num_2_ in range(len(self.trees_)): if not tree_num_2_ == tree_num_: y_residuals_per_tree[tree_num_] -= y_predictions_per_tree[tree_num_2_] tree_.update_values(X, y_residuals_per_tree[tree_num_]) y_predictions_per_tree[tree_num_] = self._predict_tree(self.trees_[ tree_num_], X) # set up initial potential_splits # everything in potential_splits either is_root (so it can be added directly to self.trees_) # or it is a child of a root node that has already been added idxs = np.ones(X.shape[0], dtype=bool) node_init = self._construct_node_with_stump(X=X, y=y, idxs=idxs, tree_num=-1, sample_weight=sample_weight, max_features=self.max_features) potential_splits = [node_init] if self.include_linear and idxs.sum() >= 5: node_init_linear = self._construct_node_linear(X=X, y=y, idxs=idxs, tree_num=-1, sample_weight=sample_weight) potential_splits.append(node_init_linear) for node in potential_splits: node.setattrs(is_root=True) potential_splits = sorted( potential_splits, key=lambda x: x.impurity_reduction) # start the greedy fitting algorithm finished = False while len(potential_splits) > 0 and not finished: # print('potential_splits', [str(s) for s in potential_splits]) # get node with max impurity_reduction (since it's sorted) split_node = potential_splits.pop() # don't split on node if split_node.impurity_reduction < self.min_impurity_decrease: finished = True break # split on node if verbose: print('\nadding ' + str(split_node)) self.complexity_ += 1 # if added a tree root if split_node.is_root: # start a new tree self.trees_.append(split_node) # update tree_num for node_ in [split_node, split_node.left_temp, split_node.right_temp]: if node_ is not None: node_.tree_num = len(self.trees_) - 1 # add new root potential node node_new_root = Node(is_root=True, idxs=np.ones(X.shape[0], dtype=bool), tree_num=-1, split_or_linear=split_node.split_or_linear) potential_splits.append(node_new_root) # add children to potential splits (note this doesn't currently add linear potential splits) if split_node.split_or_linear == 'split': # assign left_temp, right_temp to be proper children # (basically adds them to tree in predict method) split_node.setattrs(left=split_node.left_temp, right=split_node.right_temp) # add children to potential_splits potential_splits.append(split_node.left) potential_splits.append(split_node.right) # update predictions for altered tree for tree_num_ in range(len(self.trees_)): y_predictions_per_tree[tree_num_] = self._predict_tree(self.trees_[ tree_num_], X) # dummy 0 preds for possible new trees y_predictions_per_tree[-1] = np.zeros(X.shape[0]) # update residuals for each tree # -1 is key for potential new tree for tree_num_ in list(range(len(self.trees_))) + [-1]: y_residuals_per_tree[tree_num_] = deepcopy(y) # subtract predictions of all other trees for tree_num_2_ in range(len(self.trees_)): if not tree_num_2_ == tree_num_: y_residuals_per_tree[tree_num_] -= y_predictions_per_tree[tree_num_2_] _update_tree_preds(self.k1) # recompute all impurities + update potential_split children potential_splits_new = [] for potential_split in potential_splits: y_target = y_residuals_per_tree[potential_split.tree_num] if potential_split.split_or_linear == 'split': # re-calculate the best split potential_split_updated = self._construct_node_with_stump(X=X, y=y_target, idxs=potential_split.idxs, tree_num=potential_split.tree_num, sample_weight=sample_weight, max_features=self.max_features) # need to preserve certain attributes from before (value at this split + is_root) # value may change because residuals may have changed, but we want it to store the value from before potential_split.setattrs( feature=potential_split_updated.feature, threshold=potential_split_updated.threshold, impurity_reduction=potential_split_updated.impurity_reduction, left_temp=potential_split_updated.left_temp, right_temp=potential_split_updated.right_temp, ) elif potential_split.split_or_linear == 'linear': assert potential_split.is_root, 'Currently, linear node only supported as root' assert potential_split.idxs.sum( ) == X.shape[0], 'Currently, linear node only supported as root' potential_split_updated = self._construct_node_linear(idxs=potential_split.idxs, X=X, y=y_target, tree_num=potential_split.tree_num, sample_weight=sample_weight) # don't need to retain anything from before (besides maybe is_root) potential_split.setattrs( feature=potential_split_updated.feature, impurity_reduction=potential_split_updated.impurity_reduction, value=potential_split_updated.value, ) # this is a valid split if potential_split.impurity_reduction is not None: potential_splits_new.append(potential_split) # sort so largest impurity reduction comes last (should probs make this a heap later) potential_splits = sorted( potential_splits_new, key=lambda x: x.impurity_reduction) if verbose: print(self) if self.max_rules is not None and self.complexity_ >= self.max_rules: finished = True break _update_tree_preds(self.k2) # potentially fit linear model on the tree preds if self.posthoc_ridge: if self.prediction_task == 'regression': self.weighted_model_ = RidgeCV( alphas=(0.01, 0.1, 0.5, 1.0, 5, 10)) elif self.prediction_task == 'classification': self.weighted_model_ = RidgeClassifierCV( alphas=(0.01, 0.1, 0.5, 1.0, 5, 10)) X_feats = self._extract_tree_predictions(X) self.weighted_model_.fit(X_feats, y) return self
def plot(self, cols=2, feature_names=None, filename=None, label='all', impurity=False, tree_number=None, dpi=150, fig_size=None)
-
Expand source code
def plot(self, cols=2, feature_names=None, filename=None, label="all", impurity=False, tree_number=None, dpi=150, fig_size=None): is_single_tree = len(self.trees_) < 2 or tree_number is not None n_cols = int(cols) n_rows = int(np.ceil(len(self.trees_) / n_cols)) # if is_single_tree: # fig, ax = plt.subplots(1) # else: # fig, axs = plt.subplots(n_rows, n_cols) n_plots = int(len(self.trees_)) if tree_number is None else 1 fig, axs = plt.subplots(n_plots, dpi=dpi) if fig_size is not None: fig.set_size_inches(fig_size, fig_size) criterion = "squared_error" if self.prediction_task == "regression" else "gini" n_classes = 1 if self.prediction_task == 'regression' else 2 ax_size = int(len(self.trees_)) # n_cols * n_rows for i in range(n_plots): r = i // n_cols c = i % n_cols if not is_single_tree: # ax = axs[r, c] ax = axs[i] else: ax = axs try: dt = extract_sklearn_tree_from_figs( self, i if tree_number is None else tree_number, n_classes) plot_tree(dt, ax=ax, feature_names=feature_names, label=label, impurity=impurity) except IndexError: ax.axis('off') continue ax.set_title(f"Tree {i}") if filename is not None: plt.savefig(filename) return plt.show()
def predict(self, X)
-
Expand source code
def predict(self, X): if self.posthoc_ridge and self.weighted_model_: # note, during fitting don't use the weighted moel X_feats = self._extract_tree_predictions(X) return self.weighted_model_.predict(X_feats) preds = np.zeros(X.shape[0]) for tree in self.trees_: preds += self._predict_tree(tree, X) if self.prediction_task == 'regression': return preds elif self.prediction_task == 'classification': return (preds > 0.5).astype(int)
def predict_proba(self, X)
-
Expand source code
def predict_proba(self, X): if self.prediction_task == 'regression': return NotImplemented elif self.posthoc_ridge and self.weighted_model_: # note, during fitting don't use the weighted moel X_feats = self._extract_tree_predictions(X) d = self.weighted_model_.decision_function( X_feats) # for 2 classes, this (n_samples,) probs = np.exp(d) / (1 + np.exp(d)) return np.vstack((1 - probs, probs)).transpose() else: preds = np.zeros(X.shape[0]) for tree in self.trees_: preds += self._predict_tree(tree, X) # constrain to range of probabilities preds = np.clip(preds, a_min=0., a_max=1.) return np.vstack((1 - preds, preds)).transpose()
def set_fit_request(self: FIGSExt, *, feature_names: Union[bool, ForwardRef(None), str] = '$UNCHANGED$', sample_weight: Union[bool, ForwardRef(None), str] = '$UNCHANGED$', verbose: Union[bool, ForwardRef(None), str] = '$UNCHANGED$') ‑> FIGSExt
-
Request metadata passed to the
fit
method.Note that this method is only relevant if
enable_metadata_routing=True
(see :func:sklearn.set_config
). Please see :ref:User Guide <metadata_routing>
on how the routing mechanism works.The options for each parameter are:
-
True
: metadata is requested, and passed tofit
if provided. The request is ignored if metadata is not provided. -
False
: metadata is not requested and the meta-estimator will not pass it tofit
. -
None
: metadata is not requested, and the meta-estimator will raise an error if the user provides it. -
str
: metadata should be passed to the meta-estimator with this given alias instead of the original name.
The default (
sklearn.utils.metadata_routing.UNCHANGED
) retains the existing request. This allows you to change the request for some parameters and not others.Added in version: 1.3
Note
This method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. used inside a :class:
~sklearn.pipeline.Pipeline
. Otherwise it has no effect.Parameters
feature_names
:str, True, False,
orNone
, default=sklearn.utils.metadata_routing.UNCHANGED
- Metadata routing for
feature_names
parameter infit
. sample_weight
:str, True, False,
orNone
, default=sklearn.utils.metadata_routing.UNCHANGED
- Metadata routing for
sample_weight
parameter infit
. verbose
:str, True, False,
orNone
, default=sklearn.utils.metadata_routing.UNCHANGED
- Metadata routing for
verbose
parameter infit
.
Returns
self
:object
- The updated object.
Expand source code
def func(*args, **kw): """Updates the request for provided parameters This docstring is overwritten below. See REQUESTER_DOC for expected functionality """ if not _routing_enabled(): raise RuntimeError( "This method is only available when metadata routing is enabled." " You can enable it using" " sklearn.set_config(enable_metadata_routing=True)." ) if self.validate_keys and (set(kw) - set(self.keys)): raise TypeError( f"Unexpected args: {set(kw) - set(self.keys)} in {self.name}. " f"Accepted arguments are: {set(self.keys)}" ) # This makes it possible to use the decorated method as an unbound method, # for instance when monkeypatching. # https://github.com/scikit-learn/scikit-learn/issues/28632 if instance is None: _instance = args[0] args = args[1:] else: _instance = instance # Replicating python's behavior when positional args are given other than # `self`, and `self` is only allowed if this method is unbound. if args: raise TypeError( f"set_{self.name}_request() takes 0 positional argument but" f" {len(args)} were given" ) requests = _instance._get_metadata_request() method_metadata_request = getattr(requests, self.name) for prop, alias in kw.items(): if alias is not UNCHANGED: method_metadata_request.add_request(param=prop, alias=alias) _instance._metadata_request = requests return _instance
-
class FIGSExtClassifier (max_rules: int = None, posthoc_ridge: bool = False, include_linear: bool = False, max_features=None, min_impurity_decrease: float = 0.0, k1: int = 0, k2: int = 0)
-
FIGSExt (sum of trees) classifier. Fast Interpretable Greedy-Tree Sums (FIGS) is an algorithm for fitting concise rule-based models. Specifically, FIGS generalizes CART to simultaneously grow a flexible number of trees in a summation. The total number of splits across all the trees can be restricted by a pre-specified threshold, keeping the model interpretable. Experiments across a wide array of real-world datasets show that FIGS achieves state-of-the-art prediction performance when restricted to just a few splits (e.g. less than 20). https://arxiv.org/abs/2201.11931
max_features The number of features to consider when looking for the best split k1: number of iterations of tree-prediction backfitting to do after making each split k2: number of iterations of tree-prediction backfitting to do after the end of the entire tree-growing phase
Expand source code
class FIGSExtClassifier(FIGSExt): def _init_prediction_task(self): self.prediction_task = 'classification'
Ancestors
- FIGSExt
- sklearn.base.BaseEstimator
- sklearn.utils._estimator_html_repr._HTMLDocumentationLinkMixin
- sklearn.utils._metadata_requests._MetadataRequester
Inherited members
class FIGSExtRegressor (max_rules: int = None, posthoc_ridge: bool = False, include_linear: bool = False, max_features=None, min_impurity_decrease: float = 0.0, k1: int = 0, k2: int = 0)
-
FIGSExt (sum of trees) classifier. Fast Interpretable Greedy-Tree Sums (FIGS) is an algorithm for fitting concise rule-based models. Specifically, FIGS generalizes CART to simultaneously grow a flexible number of trees in a summation. The total number of splits across all the trees can be restricted by a pre-specified threshold, keeping the model interpretable. Experiments across a wide array of real-world datasets show that FIGS achieves state-of-the-art prediction performance when restricted to just a few splits (e.g. less than 20). https://arxiv.org/abs/2201.11931
max_features The number of features to consider when looking for the best split k1: number of iterations of tree-prediction backfitting to do after making each split k2: number of iterations of tree-prediction backfitting to do after the end of the entire tree-growing phase
Expand source code
class FIGSExtRegressor(FIGSExt): def _init_prediction_task(self): self.prediction_task = 'regression'
Ancestors
- FIGSExt
- sklearn.base.BaseEstimator
- sklearn.utils._estimator_html_repr._HTMLDocumentationLinkMixin
- sklearn.utils._metadata_requests._MetadataRequester
Inherited members
class Node (feature: int = None, threshold: int = None, value=None, idxs=None, is_root: bool = False, left=None, impurity_reduction: float = None, tree_num: int = None, right=None, split_or_linear='split', n_samples=0)
-
Node class for splitting
Expand source code
class Node: def __init__(self, feature: int = None, threshold: int = None, value=None, idxs=None, is_root: bool = False, left=None, impurity_reduction: float = None, tree_num: int = None, right=None, split_or_linear='split', n_samples=0): """Node class for splitting """ # split or linear self.is_root = is_root self.idxs = idxs self.tree_num = tree_num self.split_or_linear = split_or_linear self.feature = feature self.n_samples = n_samples self.impurity_reduction = impurity_reduction # different meanings self.value = value # for split this is mean, for linear this is weight # split-specific (for linear these should all be None) self.threshold = threshold self.left = left self.right = right self.left_temp = None self.right_temp = None def update_values(self, X, y): self.value = y.mean() if self.threshold is not None: right_indicator = np.apply_along_axis( lambda x: x[self.feature] > self.threshold, 1, X) X_right = X[right_indicator, :] X_left = X[~right_indicator, :] y_right = y[right_indicator] y_left = y[~right_indicator] if self.left is not None: self.left.update_values(X_left, y_left) if self.right is not None: self.right.update_values(X_right, y_right) def shrink(self, reg_param, cum_sum=0): if self.is_root: cum_sum = self.value if self.left is None: # if leaf node, change prediction self.value = cum_sum else: shrunk_diff = (self.left.value - self.value) / \ (1 + reg_param / self.n_samples) self.left.shrink(reg_param, cum_sum + shrunk_diff) shrunk_diff = (self.right.value - self.value) / \ (1 + reg_param / self.n_samples) self.right.shrink(reg_param, cum_sum + shrunk_diff) def setattrs(self, **kwargs): for k, v in kwargs.items(): setattr(self, k, v) def __str__(self): if self.split_or_linear == 'linear': if self.is_root: return f'X_{self.feature} * {self.value:0.3f} (Tree #{self.tree_num} linear root)' else: return f'X_{self.feature} * {self.value:0.3f} (linear)' else: if self.is_root: return f'X_{self.feature} <= {self.threshold:0.3f} (Tree #{self.tree_num} root)' elif self.left is None and self.right is None: return f'Val: {self.value[0][0]:0.3f} (leaf)' else: return f'X_{self.feature} <= {self.threshold:0.3f} (split)' def __repr__(self): return self.__str__()
Methods
def setattrs(self, **kwargs)
-
Expand source code
def setattrs(self, **kwargs): for k, v in kwargs.items(): setattr(self, k, v)
def shrink(self, reg_param, cum_sum=0)
-
Expand source code
def shrink(self, reg_param, cum_sum=0): if self.is_root: cum_sum = self.value if self.left is None: # if leaf node, change prediction self.value = cum_sum else: shrunk_diff = (self.left.value - self.value) / \ (1 + reg_param / self.n_samples) self.left.shrink(reg_param, cum_sum + shrunk_diff) shrunk_diff = (self.right.value - self.value) / \ (1 + reg_param / self.n_samples) self.right.shrink(reg_param, cum_sum + shrunk_diff)
def update_values(self, X, y)
-
Expand source code
def update_values(self, X, y): self.value = y.mean() if self.threshold is not None: right_indicator = np.apply_along_axis( lambda x: x[self.feature] > self.threshold, 1, X) X_right = X[right_indicator, :] X_left = X[~right_indicator, :] y_right = y[right_indicator] y_left = y[~right_indicator] if self.left is not None: self.left.update_values(X_left, y_left) if self.right is not None: self.right.update_values(X_right, y_right)