import random
from copy import deepcopy
from collections import deque
import numpy as np
from sklearn.linear_model import LogisticRegression  # node-model code below relies on sklearn's coef_/intercept_/sample_weight API
from sklearn import datasets
from sklearn.base import BaseEstimator, RegressorMixin, ClassifierMixin
from sklearn.linear_model import LinearRegression
from sklearn.metrics import get_scorer
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor, export_text
from sklearn.utils import check_X_y
class TaoTree(BaseEstimator):
def __init__(self, model_type: str = 'CART',
n_iters: int = 20,
model_args: dict = {'max_leaf_nodes': 15},
randomize_tree=False,
update_scoring='accuracy',
min_node_samples_tao=3,
min_leaf_samples_tao=2,
node_model='stump',
node_model_args: dict = {},
reg_param: float = 1e-3,
weight_errors: bool = False,
verbose: int = 0,
):
"""TAO: Alternating optimization of decision trees, with application to learning sparse oblique trees (Neurips 2018)
https://proceedings.neurips.cc/paper/2018/hash/185c29dc24325934ee377cfda20e414c-Abstract.html
Note: this implementation learns single-feature splits rather than oblique trees.
Currently supports
- given a CART tree, posthoc improve it with TAO
- also works with HSTreeCV
Todo
        - update bottom to top, otherwise input points don't get updated
- update leaf nodes
- support regression
- support FIGS
- support error-weighting
- support oblique trees
- support generic models at decision node
- support pruning (e.g. if weights -> 0, then remove a node)
- support classifiers in leaves
Parameters
----------
model_type: str
'CART' or 'FIGS'
        n_iters: int
            Number of iterations to run TAO
        model_args: dict
            Arguments to pass to the model
        randomize_tree: bool
            Whether to randomize the tree before each iteration
        update_scoring: str
            Scoring metric (an sklearn scorer name, e.g. 'accuracy') used to accept or reject each node update
        min_node_samples_tao: int
            Minimum number of samples in a node to apply tao
        min_leaf_samples_tao: int
            Minimum number of samples in a leaf to apply tao
        node_model: str
            'stump' or 'linear'
        node_model_args: dict
            Arguments to pass to the node-wise model
        reg_param: float
            Regularization parameter for node-wise linear model (if node_model is 'linear')
        weight_errors: bool
            Whether to weight node-model training samples by the size of the error difference between routing left vs right
        verbose: int
            Verbosity level
"""
super().__init__()
self.model_type = model_type
self.n_iters = n_iters
self.model_args = model_args
self.randomize_tree = randomize_tree
self.update_scoring = update_scoring
self.min_node_samples_tao = min_node_samples_tao
self.min_leaf_samples_tao = min_leaf_samples_tao
self.node_model = node_model
self.node_model_args = node_model_args
self.reg_param = reg_param
self.weight_errors = weight_errors
self.verbose = verbose
self._init_prediction_task() # decides between regressor and classifier
def _init_prediction_task(self):
"""
        TaoTreeRegressor and TaoTreeClassifier override this method
        to alter the prediction task. When using this class directly,
        it defaults to the classification task.
"""
self.prediction_task = 'classification'
def fit(self, X, y=None, feature_names=None, sample_weight=None):
"""
Params
------
        sample_weight: array-like of shape (n_samples,), default=None
Sample weights. If None, then samples are equally weighted.
Splits that would create child nodes with net zero or negative weight
are ignored while searching for a split in each node.
"""
if self.prediction_task == 'classification':
self.classes_, y = np.unique(y, return_inverse=True) # deals with str inputs
else:
raise Warning('TAO Regression is not yet tested')
X, y = check_X_y(X, y)
y = y.astype(float)
if feature_names is not None:
self.feature_names_ = feature_names
if self.model_type == 'CART':
if self.prediction_task == 'classification':
self.model = DecisionTreeClassifier(**self.model_args)
elif self.prediction_task == 'regression':
self.model = DecisionTreeRegressor(**self.model_args)
self.model.fit(X, y, sample_weight=sample_weight)
if self.verbose:
print(export_text(self.model))
# plot_tree(self.model)
# plt.savefig('/Users/chandan/Desktop/tree.png', dpi=300)
# plt.show()
if self.randomize_tree:
np.random.shuffle(self.model.tree_.feature) # shuffle CART features
# np.random.shuffle(self.model.tree_.threshold)
for i in range(self.model.tree_.node_count): # split on feature medians
self.model.tree_.threshold[i] = np.median(
X[:, self.model.tree_.feature[i]])
if self.verbose:
print('starting score', self.model.score(X, y))
for i in range(self.n_iters):
num_updates = self._tao_iter_cart(X, y, self.model.tree_, sample_weight=sample_weight)
if num_updates == 0:
break
return self
def _tao_iter_cart(self, X, y, tree, X_score=None, y_score=None, sample_weight=None):
"""Updates tree by applying the tao algorithm to the tree
Params
------
X: array-like of shape (n_samples, n_features)
The input samples.
y: array-like of shape (n_samples,)
The target values.
model: DecisionTreeClassifier.tree_ or DecisionTreeRegressor.tree_
The model to be post-hoc improved
"""
# Tree properties
children_left = tree.children_left
children_right = tree.children_right
feature = tree.feature
threshold = tree.threshold
value = tree.value
# For each node, store the path to that node #######################################################
indexes_with_prefix_paths = [] # data structure with (index, path_to_node_index)
        # e.g. if node 3 is the left child of node 1, which is the right child of node 0,
# then we get (3, [(0, R), (1, L)])
# start with the root node id (0) and its depth (0)
queue = deque()
queue.append((0, []))
while len(queue) > 0:
node_id, path_to_node_index = queue.popleft()
indexes_with_prefix_paths.append((node_id, path_to_node_index))
# If a split node, append left and right children and depth to queue
if children_left[node_id] != children_right[node_id]:
queue.append((children_left[node_id], path_to_node_index + [(node_id, 'L')]))
queue.append((children_right[node_id], path_to_node_index + [(node_id, 'R')]))
# print(indexes_with_prefix_paths)
num_updates = 0
# Reversing BFS queue presents nodes bottom -> top one level at a time
for (node_id, path_to_node_index) in reversed(indexes_with_prefix_paths):
            # For each node, try a TAO update
# print('node_id', node_id, path_to_node_index)
# Compute the points being input to the node ######################################
def filter_points_by_path(X, y, path_to_node_index):
"""Returns the points in X that are in the path to the node"""
for node_id, direction in path_to_node_index:
idxs = X[:, feature[node_id]] <= threshold[node_id]
if direction == 'R':
idxs = ~idxs
# print('idxs', idxs.size, idxs.sum())
X = X[idxs]
y = y[idxs]
return X, y
X_node, y_node = filter_points_by_path(X, y, path_to_node_index)
if sample_weight is not None:
sample_weight_node = filter_points_by_path(X, sample_weight, path_to_node_index)[1]
else:
sample_weight_node = np.ones(y_node.size)
# Skip over leaf nodes and nodes with too few samples ######################################
if children_left[node_id] == children_right[node_id]: # is leaf node
if self.prediction_task == 'regression' and X_node.shape[0] >= self.min_leaf_samples_tao:
# old_score = self.model.score(X, y)
value[node_id] = np.mean(y_node)
"""
new_score = self.model.score(X, y)
if new_score > old_score:
print(f'\tLeaf improved score from {old_score:0.3f} to {new_score:0.3f}')
if new_score < old_score:
print(f'\tLeaf reduced score from {old_score:0.3f} to {new_score:0.3f}')
# raise ValueError('Leaf update reduced score')
"""
# print('\tshapes', X_node.shape, y_node.shape)
# print('\tvals:', value[node_id][0][0], np.mean(y_node))
# assert value[node_id][0][0] == np.mean(y_node), 'unless tree changed, vals should be leaf means'
continue
elif X_node.shape[0] < self.min_node_samples_tao:
continue
# Compute the outputs for these points if they go left or right ######################################
def predict_from_node(X, node_id):
"""Returns predictions for X starting at node node_id"""
def predict_from_node(x, node_id):
"""Returns predictions for x starting at node node_id"""
if children_left[node_id] == children_right[node_id]:
if self.prediction_task == 'regression':
return value[node_id]
elif self.prediction_task == 'classification':
return np.argmax(value[node_id]) # note value stores counts for each class
if x[feature[node_id]] <= threshold[node_id]:
return predict_from_node(x, children_left[node_id])
else:
return predict_from_node(x, children_right[node_id])
preds = np.zeros(X.shape[0])
for i in range(X.shape[0]):
preds[i] = predict_from_node(X[i], node_id)
return preds
y_node_left = predict_from_node(X_node, children_left[node_id])
y_node_right = predict_from_node(X_node, children_right[node_id])
if node_id == 0: # root node
assert np.all(np.logical_or(self.model.predict(X_node) == y_node_left,
self.model.predict(
X_node) == y_node_right)), \
'actual predictions should match either predict_from_node left or right'
# Decide on prediction target (want to go left (0) / right (1) when advantageous)
# TAO paper binarizes these (e.g. predict 0 or 1 depending on which of these is better)
y_node_absolute_errors = np.abs(np.vstack((y_node - y_node_left,
y_node - y_node_right))).T
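            # column 0: per-point absolute error if routed left; column 1: if routed right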
# screen out indexes where going left/right has no effect
idxs_relevant = y_node_absolute_errors[:, 0] != y_node_absolute_errors[:, 1]
if idxs_relevant.sum() <= 1: # nothing to change
if self.verbose:
print('no errors to change')
continue
# assert np.all((self.model.predict(X) != y)[idxs_relevant]), 'relevant indexes should be errors'
y_node_target = np.argmin(y_node_absolute_errors, axis=1)
y_node_target = y_node_target[idxs_relevant]
# here, we optionally weight these errors by the size of the error
# if we want this to work for classification, must switch to predict_proba
# if self.prediction_task == 'regression':
# weight by the difference in error ###############################################################
if self.weight_errors:
sample_weight_node *= np.abs(y_node_absolute_errors[:, 1] - y_node_absolute_errors[:, 0])
sample_weight_node_target = sample_weight_node[idxs_relevant]
X_node = X_node[idxs_relevant]
# Fit a 1-variable binary classification model on these outputs ######################################
# Note: this could be customized (e.g. for sparse oblique trees)
best_score = -np.inf
best_feat_num = None
for feat_num in range(X.shape[1]):
if self.prediction_task == 'classification':
if self.node_model == 'linear':
m = LogisticRegression(**self.node_model_args)
elif self.node_model == 'stump':
m = DecisionTreeClassifier(max_depth=1, **self.node_model_args)
elif self.prediction_task == 'regression':
if self.node_model == 'linear':
m = LinearRegression(**self.node_model_args)
elif self.node_model == 'stump':
m = DecisionTreeRegressor(max_depth=1, **self.node_model_args)
X_node_single_feat = X_node[:, feat_num: feat_num + 1]
m.fit(X_node_single_feat, y_node_target, sample_weight=sample_weight_node_target)
score = m.score(X_node_single_feat, y_node_target, sample_weight=sample_weight_node_target)
if score > best_score:
best_score = score
best_feat_num = feat_num
best_model = deepcopy(m)
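                    # recover the split threshold from the fitted node model: a 1-feature
                    # linear model crosses its decision boundary where w*x + b = 0, i.e.
                    # x = -b/w; a stump stores its split threshold directly in its tree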
if self.node_model == 'linear':
best_threshold = -best_model.intercept_ / best_model.coef_[0]
elif self.node_model == 'stump':
best_threshold = best_model.tree_.threshold[0]
# print((feature[node_id], threshold[node_id]), '\n->',
# (best_feat_num, best_threshold))
# Update the node with the new feature / threshold ######################################
old_feat_num = feature[node_id]
old_threshold = threshold[node_id]
# print(X.sum(), y.sum())
if X_score is None:
X_score = X
if y_score is None:
y_score = y
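            # sklearn scorers are called as scorer(estimator, X, y) and return
            # "higher is better"; the update below is kept only on strict improvement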
scorer = get_scorer(self.update_scoring)
old_score = scorer(self.model, X_score, y_score)
feature[node_id] = best_feat_num
threshold[node_id] = best_threshold
new_score = scorer(self.model, X_score, y_score)
# debugging
if self.verbose > 1:
if old_score == new_score:
print('\tno change', best_feat_num, old_feat_num)
print(f'\tscore_total {old_score:0.4f} -> {new_score:0.4f}')
if old_score >= new_score:
feature[node_id] = old_feat_num
threshold[node_id] = old_threshold
else:
# (Track if any updates were necessary)
num_updates += 1
if self.verbose > 0:
print(f'Improved score from {old_score:0.4f} to {new_score:0.4f}')
# debugging snippet (if score_m_new > score_m_old, then new_score should be > old_score, but it isn't!!!!)
if self.verbose > 1:
"""
X_node_single_feat = X_node[:, best_feat_num: best_feat_num + 1]
score_m_new = best_model.score(X_node_single_feat, y_node_target, sample_weight=sample_weight)
best_model.tree_.feature[0] = old_feat_num
best_model.tree_.threshold[0] = old_threshold
X_node_single_feat = X_node[:, old_feat_num: old_feat_num + 1]
score_m_old = best_model.score(X_node_single_feat, y_node_target, sample_weight=sample_weight)
print('\t\t', f'score_local {score_m_old:0.4f} -> {score_m_new:0.4f}')
"""
return num_updates
def predict(self, X):
return self.model.predict(X)
def predict_proba(self, X):
return self.model.predict_proba(X)
def score(self, X, y):
return self.model.score(X, y)
class TaoTreeRegressor(TaoTree, RegressorMixin):
def _init_prediction_task(self):
self.prediction_task = 'regression'
class TaoTreeClassifier(TaoTree, ClassifierMixin):
def _init_prediction_task(self):
self.prediction_task = 'classification'
if __name__ == '__main__':
np.random.seed(13)
random.seed(13)
X, y = datasets.load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.33, random_state=42
)
print('X.shape', X.shape)
print('ys', np.unique(y_train), '\n\n')
m = TaoTreeClassifier(randomize_tree=False, weight_errors=False,
node_model='stump', model_args={'max_depth': 3},
verbose=1)
m.fit(X_train, y_train)
print('Train acc', np.mean(m.predict(X_train) == y_train))
print('Test acc', np.mean(m.predict(X_test) == y_test))
# print(m.predict(X_train), m.predict_proba(X_train).shape)
# print(m.predict_proba(X_train))
# X, y = datasets.load_diabetes(return_X_y=True) # regression
# X = np.random.randn(500, 10)
# y = (X[:, 0] > 0).astype(float) + (X[:, 1] > 1).astype(float)
# X_train, X_test, y_train, y_test = train_test_split(
# X, y, test_size=0.33, random_state=42
# )
    # m = TaoTreeRegressor()
# m.fit(X_train, y_train)
# print('mse', np.mean(np.square(m.predict(X_test) - y_test)),
# 'baseline', np.mean(np.square(y_test)))
Classes
class TaoTree (model_type: str = 'CART', n_iters: int = 20, model_args: dict = {'max_leaf_nodes': 15}, randomize_tree=False, update_scoring='accuracy', min_node_samples_tao=3, min_leaf_samples_tao=2, node_model='stump', node_model_args: dict = {}, reg_param: float = 0.001, weight_errors: bool = False, verbose: int = 0)
Base class for all estimators in scikit-learn.
Notes
All estimators should specify all the parameters that can be set at the class level in their __init__ as explicit keyword arguments (no *args or **kwargs).
TAO: Alternating optimization of decision trees, with application to learning sparse oblique trees (NeurIPS 2018)
https://proceedings.neurips.cc/paper/2018/hash/185c29dc24325934ee377cfda20e414c-Abstract.html
Note: this implementation learns single-feature splits rather than oblique trees.
Currently supports
- given a CART tree, posthoc improve it with TAO
- also works with HSTreeCV
Todo
- update bottom to top, otherwise input points don't get updated
- update leaf nodes
- support regression
- support FIGS
- support error-weighting
- support oblique trees
- support generic models at decision nodes
- support pruning (e.g. if weights -> 0, then remove a node)
- support classifiers in leaves
Parameters
model_type : str
    'CART' or 'FIGS'
n_iters : int
    Number of iterations to run TAO
model_args : dict
    Arguments to pass to the model
randomize_tree : bool
    Whether to randomize the tree before each iteration
update_scoring : str
    Scoring metric (an sklearn scorer name, e.g. 'accuracy') used to accept or reject each node update
min_node_samples_tao : int
    Minimum number of samples in a node to apply TAO
min_leaf_samples_tao : int
    Minimum number of samples in a leaf to apply TAO
node_model : str
    'stump' or 'linear'
node_model_args : dict
    Arguments to pass to the node-wise model
reg_param : float
    Regularization parameter for the node-wise linear model (if node_model is 'linear')
weight_errors : bool
    Whether to weight node-model training samples by the size of the error difference between routing left vs right
verbose : int
    Verbosity level
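A minimal usage sketch (dataset and hyperparameters are illustrative, mirroring the module's __main__ demo; assumes TaoTree is importable from this module, and note that TaoTree itself defaults to the classification task):

    from sklearn import datasets
    from sklearn.model_selection import train_test_split

    X, y = datasets.load_breast_cancer(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

    # fit a CART tree, then post-hoc improve it with TAO node updates
    m = TaoTree(model_type='CART', n_iters=20, model_args={'max_leaf_nodes': 15})
    m.fit(X_train, y_train)
    print('Test accuracy:', m.score(X_test, y_test))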
Ancestors
- sklearn.base.BaseEstimator
Subclasses
- TaoTreeClassifier
- TaoTreeRegressor
Methods
def fit(self, X, y=None, feature_names=None, sample_weight=None)
Params
sample_weight : array-like of shape (n_samples,), default=None
    Sample weights. If None, then samples are equally weighted. Splits that would create child nodes with net zero or negative weight are ignored while searching for a split in each node.
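A hedged sketch of supplying sample weights (the weights array and the reweighting rule are hypothetical; X_train, y_train as in the class example above). The weights are forwarded both to the inner CART fit and to each TAO node update:

    import numpy as np

    weights = np.ones(len(y_train))
    weights[y_train == 1] *= 2.0  # hypothetical: upweight the positive class
    m = TaoTreeClassifier().fit(X_train, y_train, sample_weight=weights)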
def predict(self, X)
Predict with the fitted tree (delegates to self.model.predict).
def predict_proba(self, X)
Predict class probabilities (delegates to self.model.predict_proba).
def score(self, X, y)
Score the fitted tree on (X, y) (delegates to self.model.score).
class TaoTreeClassifier (model_type: str = 'CART', n_iters: int = 20, model_args: dict = {'max_leaf_nodes': 15}, randomize_tree=False, update_scoring='accuracy', min_node_samples_tao=3, min_leaf_samples_tao=2, node_model='stump', node_model_args: dict = {}, reg_param: float = 0.001, weight_errors: bool = False, verbose: int = 0)
Base class for all estimators in scikit-learn.
Notes
All estimators should specify all the parameters that can be set at the class level in their __init__ as explicit keyword arguments (no *args or **kwargs).
TAO: Alternating optimization of decision trees, with application to learning sparse oblique trees (NeurIPS 2018)
https://proceedings.neurips.cc/paper/2018/hash/185c29dc24325934ee377cfda20e414c-Abstract.html
Note: this implementation learns single-feature splits rather than oblique trees.
Currently supports
- given a CART tree, posthoc improve it with TAO
- also works with HSTreeCV
Todo
- update bottom to top, otherwise input points don't get updated
- update leaf nodes
- support regression
- support FIGS
- support error-weighting
- support oblique trees
- support generic models at decision nodes
- support pruning (e.g. if weights -> 0, then remove a node)
- support classifiers in leaves
Parameters
model_type : str
    'CART' or 'FIGS'
n_iters : int
    Number of iterations to run TAO
model_args : dict
    Arguments to pass to the model
randomize_tree : bool
    Whether to randomize the tree before each iteration
update_scoring : str
    Scoring metric (an sklearn scorer name, e.g. 'accuracy') used to accept or reject each node update
min_node_samples_tao : int
    Minimum number of samples in a node to apply TAO
min_leaf_samples_tao : int
    Minimum number of samples in a leaf to apply TAO
node_model : str
    'stump' or 'linear'
node_model_args : dict
    Arguments to pass to the node-wise model
reg_param : float
    Regularization parameter for the node-wise linear model (if node_model is 'linear')
weight_errors : bool
    Whether to weight node-model training samples by the size of the error difference between routing left vs right
verbose : int
    Verbosity level
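The module's __main__ block exercises the classifier end to end; condensed here as a usage sketch (same data split and hyperparameters as the demo):

    import numpy as np
    from sklearn import datasets
    from sklearn.model_selection import train_test_split

    X, y = datasets.load_breast_cancer(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

    m = TaoTreeClassifier(randomize_tree=False, weight_errors=False,
                          node_model='stump', model_args={'max_depth': 3}, verbose=1)
    m.fit(X_train, y_train)
    print('Train acc', np.mean(m.predict(X_train) == y_train))
    print('Test acc', np.mean(m.predict(X_test) == y_test))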
Ancestors
- TaoTree
- sklearn.base.BaseEstimator
- sklearn.base.ClassifierMixin
Inherited members
- TaoTree: fit, predict, predict_proba, score
class TaoTreeRegressor (model_type: str = 'CART', n_iters: int = 20, model_args: dict = {'max_leaf_nodes': 15}, randomize_tree=False, update_scoring='accuracy', min_node_samples_tao=3, min_leaf_samples_tao=2, node_model='stump', node_model_args: dict = {}, reg_param: float = 0.001, weight_errors: bool = False, verbose: int = 0)
Base class for all estimators in scikit-learn.
Notes
All estimators should specify all the parameters that can be set at the class level in their __init__ as explicit keyword arguments (no *args or **kwargs).
TAO: Alternating optimization of decision trees, with application to learning sparse oblique trees (NeurIPS 2018)
https://proceedings.neurips.cc/paper/2018/hash/185c29dc24325934ee377cfda20e414c-Abstract.html
Note: this implementation learns single-feature splits rather than oblique trees.
Currently supports
- given a CART tree, posthoc improve it with TAO
- also works with HSTreeCV
Todo
- update bottom to top, otherwise input points don't get updated
- update leaf nodes
- support regression
- support FIGS
- support error-weighting
- support oblique trees
- support generic models at decision nodes
- support pruning (e.g. if weights -> 0, then remove a node)
- support classifiers in leaves
Parameters
model_type : str
    'CART' or 'FIGS'
n_iters : int
    Number of iterations to run TAO
model_args : dict
    Arguments to pass to the model
randomize_tree : bool
    Whether to randomize the tree before each iteration
update_scoring : str
    Scoring metric (an sklearn scorer name, e.g. 'accuracy') used to accept or reject each node update
min_node_samples_tao : int
    Minimum number of samples in a node to apply TAO
min_leaf_samples_tao : int
    Minimum number of samples in a leaf to apply TAO
node_model : str
    'stump' or 'linear'
node_model_args : dict
    Arguments to pass to the node-wise model
reg_param : float
    Regularization parameter for the node-wise linear model (if node_model is 'linear')
weight_errors : bool
    Whether to weight node-model training samples by the size of the error difference between routing left vs right
verbose : int
    Verbosity level
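Per the source, regression is not yet tested: fit currently raises Warning('TAO Regression is not yet tested') for the regression task. The commented-out demo in the module sketches the intended usage (reproduced here as a hedged sketch, not a working example):

    import numpy as np
    from sklearn import datasets
    from sklearn.model_selection import train_test_split

    X, y = datasets.load_diabetes(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

    m = TaoTreeRegressor()
    m.fit(X_train, y_train)  # note: currently raises Warning('TAO Regression is not yet tested')
    print('mse', np.mean(np.square(m.predict(X_test) - y_test)),
          'baseline', np.mean(np.square(y_test)))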
Ancestors
- TaoTree
- sklearn.base.BaseEstimator
- sklearn.base.RegressorMixin
Inherited members
- TaoTree: fit, predict, predict_proba, score