Expand source code
import random
from copy import deepcopy
from queue import deque
import numpy as np
from mlxtend.classifier import LogisticRegression
from sklearn import datasets
from sklearn.base import BaseEstimator, RegressorMixin, ClassifierMixin
from sklearn.linear_model import LinearRegression
from sklearn.metrics import get_scorer
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor, export_text
from sklearn.utils import check_X_y
from imodels.util.arguments import check_fit_arguments
class TaoTree(BaseEstimator):
    # NOTE: the dict defaults below are shared between instances. This class
    # only ever unpacks them with ``**`` and never mutates them.
    def __init__(self, model_type: str = 'CART',
                 n_iters: int = 20,
                 model_args: dict = {'max_leaf_nodes': 15},
                 randomize_tree=False,
                 update_scoring='accuracy',
                 min_node_samples_tao=3,
                 min_leaf_samples_tao=2,
                 node_model='stump',
                 node_model_args: dict = {},
                 reg_param: float = 1e-3,
                 weight_errors: bool = False,
                 verbose: int = 0,
                 ):
        """TAO: Alternating optimization of decision trees, with application to learning sparse oblique trees (Neurips 2018)
        https://proceedings.neurips.cc/paper/2018/hash/185c29dc24325934ee377cfda20e414c-Abstract.html
        Note: this implementation learns single-feature splits rather than oblique trees.

        Currently supports
        - given a CART tree, posthoc improve it with TAO
        - also works with HSTreeCV

        Todo
        - update bottom to top otherwise input points don't get updated
        - update leaf nodes
        - support regression
        - support FIGS
        - support error-weighting
        - support oblique trees
        - support generic models at decision node
        - support pruning (e.g. if weights -> 0, then remove a node)
        - support classifiers in leaves

        Parameters
        ----------
        model_type: str
            'CART' or 'FIGS' (only 'CART' builds a model in ``fit``)
        n_iters
            Maximum number of TAO passes over the tree; stops early when a
            pass changes no node
        model_args
            Arguments to pass to the initial decision tree
        randomize_tree
            If True, shuffle the split features of the fitted CART tree and
            reset every threshold to its feature's median before running TAO
        update_scoring
            Scorer name handed to ``sklearn.metrics.get_scorer``; a node update
            is kept only if this score strictly improves
        min_node_samples_tao: int
            Minimum number of samples in a node to apply tao
        min_leaf_samples_tao: int
            Minimum number of samples in a leaf for its value to be re-estimated
            (only done for regressors)
        node_model: str
            'stump' or 'linear'
        node_model_args
            Arguments to pass to each node-wise model
        reg_param
            Regularization parameter for node-wise linear model
            (stored on the instance; not read anywhere else in this class)
        weight_errors
            If True, weight each sample at a node by the absolute difference
            between its error when routed left and when routed right
        verbose: int
            Verbosity level
        """
        super().__init__()
        self.model_type = model_type
        self.n_iters = n_iters
        self.model_args = model_args
        self.randomize_tree = randomize_tree
        self.update_scoring = update_scoring
        self.min_node_samples_tao = min_node_samples_tao
        self.min_leaf_samples_tao = min_leaf_samples_tao
        self.node_model = node_model
        self.node_model_args = node_model_args
        self.reg_param = reg_param
        self.weight_errors = weight_errors
        self.verbose = verbose
        self._init_prediction_task()  # decides between regressor and classifier

    def _init_prediction_task(self):
        """Record the prediction task on the instance.

        The base implementation always sets ``prediction_task`` to
        'classification'; subclasses may override it.
        """
        self.prediction_task = 'classification'

    def fit(self, X, y=None, feature_names=None, sample_weight=None):
        """Fit a CART tree and then refine its splits with TAO iterations.

        Params
        ------
        X: array-like of shape (n_samples, n_features)
            The input samples.
        y: array-like of shape (n_samples,)
            The target values.
        feature_names: optional
            Stored as ``feature_names_`` when not None.
        sample_weight: array-like of shape (n_samples,), default=None
            Sample weights. If None, then samples are equally weighted.
            Splits that would create child nodes with net zero or negative weight
            are ignored while searching for a split in each node.

        Returns
        -------
        self

        Raises
        ------
        Warning
            Raised (not merely emitted) for regressor subclasses.
        """
        X, y, feature_names = check_fit_arguments(self, X, y, feature_names)
        if isinstance(self, RegressorMixin):
            # NOTE(review): this *raises*, so regressors cannot be fit at all.
            # Kept as-is on purpose: the default update_scoring ('accuracy')
            # is not meaningful for regression targets.
            raise Warning('TAO Regression is not yet tested')
        X, y = check_X_y(X, y)
        y = y.astype(float)
        if feature_names is not None:
            self.feature_names_ = feature_names

        if self.model_type == 'CART':
            if isinstance(self, ClassifierMixin):
                self.model = DecisionTreeClassifier(**self.model_args)
            elif isinstance(self, RegressorMixin):
                self.model = DecisionTreeRegressor(**self.model_args)
            self.model.fit(X, y, sample_weight=sample_weight)
            if self.verbose:
                print(export_text(self.model))

        if self.randomize_tree:
            fitted_tree = self.model.tree_
            # shuffle CART features
            np.random.shuffle(fitted_tree.feature)
            # split on feature medians
            for node in range(fitted_tree.node_count):
                fitted_tree.threshold[node] = np.median(
                    X[:, fitted_tree.feature[node]])
        if self.verbose:
            print('starting score', self.model.score(X, y))

        for _ in range(self.n_iters):
            num_updates = self._tao_iter_cart(
                X, y, self.model.tree_, sample_weight=sample_weight)
            if num_updates == 0:
                # a full pass changed nothing, so further passes cannot either
                break
        return self

    def _make_node_model(self):
        """Return a fresh, unfitted single-feature model for one decision node."""
        if self.node_model not in ('linear', 'stump'):
            raise ValueError(
                f"node_model must be 'stump' or 'linear', got {self.node_model!r}")
        use_linear = self.node_model == 'linear'
        if isinstance(self, RegressorMixin):
            if use_linear:
                return LinearRegression(**self.node_model_args)
            return DecisionTreeRegressor(max_depth=1, **self.node_model_args)
        if use_linear:
            return LogisticRegression(**self.node_model_args)
        return DecisionTreeClassifier(max_depth=1, **self.node_model_args)

    def _tao_iter_cart(self, X, y, tree, X_score=None, y_score=None, sample_weight=None):
        """Updates tree by applying the tao algorithm to the tree

        Nodes are visited bottom-up. For every internal node, each sample
        reaching it is labelled with the child (0 = left, 1 = right) that gives
        the smaller error, a single-feature model is fit to those labels, and
        the node's (feature, threshold) is replaced only if the overall score
        strictly improves.

        Params
        ------
        X: array-like of shape (n_samples, n_features)
            The input samples.
        y: array-like of shape (n_samples,)
            The target values.
        tree: DecisionTreeClassifier.tree_ or DecisionTreeRegressor.tree_
            The tree to be post-hoc improved (modified in place)
        X_score, y_score: optional
            Data used to score candidate updates; default to X and y.
        sample_weight: array-like of shape (n_samples,), optional
            Per-sample weights; never modified.

        Returns
        -------
        num_updates: int
            Number of nodes whose split was changed in this pass.
        """
        # Tree properties (arrays are edited in place to change the tree)
        children_left = tree.children_left
        children_right = tree.children_right
        feature = tree.feature
        threshold = tree.threshold
        value = tree.value

        is_regressor = isinstance(self, RegressorMixin)
        if sample_weight is not None:
            sample_weight = np.asarray(sample_weight)
        if X_score is None:
            X_score = X
        if y_score is None:
            y_score = y
        scorer = get_scorer(self.update_scoring)

        def is_leaf(node):
            return children_left[node] == children_right[node]

        def filter_points_by_path(X_, y_, path):
            """Returns the points in X_ (and matching y_) that follow the path to a node"""
            for parent_id, direction in path:
                goes_left = X_[:, feature[parent_id]] <= threshold[parent_id]
                keep = ~goes_left if direction == 'R' else goes_left
                X_ = X_[keep]
                y_ = y_[keep]
            return X_, y_

        def leaf_prediction(node):
            if is_regressor:
                # value[node] holds a single number for single-output regression
                return np.ravel(value[node])[0]
            # note value stores counts for each class
            return np.argmax(value[node])

        def predict_from_node(X_, start_node):
            """Returns predictions for X_ starting at node start_node"""
            preds = np.zeros(X_.shape[0])
            pending = [(start_node, np.arange(X_.shape[0]))]
            while pending:
                node, rows = pending.pop()
                if rows.size == 0:
                    continue
                if is_leaf(node):
                    preds[rows] = leaf_prediction(node)
                    continue
                goes_left = X_[rows, feature[node]] <= threshold[node]
                pending.append((children_left[node], rows[goes_left]))
                pending.append((children_right[node], rows[~goes_left]))
            return preds

        # For each node, store the path to that node
        # e.g. if node 3 is the left child of node 1 which is the right child of node 0
        # then we get (3, [(0, R), (1, L)])
        indexes_with_prefix_paths = []
        queue = deque([(0, [])])
        while queue:
            node_id, path = queue.popleft()
            indexes_with_prefix_paths.append((node_id, path))
            if not is_leaf(node_id):
                queue.append((children_left[node_id], path + [(node_id, 'L')]))
                queue.append((children_right[node_id], path + [(node_id, 'R')]))

        num_updates = 0
        # Reversing BFS queue presents nodes bottom -> top one level at a time
        for node_id, path in reversed(indexes_with_prefix_paths):
            # Compute the points being input to the node
            X_node, y_node = filter_points_by_path(X, y, path)
            if sample_weight is not None:
                sample_weight_node = filter_points_by_path(
                    X, sample_weight, path)[1]
            else:
                sample_weight_node = np.ones(y_node.size)

            # Skip over leaf nodes and nodes with too few samples
            if is_leaf(node_id):
                if is_regressor and X_node.shape[0] >= self.min_leaf_samples_tao:
                    value[node_id] = np.mean(y_node)
                continue
            if X_node.shape[0] < self.min_node_samples_tao:
                continue

            # Compute the outputs for these points if they go left or right
            y_node_left = predict_from_node(X_node, children_left[node_id])
            y_node_right = predict_from_node(X_node, children_right[node_id])
            if node_id == 0:  # root node
                model_preds = self.model.predict(X_node)
                assert np.all(np.logical_or(model_preds == y_node_left,
                                            model_preds == y_node_right)), \
                    'actual predictions should match either predict_from_node left or right'

            # Decide on prediction target (want to go left (0) / right (1) when advantageous)
            # TAO paper binarizes these (e.g. predict 0 or 1 depending on which of these is better)
            y_node_absolute_errors = np.abs(np.vstack((y_node - y_node_left,
                                                       y_node - y_node_right))).T
            # screen out indexes where going left/right has no effect
            idxs_relevant = y_node_absolute_errors[:, 0] != y_node_absolute_errors[:, 1]
            if idxs_relevant.sum() <= 1:  # nothing to change
                if self.verbose:
                    print('no errors to change')
                continue
            y_node_target = np.argmin(y_node_absolute_errors, axis=1)[idxs_relevant]

            # optionally weight by the difference in error
            if self.weight_errors:
                # Out-of-place on purpose: at the root the path is empty, so
                # sample_weight_node IS the caller's array and `*=` would
                # silently corrupt it (and fail for integer weights).
                sample_weight_node = sample_weight_node * np.abs(
                    y_node_absolute_errors[:, 1] - y_node_absolute_errors[:, 0])
            sample_weight_node_target = sample_weight_node[idxs_relevant]
            X_node = X_node[idxs_relevant]

            # Fit a 1-variable binary classification model on these outputs
            # Note: this could be customized (e.g. for sparse oblique trees)
            best_score = -np.inf
            best_feat_num = None
            best_threshold = None
            for feat_num in range(X.shape[1]):
                m = self._make_node_model()
                X_node_single_feat = X_node[:, feat_num: feat_num + 1]
                m.fit(X_node_single_feat, y_node_target,
                      sample_weight=sample_weight_node_target)
                score = m.score(X_node_single_feat, y_node_target,
                                sample_weight=sample_weight_node_target)
                if score > best_score:
                    best_score = score
                    best_feat_num = feat_num
                    if self.node_model == 'linear':
                        best_threshold = -m.intercept_ / m.coef_[0]
                    else:
                        best_threshold = m.tree_.threshold[0]
            if best_feat_num is None:
                # no feature produced a comparable score (e.g. all NaN)
                continue

            # Update the node with the new feature / threshold, keep it only
            # if the overall score strictly improves
            old_feat_num = feature[node_id]
            old_threshold = threshold[node_id]
            old_score = scorer(self.model, X_score, y_score)
            feature[node_id] = best_feat_num
            threshold[node_id] = best_threshold
            new_score = scorer(self.model, X_score, y_score)

            # debugging
            if self.verbose > 1:
                if old_score == new_score:
                    print('\tno change', best_feat_num, old_feat_num)
                print(f'\tscore_total {old_score:0.4f} -> {new_score:0.4f}')
            if old_score >= new_score:
                feature[node_id] = old_feat_num
                threshold[node_id] = old_threshold
            else:
                # (Track if any updates were necessary)
                num_updates += 1
                if self.verbose > 0:
                    print(
                        f'Improved score from {old_score:0.4f} to {new_score:0.4f}')
        return num_updates

    def predict(self, X):
        """Predict with the refined tree, mapping class indices back to ``classes_`` when present."""
        preds = self.model.predict(X)
        if not hasattr(self, "classes_"):
            return preds
        if getattr(self, 'verbose', 0) > 1:
            # debugging output only; used to be printed on every call
            print("classes_", self.classes_, 'preds', preds)
        return np.asarray(self.classes_)[np.asarray(preds).astype(int)]

    def predict_proba(self, X):
        """Return the underlying tree's ``predict_proba`` output for X."""
        return self.model.predict_proba(X)
class TaoTreeRegressor(RegressorMixin, TaoTree):
    """TAO tree for regression.

    The mixin is listed first so that scikit-learn's mixin-provided behaviour
    (estimator type / tags, ``score``) is resolved before ``BaseEstimator``.
    NOTE(review): ``TaoTree.fit`` currently raises ``Warning`` for any
    ``RegressorMixin`` instance, so this class cannot be fit yet.
    """
class TaoTreeClassifier(ClassifierMixin, TaoTree):
    """TAO tree for classification.

    The mixin is listed first so that scikit-learn's mixin-provided behaviour
    (estimator type / tags, ``score``) is resolved before ``BaseEstimator``.
    """
if __name__ == '__main__':
    # Small demo: fit a TAO-refined classification tree on the breast-cancer
    # data and report train / test accuracy. Both RNGs are seeded first.
    np.random.seed(13)
    random.seed(13)

    X, y = datasets.load_breast_cancer(return_X_y=True)
    split = train_test_split(X, y, test_size=0.33, random_state=42)
    X_train, X_test, y_train, y_test = split
    print('X.shape', X.shape)
    print('ys', np.unique(y_train), '\n\n')

    demo_kwargs = dict(
        randomize_tree=False,
        weight_errors=False,
        node_model='stump',
        model_args={'max_depth': 3},
        verbose=1,
    )
    m = TaoTreeClassifier(**demo_kwargs)
    m.fit(X_train, y_train)

    train_acc = np.mean(m.predict(X_train) == y_train)
    print('Train acc', train_acc)
    test_acc = np.mean(m.predict(X_test) == y_test)
    print('Test acc', test_acc)

    # Disabled regression example, kept for reference:
    # X, y = datasets.load_diabetes(return_X_y=True)  # regression
    # X = np.random.randn(500, 10)
    # y = (X[:, 0] > 0).astype(float) + (X[:, 1] > 1).astype(float)
    # X_train, X_test, y_train, y_test = train_test_split(
    #     X, y, test_size=0.33, random_state=42
    # )
    # m = TaoRegressor()
    # m.fit(X_train, y_train)
    # print('mse', np.mean(np.square(m.predict(X_test) - y_test)),
    #       'baseline', np.mean(np.square(y_test)))
Classes
class TaoTree (model_type: str = 'CART', n_iters: int = 20, model_args: dict = {'max_leaf_nodes': 15}, randomize_tree=False, update_scoring='accuracy', min_node_samples_tao=3, min_leaf_samples_tao=2, node_model='stump', node_model_args: dict = {}, reg_param: float = 0.001, weight_errors: bool = False, verbose: int = 0)
-
Base class for all estimators in scikit-learn.
Inheriting from this class provides default implementations of:
- setting and getting parameters used by GridSearchCV and friends;
- textual and HTML representation displayed in terminals and IDEs;
- estimator serialization;
- parameters validation;
- data validation;
- feature names validation.
Read more in the User Guide (section "rolling_your_own_estimator").
Notes
All estimators should specify all the parameters that can be set at the class level in their __init__ as explicit keyword arguments (no *args or **kwargs).
Examples
>>> import numpy as np
>>> from sklearn.base import BaseEstimator
>>> class MyEstimator(BaseEstimator):
...     def __init__(self, *, param=1):
...         self.param = param
...     def fit(self, X, y=None):
...         self.is_fitted_ = True
...         return self
...     def predict(self, X):
...         return np.full(shape=X.shape[0], fill_value=self.param)
>>> estimator = MyEstimator(param=2)
>>> estimator.get_params()
{'param': 2}
>>> X = np.array([[1, 2], [2, 3], [3, 4]])
>>> y = np.array([1, 0, 1])
>>> estimator.fit(X, y).predict(X)
array([2, 2, 2])
>>> estimator.set_params(param=3).fit(X, y).predict(X)
array([3, 3, 3])
TAO: Alternating optimization of decision trees, with application to learning sparse oblique trees (Neurips 2018) https://proceedings.neurips.cc/paper/2018/hash/185c29dc24325934ee377cfda20e414c-Abstract.html Note: this implementation learns single-feature splits rather than oblique trees.
Currently supports - given a CART tree, posthoc improve it with TAO - also works with HSTreeCV
Todo - update bottom to top otherwise input points don't get updated - update leaf nodes - support regression - support FIGS - support error-weighting - support oblique trees - support generic models at decision node - support pruning (e.g. if weights -> 0, then remove a node) - support classifiers in leaves
Parameters
model_type
:str
- 'CART' or 'FIGS'
n_iters
- Number of iterations to run TAO
model_args
- Arguments to pass to the model
randomize_tree
- Whether to randomize the tree before each iteration
min_node_samples_tao
:int
- Minimum number of samples in a node to apply tao
min_leaf_samples_tao
:int
node_model
:str
- 'stump' or 'linear'
reg_param
- Regularization parameter for node-wise linear model (if node_model is 'linear')
verbose
:int
- Verbosity level
Expand source code
class TaoTree(BaseEstimator):
    # NOTE: the dict defaults below are shared between instances. This class
    # only ever unpacks them with ``**`` and never mutates them.
    def __init__(self, model_type: str = 'CART',
                 n_iters: int = 20,
                 model_args: dict = {'max_leaf_nodes': 15},
                 randomize_tree=False,
                 update_scoring='accuracy',
                 min_node_samples_tao=3,
                 min_leaf_samples_tao=2,
                 node_model='stump',
                 node_model_args: dict = {},
                 reg_param: float = 1e-3,
                 weight_errors: bool = False,
                 verbose: int = 0,
                 ):
        """TAO: Alternating optimization of decision trees, with application to learning sparse oblique trees (Neurips 2018)
        https://proceedings.neurips.cc/paper/2018/hash/185c29dc24325934ee377cfda20e414c-Abstract.html
        Note: this implementation learns single-feature splits rather than oblique trees.

        Currently supports
        - given a CART tree, posthoc improve it with TAO
        - also works with HSTreeCV

        Todo
        - update bottom to top otherwise input points don't get updated
        - update leaf nodes
        - support regression
        - support FIGS
        - support error-weighting
        - support oblique trees
        - support generic models at decision node
        - support pruning (e.g. if weights -> 0, then remove a node)
        - support classifiers in leaves

        Parameters
        ----------
        model_type: str
            'CART' or 'FIGS' (only 'CART' builds a model in ``fit``)
        n_iters
            Maximum number of TAO passes over the tree; stops early when a
            pass changes no node
        model_args
            Arguments to pass to the initial decision tree
        randomize_tree
            If True, shuffle the split features of the fitted CART tree and
            reset every threshold to its feature's median before running TAO
        update_scoring
            Scorer name handed to ``sklearn.metrics.get_scorer``; a node update
            is kept only if this score strictly improves
        min_node_samples_tao: int
            Minimum number of samples in a node to apply tao
        min_leaf_samples_tao: int
            Minimum number of samples in a leaf for its value to be re-estimated
            (only done for regressors)
        node_model: str
            'stump' or 'linear'
        node_model_args
            Arguments to pass to each node-wise model
        reg_param
            Regularization parameter for node-wise linear model
            (stored on the instance; not read anywhere else in this class)
        weight_errors
            If True, weight each sample at a node by the absolute difference
            between its error when routed left and when routed right
        verbose: int
            Verbosity level
        """
        super().__init__()
        self.model_type = model_type
        self.n_iters = n_iters
        self.model_args = model_args
        self.randomize_tree = randomize_tree
        self.update_scoring = update_scoring
        self.min_node_samples_tao = min_node_samples_tao
        self.min_leaf_samples_tao = min_leaf_samples_tao
        self.node_model = node_model
        self.node_model_args = node_model_args
        self.reg_param = reg_param
        self.weight_errors = weight_errors
        self.verbose = verbose
        self._init_prediction_task()  # decides between regressor and classifier

    def _init_prediction_task(self):
        """Record the prediction task on the instance.

        The base implementation always sets ``prediction_task`` to
        'classification'; subclasses may override it.
        """
        self.prediction_task = 'classification'

    def fit(self, X, y=None, feature_names=None, sample_weight=None):
        """Fit a CART tree and then refine its splits with TAO iterations.

        Params
        ------
        X: array-like of shape (n_samples, n_features)
            The input samples.
        y: array-like of shape (n_samples,)
            The target values.
        feature_names: optional
            Stored as ``feature_names_`` when not None.
        sample_weight: array-like of shape (n_samples,), default=None
            Sample weights. If None, then samples are equally weighted.
            Splits that would create child nodes with net zero or negative weight
            are ignored while searching for a split in each node.

        Returns
        -------
        self

        Raises
        ------
        Warning
            Raised (not merely emitted) for regressor subclasses.
        """
        X, y, feature_names = check_fit_arguments(self, X, y, feature_names)
        if isinstance(self, RegressorMixin):
            # NOTE(review): this *raises*, so regressors cannot be fit at all.
            # Kept as-is on purpose: the default update_scoring ('accuracy')
            # is not meaningful for regression targets.
            raise Warning('TAO Regression is not yet tested')
        X, y = check_X_y(X, y)
        y = y.astype(float)
        if feature_names is not None:
            self.feature_names_ = feature_names

        if self.model_type == 'CART':
            if isinstance(self, ClassifierMixin):
                self.model = DecisionTreeClassifier(**self.model_args)
            elif isinstance(self, RegressorMixin):
                self.model = DecisionTreeRegressor(**self.model_args)
            self.model.fit(X, y, sample_weight=sample_weight)
            if self.verbose:
                print(export_text(self.model))

        if self.randomize_tree:
            fitted_tree = self.model.tree_
            # shuffle CART features
            np.random.shuffle(fitted_tree.feature)
            # split on feature medians
            for node in range(fitted_tree.node_count):
                fitted_tree.threshold[node] = np.median(
                    X[:, fitted_tree.feature[node]])
        if self.verbose:
            print('starting score', self.model.score(X, y))

        for _ in range(self.n_iters):
            num_updates = self._tao_iter_cart(
                X, y, self.model.tree_, sample_weight=sample_weight)
            if num_updates == 0:
                # a full pass changed nothing, so further passes cannot either
                break
        return self

    def _make_node_model(self):
        """Return a fresh, unfitted single-feature model for one decision node."""
        if self.node_model not in ('linear', 'stump'):
            raise ValueError(
                f"node_model must be 'stump' or 'linear', got {self.node_model!r}")
        use_linear = self.node_model == 'linear'
        if isinstance(self, RegressorMixin):
            if use_linear:
                return LinearRegression(**self.node_model_args)
            return DecisionTreeRegressor(max_depth=1, **self.node_model_args)
        if use_linear:
            return LogisticRegression(**self.node_model_args)
        return DecisionTreeClassifier(max_depth=1, **self.node_model_args)

    def _tao_iter_cart(self, X, y, tree, X_score=None, y_score=None, sample_weight=None):
        """Updates tree by applying the tao algorithm to the tree

        Nodes are visited bottom-up. For every internal node, each sample
        reaching it is labelled with the child (0 = left, 1 = right) that gives
        the smaller error, a single-feature model is fit to those labels, and
        the node's (feature, threshold) is replaced only if the overall score
        strictly improves.

        Params
        ------
        X: array-like of shape (n_samples, n_features)
            The input samples.
        y: array-like of shape (n_samples,)
            The target values.
        tree: DecisionTreeClassifier.tree_ or DecisionTreeRegressor.tree_
            The tree to be post-hoc improved (modified in place)
        X_score, y_score: optional
            Data used to score candidate updates; default to X and y.
        sample_weight: array-like of shape (n_samples,), optional
            Per-sample weights; never modified.

        Returns
        -------
        num_updates: int
            Number of nodes whose split was changed in this pass.
        """
        # Tree properties (arrays are edited in place to change the tree)
        children_left = tree.children_left
        children_right = tree.children_right
        feature = tree.feature
        threshold = tree.threshold
        value = tree.value

        is_regressor = isinstance(self, RegressorMixin)
        if sample_weight is not None:
            sample_weight = np.asarray(sample_weight)
        if X_score is None:
            X_score = X
        if y_score is None:
            y_score = y
        scorer = get_scorer(self.update_scoring)

        def is_leaf(node):
            return children_left[node] == children_right[node]

        def filter_points_by_path(X_, y_, path):
            """Returns the points in X_ (and matching y_) that follow the path to a node"""
            for parent_id, direction in path:
                goes_left = X_[:, feature[parent_id]] <= threshold[parent_id]
                keep = ~goes_left if direction == 'R' else goes_left
                X_ = X_[keep]
                y_ = y_[keep]
            return X_, y_

        def leaf_prediction(node):
            if is_regressor:
                # value[node] holds a single number for single-output regression
                return np.ravel(value[node])[0]
            # note value stores counts for each class
            return np.argmax(value[node])

        def predict_from_node(X_, start_node):
            """Returns predictions for X_ starting at node start_node"""
            preds = np.zeros(X_.shape[0])
            pending = [(start_node, np.arange(X_.shape[0]))]
            while pending:
                node, rows = pending.pop()
                if rows.size == 0:
                    continue
                if is_leaf(node):
                    preds[rows] = leaf_prediction(node)
                    continue
                goes_left = X_[rows, feature[node]] <= threshold[node]
                pending.append((children_left[node], rows[goes_left]))
                pending.append((children_right[node], rows[~goes_left]))
            return preds

        # For each node, store the path to that node
        # e.g. if node 3 is the left child of node 1 which is the right child of node 0
        # then we get (3, [(0, R), (1, L)])
        indexes_with_prefix_paths = []
        queue = deque([(0, [])])
        while queue:
            node_id, path = queue.popleft()
            indexes_with_prefix_paths.append((node_id, path))
            if not is_leaf(node_id):
                queue.append((children_left[node_id], path + [(node_id, 'L')]))
                queue.append((children_right[node_id], path + [(node_id, 'R')]))

        num_updates = 0
        # Reversing BFS queue presents nodes bottom -> top one level at a time
        for node_id, path in reversed(indexes_with_prefix_paths):
            # Compute the points being input to the node
            X_node, y_node = filter_points_by_path(X, y, path)
            if sample_weight is not None:
                sample_weight_node = filter_points_by_path(
                    X, sample_weight, path)[1]
            else:
                sample_weight_node = np.ones(y_node.size)

            # Skip over leaf nodes and nodes with too few samples
            if is_leaf(node_id):
                if is_regressor and X_node.shape[0] >= self.min_leaf_samples_tao:
                    value[node_id] = np.mean(y_node)
                continue
            if X_node.shape[0] < self.min_node_samples_tao:
                continue

            # Compute the outputs for these points if they go left or right
            y_node_left = predict_from_node(X_node, children_left[node_id])
            y_node_right = predict_from_node(X_node, children_right[node_id])
            if node_id == 0:  # root node
                model_preds = self.model.predict(X_node)
                assert np.all(np.logical_or(model_preds == y_node_left,
                                            model_preds == y_node_right)), \
                    'actual predictions should match either predict_from_node left or right'

            # Decide on prediction target (want to go left (0) / right (1) when advantageous)
            # TAO paper binarizes these (e.g. predict 0 or 1 depending on which of these is better)
            y_node_absolute_errors = np.abs(np.vstack((y_node - y_node_left,
                                                       y_node - y_node_right))).T
            # screen out indexes where going left/right has no effect
            idxs_relevant = y_node_absolute_errors[:, 0] != y_node_absolute_errors[:, 1]
            if idxs_relevant.sum() <= 1:  # nothing to change
                if self.verbose:
                    print('no errors to change')
                continue
            y_node_target = np.argmin(y_node_absolute_errors, axis=1)[idxs_relevant]

            # optionally weight by the difference in error
            if self.weight_errors:
                # Out-of-place on purpose: at the root the path is empty, so
                # sample_weight_node IS the caller's array and `*=` would
                # silently corrupt it (and fail for integer weights).
                sample_weight_node = sample_weight_node * np.abs(
                    y_node_absolute_errors[:, 1] - y_node_absolute_errors[:, 0])
            sample_weight_node_target = sample_weight_node[idxs_relevant]
            X_node = X_node[idxs_relevant]

            # Fit a 1-variable binary classification model on these outputs
            # Note: this could be customized (e.g. for sparse oblique trees)
            best_score = -np.inf
            best_feat_num = None
            best_threshold = None
            for feat_num in range(X.shape[1]):
                m = self._make_node_model()
                X_node_single_feat = X_node[:, feat_num: feat_num + 1]
                m.fit(X_node_single_feat, y_node_target,
                      sample_weight=sample_weight_node_target)
                score = m.score(X_node_single_feat, y_node_target,
                                sample_weight=sample_weight_node_target)
                if score > best_score:
                    best_score = score
                    best_feat_num = feat_num
                    if self.node_model == 'linear':
                        best_threshold = -m.intercept_ / m.coef_[0]
                    else:
                        best_threshold = m.tree_.threshold[0]
            if best_feat_num is None:
                # no feature produced a comparable score (e.g. all NaN)
                continue

            # Update the node with the new feature / threshold, keep it only
            # if the overall score strictly improves
            old_feat_num = feature[node_id]
            old_threshold = threshold[node_id]
            old_score = scorer(self.model, X_score, y_score)
            feature[node_id] = best_feat_num
            threshold[node_id] = best_threshold
            new_score = scorer(self.model, X_score, y_score)

            # debugging
            if self.verbose > 1:
                if old_score == new_score:
                    print('\tno change', best_feat_num, old_feat_num)
                print(f'\tscore_total {old_score:0.4f} -> {new_score:0.4f}')
            if old_score >= new_score:
                feature[node_id] = old_feat_num
                threshold[node_id] = old_threshold
            else:
                # (Track if any updates were necessary)
                num_updates += 1
                if self.verbose > 0:
                    print(
                        f'Improved score from {old_score:0.4f} to {new_score:0.4f}')
        return num_updates

    def predict(self, X):
        """Predict with the refined tree, mapping class indices back to ``classes_`` when present."""
        preds = self.model.predict(X)
        if not hasattr(self, "classes_"):
            return preds
        if getattr(self, 'verbose', 0) > 1:
            # debugging output only; used to be printed on every call
            print("classes_", self.classes_, 'preds', preds)
        return np.asarray(self.classes_)[np.asarray(preds).astype(int)]

    def predict_proba(self, X):
        """Return the underlying tree's ``predict_proba`` output for X."""
        return self.model.predict_proba(X)
Ancestors
- sklearn.base.BaseEstimator
- sklearn.utils._estimator_html_repr._HTMLDocumentationLinkMixin
- sklearn.utils._metadata_requests._MetadataRequester
Subclasses
Methods
def fit(self, X, y=None, feature_names=None, sample_weight=None)
-
Params
_sample_weight: array-like of shape (n_samples,), default=None Sample weights. If None, then samples are equally weighted. Splits that would create child nodes with net zero or negative weight are ignored while searching for a split in each node.
Expand source code
def fit(self, X, y=None, feature_names=None, sample_weight=None):
    """Fit a CART tree, then run TAO passes over it until none changes a node.

    Params
    ------
    sample_weight: array-like of shape (n_samples,), default=None
        Sample weights. If None, then samples are equally weighted.
        Splits that would create child nodes with net zero or negative weight
        are ignored while searching for a split in each node.
    """
    X, y, feature_names = check_fit_arguments(self, X, y, feature_names)
    if isinstance(self, RegressorMixin):
        # raises (does not merely warn) for regressors
        raise Warning('TAO Regression is not yet tested')
    X, y = check_X_y(X, y)
    y = y.astype(float)
    if feature_names is not None:
        self.feature_names_ = feature_names

    if self.model_type == 'CART':
        if isinstance(self, ClassifierMixin):
            self.model = DecisionTreeClassifier(**self.model_args)
        elif isinstance(self, RegressorMixin):
            self.model = DecisionTreeRegressor(**self.model_args)
        self.model.fit(X, y, sample_weight=sample_weight)
        if self.verbose:
            print(export_text(self.model))

    if self.randomize_tree:
        fitted_tree = self.model.tree_
        # shuffle CART features, then split every node on its feature's median
        np.random.shuffle(fitted_tree.feature)
        for node in range(fitted_tree.node_count):
            column = X[:, fitted_tree.feature[node]]
            fitted_tree.threshold[node] = np.median(column)
    if self.verbose:
        print('starting score', self.model.score(X, y))

    passes_left = self.n_iters
    while passes_left > 0:
        passes_left -= 1
        changed = self._tao_iter_cart(
            X, y, self.model.tree_, sample_weight=sample_weight)
        if changed == 0:
            break
    return self
def predict(self, X)
-
Expand source code
def predict(self, X):
    """Predict with the wrapped tree, mapping class indices back to ``classes_`` when present.

    The raw predictions of ``self.model`` are treated as integer indices into
    ``self.classes_`` if that attribute exists; otherwise they are returned
    unchanged.
    """
    preds = self.model.predict(X)
    if not hasattr(self, "classes_"):
        return preds
    if getattr(self, 'verbose', 0) > 1:
        # debugging output only; used to be printed on every call
        print("classes_", self.classes_, 'preds', preds)
    return np.asarray(self.classes_)[np.asarray(preds).astype(int)]
def predict_proba(self, X)
-
Expand source code
def predict_proba(self, X):
    """Return the wrapped model's ``predict_proba`` output for X, unchanged."""
    wrapped = self.model
    probabilities = wrapped.predict_proba(X)
    return probabilities
def set_fit_request(self: TaoTree, *, feature_names: Union[bool, ForwardRef(None), str] = '$UNCHANGED$', sample_weight: Union[bool, ForwardRef(None), str] = '$UNCHANGED$') ‑> TaoTree
-
Request metadata passed to the `fit` method.
Note that this method is only relevant if `enable_metadata_routing=True` (see :func:`sklearn.set_config`). Please see the :ref:`User Guide <metadata_routing>` on how the routing mechanism works.
The options for each parameter are:
- `True`: metadata is requested, and passed to `fit` if provided. The request is ignored if metadata is not provided.
- `False`: metadata is not requested and the meta-estimator will not pass it to `fit`.
- `None`: metadata is not requested, and the meta-estimator will raise an error if the user provides it.
- `str`: metadata should be passed to the meta-estimator with this given alias instead of the original name.
The default (`sklearn.utils.metadata_routing.UNCHANGED`) retains the existing request. This allows you to change the request for some parameters and not others.
Added in version: 1.3
Note
This method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. used inside a :class:`~sklearn.pipeline.Pipeline`. Otherwise it has no effect.
Parameters
feature_names : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED
- Metadata routing for the `feature_names` parameter in `fit`.
sample_weight : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED
- Metadata routing for the `sample_weight` parameter in `fit`.
Returns
self
:object
- The updated object.
Expand source code
def func(*args, **kw):
    """Update the metadata request for the provided parameters.

    This docstring is overwritten below; see REQUESTER_DOC for the expected
    functionality.

    NOTE(review): ``self``, ``instance``, ``_routing_enabled`` and
    ``UNCHANGED`` are not defined in this block; they are taken from an
    enclosing scope that is not shown here.
    """
    if not _routing_enabled():
        raise RuntimeError(
            "This method is only available when metadata routing is enabled."
            " You can enable it using"
            " sklearn.set_config(enable_metadata_routing=True)."
        )

    # Key validation is optional; ``self.keys`` is only consulted when it is on.
    if self.validate_keys:
        if set(kw) - set(self.keys):
            raise TypeError(
                f"Unexpected args: {set(kw) - set(self.keys)} in {self.name}. "
                f"Accepted arguments are: {set(self.keys)}"
            )

    # This makes it possible to use the decorated method as an unbound method,
    # for instance when monkeypatching.
    # https://github.com/scikit-learn/scikit-learn/issues/28632
    if instance is None:
        bound_to, leftover = args[0], args[1:]
    else:
        bound_to, leftover = instance, args

    # Replicating python's behavior when positional args are given other than
    # `self`, and `self` is only allowed if this method is unbound.
    if leftover:
        raise TypeError(
            f"set_{self.name}_request() takes 0 positional argument but"
            f" {len(leftover)} were given"
        )

    request_set = bound_to._get_metadata_request()
    per_method = getattr(request_set, self.name)
    for param_name, requested in kw.items():
        # UNCHANGED means "leave the existing request for this parameter alone".
        if requested is UNCHANGED:
            continue
        per_method.add_request(param=param_name, alias=requested)
    bound_to._metadata_request = request_set
    return bound_to
-
- setting and getting parameters used by
class TaoTreeClassifier (model_type: str = 'CART', n_iters: int = 20, model_args: dict = {'max_leaf_nodes': 15}, randomize_tree=False, update_scoring='accuracy', min_node_samples_tao=3, min_leaf_samples_tao=2, node_model='stump', node_model_args: dict = {}, reg_param: float = 0.001, weight_errors: bool = False, verbose: int = 0)
-
Base class for all estimators in scikit-learn.
Inheriting from this class provides default implementations of:
- setting and getting parameters used by
GridSearchCV
and friends; - textual and HTML representation displayed in terminals and IDEs;
- estimator serialization;
- parameters validation;
- data validation;
- feature names validation.
Read more in the :ref:`User Guide <rolling_your_own_estimator>`.
Notes
All estimators should specify all the parameters that can be set at the class level in their `__init__` as explicit keyword arguments (no `*args` or `**kwargs`).
Examples
>>> import numpy as np >>> from sklearn.base import BaseEstimator >>> class MyEstimator(BaseEstimator): ... def __init__(self, *, param=1): ... self.param = param ... def fit(self, X, y=None): ... self.is_fitted_ = True ... return self ... def predict(self, X): ... return np.full(shape=X.shape[0], fill_value=self.param) >>> estimator = MyEstimator(param=2) >>> estimator.get_params() {'param': 2} >>> X = np.array([[1, 2], [2, 3], [3, 4]]) >>> y = np.array([1, 0, 1]) >>> estimator.fit(X, y).predict(X) array([2, 2, 2]) >>> estimator.set_params(param=3).fit(X, y).predict(X) array([3, 3, 3])
TAO: Alternating optimization of decision trees, with application to learning sparse oblique trees (Neurips 2018) https://proceedings.neurips.cc/paper/2018/hash/185c29dc24325934ee377cfda20e414c-Abstract.html Note: this implementation learns single-feature splits rather than oblique trees.
Currently supports - given a CART tree, posthoc improve it with TAO - also works with HSTreeCV
Todo - update bottom to top otherwise input points don't get updated - update leaf nodes - support regression - support FIGS - support error-weighting - support oblique trees - support generic models at decision node - support pruning (e.g. if weights -> 0, then remove a node) - support classifiers in leaves
Parameters
model_type
:str
- 'CART' or 'FIGS'
n_iters
- Number of iterations to run TAO
model_args
- Arguments to pass to the model
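randomize_tree
- Whether to randomize the fitted tree once, before the TAO iterations start (split features are shuffled and each threshold is reset to the median of its feature)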
min_node_samples_tao
:int
- Minimum number of samples in a node to apply tao
min_leaf_samples_tao
:int
node_model
:str
- 'stump' or 'linear'
reg_param
- Regularization parameter for node-wise linear model (if node_model is 'linear')
verbose
:int
- Verbosity level
Expand source code
class TaoTreeClassifier(ClassifierMixin, TaoTree):
    """TAO tree for classification.

    Adds no members of its own: it only combines ``TaoTree`` with
    scikit-learn's ``ClassifierMixin``. The mixin is listed first because the
    scikit-learn developer guide asks for mixins to appear to the left of
    ``BaseEstimator`` subclasses, so that what the mixin provides is not
    shadowed by the base estimator.
    """
Ancestors
- TaoTree
- sklearn.base.BaseEstimator
- sklearn.utils._estimator_html_repr._HTMLDocumentationLinkMixin
- sklearn.utils._metadata_requests._MetadataRequester
- sklearn.base.ClassifierMixin
Methods
def set_score_request(self: TaoTreeClassifier, *, sample_weight: Union[bool, ForwardRef(None), str] = '$UNCHANGED$') ‑> TaoTreeClassifier
-
Request metadata passed to the `score` method.
Note that this method is only relevant if `enable_metadata_routing=True` (see :func:`sklearn.set_config`). Please see the :ref:`User Guide <metadata_routing>` on how the routing mechanism works.
The options for each parameter are:
- `True`: metadata is requested, and passed to `score` if provided. The request is ignored if metadata is not provided.
- `False`: metadata is not requested and the meta-estimator will not pass it to `score`.
- `None`: metadata is not requested, and the meta-estimator will raise an error if the user provides it.
- `str`: metadata should be passed to the meta-estimator with this given alias instead of the original name.
The default (`sklearn.utils.metadata_routing.UNCHANGED`) retains the existing request. This allows you to change the request for some parameters and not others.
Added in version: 1.3
Note
This method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. used inside a :class:`~sklearn.pipeline.Pipeline`. Otherwise it has no effect.
Parameters
sample_weight : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED
- Metadata routing for the `sample_weight` parameter in `score`.
Returns
self
:object
- The updated object.
Expand source code
def func(*args, **kw):
    """Update the metadata request for the provided parameters.

    This docstring is overwritten below; see REQUESTER_DOC for the expected
    functionality.

    NOTE(review): ``self``, ``instance``, ``_routing_enabled`` and
    ``UNCHANGED`` are not defined in this block; they are taken from an
    enclosing scope that is not shown here.
    """
    if not _routing_enabled():
        raise RuntimeError(
            "This method is only available when metadata routing is enabled."
            " You can enable it using"
            " sklearn.set_config(enable_metadata_routing=True)."
        )

    # Key validation is optional; ``self.keys`` is only consulted when it is on.
    if self.validate_keys:
        if set(kw) - set(self.keys):
            raise TypeError(
                f"Unexpected args: {set(kw) - set(self.keys)} in {self.name}. "
                f"Accepted arguments are: {set(self.keys)}"
            )

    # This makes it possible to use the decorated method as an unbound method,
    # for instance when monkeypatching.
    # https://github.com/scikit-learn/scikit-learn/issues/28632
    if instance is None:
        bound_to, leftover = args[0], args[1:]
    else:
        bound_to, leftover = instance, args

    # Replicating python's behavior when positional args are given other than
    # `self`, and `self` is only allowed if this method is unbound.
    if leftover:
        raise TypeError(
            f"set_{self.name}_request() takes 0 positional argument but"
            f" {len(leftover)} were given"
        )

    request_set = bound_to._get_metadata_request()
    per_method = getattr(request_set, self.name)
    for param_name, requested in kw.items():
        # UNCHANGED means "leave the existing request for this parameter alone".
        if requested is UNCHANGED:
            continue
        per_method.add_request(param=param_name, alias=requested)
    bound_to._metadata_request = request_set
    return bound_to
-
Inherited members
- setting and getting parameters used by
class TaoTreeRegressor (model_type: str = 'CART', n_iters: int = 20, model_args: dict = {'max_leaf_nodes': 15}, randomize_tree=False, update_scoring='accuracy', min_node_samples_tao=3, min_leaf_samples_tao=2, node_model='stump', node_model_args: dict = {}, reg_param: float = 0.001, weight_errors: bool = False, verbose: int = 0)
-
Base class for all estimators in scikit-learn.
Inheriting from this class provides default implementations of:
- setting and getting parameters used by
GridSearchCV
and friends; - textual and HTML representation displayed in terminals and IDEs;
- estimator serialization;
- parameters validation;
- data validation;
- feature names validation.
Read more in the :ref:`User Guide <rolling_your_own_estimator>`.
Notes
All estimators should specify all the parameters that can be set at the class level in their `__init__` as explicit keyword arguments (no `*args` or `**kwargs`).
Examples
>>> import numpy as np >>> from sklearn.base import BaseEstimator >>> class MyEstimator(BaseEstimator): ... def __init__(self, *, param=1): ... self.param = param ... def fit(self, X, y=None): ... self.is_fitted_ = True ... return self ... def predict(self, X): ... return np.full(shape=X.shape[0], fill_value=self.param) >>> estimator = MyEstimator(param=2) >>> estimator.get_params() {'param': 2} >>> X = np.array([[1, 2], [2, 3], [3, 4]]) >>> y = np.array([1, 0, 1]) >>> estimator.fit(X, y).predict(X) array([2, 2, 2]) >>> estimator.set_params(param=3).fit(X, y).predict(X) array([3, 3, 3])
TAO: Alternating optimization of decision trees, with application to learning sparse oblique trees (Neurips 2018) https://proceedings.neurips.cc/paper/2018/hash/185c29dc24325934ee377cfda20e414c-Abstract.html Note: this implementation learns single-feature splits rather than oblique trees.
Currently supports - given a CART tree, posthoc improve it with TAO - also works with HSTreeCV
Todo - update bottom to top otherwise input points don't get updated - update leaf nodes - support regression - support FIGS - support error-weighting - support oblique trees - support generic models at decision node - support pruning (e.g. if weights -> 0, then remove a node) - support classifiers in leaves
Parameters
model_type
:str
- 'CART' or 'FIGS'
n_iters
- Number of iterations to run TAO
model_args
- Arguments to pass to the model
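randomize_tree
- Whether to randomize the fitted tree once, before the TAO iterations start (split features are shuffled and each threshold is reset to the median of its feature)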
min_node_samples_tao
:int
- Minimum number of samples in a node to apply tao
min_leaf_samples_tao
:int
node_model
:str
- 'stump' or 'linear'
reg_param
- Regularization parameter for node-wise linear model (if node_model is 'linear')
verbose
:int
- Verbosity level
Expand source code
class TaoTreeRegressor(RegressorMixin, TaoTree):
    """TAO tree for regression.

    Adds no members of its own: it only combines ``TaoTree`` with
    scikit-learn's ``RegressorMixin``. The mixin is listed first because the
    scikit-learn developer guide asks for mixins to appear to the left of
    ``BaseEstimator`` subclasses, so that what the mixin provides is not
    shadowed by the base estimator.
    """
Ancestors
- TaoTree
- sklearn.base.BaseEstimator
- sklearn.utils._estimator_html_repr._HTMLDocumentationLinkMixin
- sklearn.utils._metadata_requests._MetadataRequester
- sklearn.base.RegressorMixin
Methods
def set_score_request(self: TaoTreeRegressor, *, sample_weight: Union[bool, ForwardRef(None), str] = '$UNCHANGED$') ‑> TaoTreeRegressor
-
Request metadata passed to the `score` method.
Note that this method is only relevant if `enable_metadata_routing=True` (see :func:`sklearn.set_config`). Please see the :ref:`User Guide <metadata_routing>` on how the routing mechanism works.
The options for each parameter are:
- `True`: metadata is requested, and passed to `score` if provided. The request is ignored if metadata is not provided.
- `False`: metadata is not requested and the meta-estimator will not pass it to `score`.
- `None`: metadata is not requested, and the meta-estimator will raise an error if the user provides it.
- `str`: metadata should be passed to the meta-estimator with this given alias instead of the original name.
The default (`sklearn.utils.metadata_routing.UNCHANGED`) retains the existing request. This allows you to change the request for some parameters and not others.
Added in version: 1.3
Note
This method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. used inside a :class:`~sklearn.pipeline.Pipeline`. Otherwise it has no effect.
Parameters
sample_weight : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED
- Metadata routing for the `sample_weight` parameter in `score`.
Returns
self
:object
- The updated object.
Expand source code
def func(*args, **kw):
    """Update the metadata request for the provided parameters.

    This docstring is overwritten below; see REQUESTER_DOC for the expected
    functionality.

    NOTE(review): ``self``, ``instance``, ``_routing_enabled`` and
    ``UNCHANGED`` are not defined in this block; they are taken from an
    enclosing scope that is not shown here.
    """
    if not _routing_enabled():
        raise RuntimeError(
            "This method is only available when metadata routing is enabled."
            " You can enable it using"
            " sklearn.set_config(enable_metadata_routing=True)."
        )

    # Key validation is optional; ``self.keys`` is only consulted when it is on.
    if self.validate_keys:
        if set(kw) - set(self.keys):
            raise TypeError(
                f"Unexpected args: {set(kw) - set(self.keys)} in {self.name}. "
                f"Accepted arguments are: {set(self.keys)}"
            )

    # This makes it possible to use the decorated method as an unbound method,
    # for instance when monkeypatching.
    # https://github.com/scikit-learn/scikit-learn/issues/28632
    if instance is None:
        bound_to, leftover = args[0], args[1:]
    else:
        bound_to, leftover = instance, args

    # Replicating python's behavior when positional args are given other than
    # `self`, and `self` is only allowed if this method is unbound.
    if leftover:
        raise TypeError(
            f"set_{self.name}_request() takes 0 positional argument but"
            f" {len(leftover)} were given"
        )

    request_set = bound_to._get_metadata_request()
    per_method = getattr(request_set, self.name)
    for param_name, requested in kw.items():
        # UNCHANGED means "leave the existing request for this parameter alone".
        if requested is UNCHANGED:
            continue
        per_method.add_request(param=param_name, alias=requested)
    bound_to._metadata_request = request_set
    return bound_to
-
Inherited members
- setting and getting parameters used by