Bridging random forests and deep neural networks. Code to convert a sklearn decision tree to a pytorch neural network following "Neural Random Forests" https://arxiv.org/abs/1604.07143
Example
from sklearn.tree import DecisionTreeClassifier
import numpy as np
np.random.seed(13)
num_features = 4
N = 1000
max_depth = 100
# prepare data
X = np.random.rand(N, num_features)
y = np.random.rand(N)
X_t = torch.Tensor(X)
# train rf
dt = DecisionTreeRegressor(max_depth=max_depth)
dt.fit(X, y)
# prepare net
net = Net(dt)
# check if preds are close
preds_dt = dt.predict(X).flatten()
preds_net = net(X_t).detach().numpy().flatten()
assert np.isclose(preds_dt, preds_net).all(), 'preds are not close'
Expand source code
"""Bridging random forests and deep neural networks. Code to convert a sklearn decision tree to a pytorch neural network
following "Neural Random Forests" https://arxiv.org/abs/1604.07143
Example
-------
from sklearn.tree import DecisionTreeClassifier
import numpy as np
np.random.seed(13)
num_features = 4
N = 1000
max_depth = 100
# prepare data
X = np.random.rand(N, num_features)
y = np.random.rand(N)
X_t = torch.Tensor(X)
# train rf
dt = DecisionTreeRegressor(max_depth=max_depth)
dt.fit(X, y)
# prepare net
net = Net(dt)
# check if preds are close
preds_dt = dt.predict(X).flatten()
preds_net = net(X_t).detach().numpy().flatten()
assert np.isclose(preds_dt, preds_net).all(), 'preds are not close'
"""
import time
from copy import deepcopy
import numpy as np
from torch import nn
class Net(nn.Module):
'''
class which converts estimator (decision tree type) to a dnn
'''
def __init__(self, estimator):
super(Net, self).__init__()
n_nodes = estimator.tree_.node_count
children_left = estimator.tree_.children_left # left_child, id of the left child of the node
children_right = estimator.tree_.children_right # right_child, id of the right child of the node
feature = estimator.tree_.feature # feature, feature used for splitting the node
threshold = estimator.tree_.threshold # threshold, threshold value at the node
num_leaves = estimator.tree_.n_leaves
num_non_leaves = estimator.tree_.node_count - num_leaves
node_depth, is_leaves = self.calc_depths_and_leaves(n_nodes, children_left, children_right)
self.values = estimator.tree_.value
self.all_leaf_paths = {}
self.calc_all_leaf_paths(0, n_nodes, children_left, children_right, running_list=[]) # set all_leaf_paths
# initialize layers to zero
self.layers = nn.Sequential(
nn.Linear(estimator.n_features_, num_non_leaves),
nn.Linear(num_non_leaves, num_leaves),
nn.Linear(num_leaves, 1, bias=False)
)
for i in range(2):
self.layers[i].weight.data *= 0
self.layers[i].bias.data *= 0
# set the first layer
nonleaf_node_to_nonleaf_neuron_num = {} # np.zeros(num_non_leaves)
nonleaf_neuron_num = 0
for i in range(n_nodes):
if not is_leaves[i]:
self.layers[0].weight.data[nonleaf_neuron_num, feature[i]] = 1
self.layers[0].bias.data[nonleaf_neuron_num] = -threshold[i]
nonleaf_node_to_nonleaf_neuron_num[i] = nonleaf_neuron_num
nonleaf_neuron_num += 1
# set the 2nd + 3rd layer
for leaf_neuron_num, leaf_idx in enumerate(sorted(self.all_leaf_paths.keys())):
path = self.all_leaf_paths[leaf_idx]
# 2nd lay
for (nonleaf_node, sign) in path:
self.layers[1].weight.data[leaf_neuron_num,
nonleaf_node_to_nonleaf_neuron_num[
nonleaf_node]] = sign # num_leaves x num_non_leaves
self.layers[1].bias.data[leaf_neuron_num] = -1 * float(node_depth[leaf_idx])
# 3rd lay
self.layers[2].weight.data[0, leaf_neuron_num] = self.values[leaf_idx][
0, 0] # note, this will be multivariate for classification!
# placeholder so class compiles
def forward(self, x):
# t0 = time.perf_counter()
x = x.reshape(x.shape[0], -1)
x = self.layers[0](x)
t1 = time.perf_counter()
x[x < 0] = -1
x[x >= 0] = 1
# t2 = time.perf_counter()
x = self.layers[1](x)
x = (x == 0).float()
x = self.layers[2](x)
# t3 = time.perf_counter()
# print(f't1: {t1-t0:0.2e}, t2: {t2-t1:0.2e} t3: {t3-t2:0.2e}')
return x
def calc_depths_and_leaves(self, n_nodes, children_left, children_right):
'''
calculate numpy arrays representing the depth of each node and whether they are leaves or not
'''
# The tree structure can be traversed to compute various properties such
# as the depth of each node and whether or not it is a leaf.
node_depth = np.zeros(shape=n_nodes, dtype=np.int64)
is_leaves = np.zeros(shape=n_nodes, dtype=bool)
# calculate node_depth and is_leaves
stack = [(0, -1)] # seed is the root node id and its parent depth
while len(stack) > 0:
node_id, parent_depth = stack.pop()
node_depth[node_id] = parent_depth + 1
# If we have a test node
if (children_left[node_id] != children_right[node_id]):
stack.append((children_left[node_id], parent_depth + 1))
stack.append((children_right[node_id], parent_depth + 1))
else:
is_leaves[node_id] = True
return node_depth, is_leaves
def calc_all_leaf_paths(self, node_idx, n_nodes, children_left, children_right, running_list):
'''
recursively store all leaf paths into a dictionary as tuples of (node_idxs, weight)
weight is -1/+1 depending on if it left/right
running_list is a reference to one list which is shared by all calls!
'''
# check if we are at a leaf
if children_left[node_idx] == children_right[node_idx]:
self.all_leaf_paths[node_idx] = deepcopy(running_list)
else:
running_list.append((node_idx, -1)) # assign weight of -1 to left
self.calc_all_leaf_paths(children_left[node_idx], n_nodes, children_left, children_right, running_list)
running_list.pop()
running_list.append((node_idx, +1)) # assign weight of +1 to right
self.calc_all_leaf_paths(children_right[node_idx], n_nodes, children_left, children_right, running_list)
running_list.pop()
def extract_util_np(self):
b0 = self.layers[0].bias.data.numpy()
idxs0 = self.layers[0].weight.data.argmax(dim=1).numpy()
w1 = self.layers[1].weight.data.numpy().T
b1 = self.layers[1].bias.data.numpy()
num_leaves = self.layers[2].weight.shape[1]
idxs2 = np.zeros(num_leaves) # leaf_neuron_num_to_val
# iterate over leaves and map to values
for leaf_neuron_num, i in enumerate(sorted(self.all_leaf_paths.keys())):
idxs2[leaf_neuron_num] = self.values[i, 0, 0]
return b0, idxs0, w1, b1, idxs2
Classes
class Net (estimator)
-
class which converts estimator (decision tree type) to a dnn
Initialize internal Module state, shared by both nn.Module and ScriptModule.
Expand source code
class Net(nn.Module): ''' class which converts estimator (decision tree type) to a dnn ''' def __init__(self, estimator): super(Net, self).__init__() n_nodes = estimator.tree_.node_count children_left = estimator.tree_.children_left # left_child, id of the left child of the node children_right = estimator.tree_.children_right # right_child, id of the right child of the node feature = estimator.tree_.feature # feature, feature used for splitting the node threshold = estimator.tree_.threshold # threshold, threshold value at the node num_leaves = estimator.tree_.n_leaves num_non_leaves = estimator.tree_.node_count - num_leaves node_depth, is_leaves = self.calc_depths_and_leaves(n_nodes, children_left, children_right) self.values = estimator.tree_.value self.all_leaf_paths = {} self.calc_all_leaf_paths(0, n_nodes, children_left, children_right, running_list=[]) # set all_leaf_paths # initialize layers to zero self.layers = nn.Sequential( nn.Linear(estimator.n_features_, num_non_leaves), nn.Linear(num_non_leaves, num_leaves), nn.Linear(num_leaves, 1, bias=False) ) for i in range(2): self.layers[i].weight.data *= 0 self.layers[i].bias.data *= 0 # set the first layer nonleaf_node_to_nonleaf_neuron_num = {} # np.zeros(num_non_leaves) nonleaf_neuron_num = 0 for i in range(n_nodes): if not is_leaves[i]: self.layers[0].weight.data[nonleaf_neuron_num, feature[i]] = 1 self.layers[0].bias.data[nonleaf_neuron_num] = -threshold[i] nonleaf_node_to_nonleaf_neuron_num[i] = nonleaf_neuron_num nonleaf_neuron_num += 1 # set the 2nd + 3rd layer for leaf_neuron_num, leaf_idx in enumerate(sorted(self.all_leaf_paths.keys())): path = self.all_leaf_paths[leaf_idx] # 2nd lay for (nonleaf_node, sign) in path: self.layers[1].weight.data[leaf_neuron_num, nonleaf_node_to_nonleaf_neuron_num[ nonleaf_node]] = sign # num_leaves x num_non_leaves self.layers[1].bias.data[leaf_neuron_num] = -1 * float(node_depth[leaf_idx]) # 3rd lay self.layers[2].weight.data[0, leaf_neuron_num] = self.values[leaf_idx][ 0, 0] # note, this will be multivariate for classification! # placeholder so class compiles def forward(self, x): # t0 = time.perf_counter() x = x.reshape(x.shape[0], -1) x = self.layers[0](x) t1 = time.perf_counter() x[x < 0] = -1 x[x >= 0] = 1 # t2 = time.perf_counter() x = self.layers[1](x) x = (x == 0).float() x = self.layers[2](x) # t3 = time.perf_counter() # print(f't1: {t1-t0:0.2e}, t2: {t2-t1:0.2e} t3: {t3-t2:0.2e}') return x def calc_depths_and_leaves(self, n_nodes, children_left, children_right): ''' calculate numpy arrays representing the depth of each node and whether they are leaves or not ''' # The tree structure can be traversed to compute various properties such # as the depth of each node and whether or not it is a leaf. node_depth = np.zeros(shape=n_nodes, dtype=np.int64) is_leaves = np.zeros(shape=n_nodes, dtype=bool) # calculate node_depth and is_leaves stack = [(0, -1)] # seed is the root node id and its parent depth while len(stack) > 0: node_id, parent_depth = stack.pop() node_depth[node_id] = parent_depth + 1 # If we have a test node if (children_left[node_id] != children_right[node_id]): stack.append((children_left[node_id], parent_depth + 1)) stack.append((children_right[node_id], parent_depth + 1)) else: is_leaves[node_id] = True return node_depth, is_leaves def calc_all_leaf_paths(self, node_idx, n_nodes, children_left, children_right, running_list): ''' recursively store all leaf paths into a dictionary as tuples of (node_idxs, weight) weight is -1/+1 depending on if it left/right running_list is a reference to one list which is shared by all calls! ''' # check if we are at a leaf if children_left[node_idx] == children_right[node_idx]: self.all_leaf_paths[node_idx] = deepcopy(running_list) else: running_list.append((node_idx, -1)) # assign weight of -1 to left self.calc_all_leaf_paths(children_left[node_idx], n_nodes, children_left, children_right, running_list) running_list.pop() running_list.append((node_idx, +1)) # assign weight of +1 to right self.calc_all_leaf_paths(children_right[node_idx], n_nodes, children_left, children_right, running_list) running_list.pop() def extract_util_np(self): b0 = self.layers[0].bias.data.numpy() idxs0 = self.layers[0].weight.data.argmax(dim=1).numpy() w1 = self.layers[1].weight.data.numpy().T b1 = self.layers[1].bias.data.numpy() num_leaves = self.layers[2].weight.shape[1] idxs2 = np.zeros(num_leaves) # leaf_neuron_num_to_val # iterate over leaves and map to values for leaf_neuron_num, i in enumerate(sorted(self.all_leaf_paths.keys())): idxs2[leaf_neuron_num] = self.values[i, 0, 0] return b0, idxs0, w1, b1, idxs2
Ancestors
- torch.nn.modules.module.Module
Methods
def calc_all_leaf_paths(self, node_idx, n_nodes, children_left, children_right, running_list)
-
recursively store all leaf paths into a dictionary as tuples of (node_idxs, weight) weight is -1/+1 depending on if it left/right running_list is a reference to one list which is shared by all calls!
Expand source code
def calc_all_leaf_paths(self, node_idx, n_nodes, children_left, children_right, running_list): ''' recursively store all leaf paths into a dictionary as tuples of (node_idxs, weight) weight is -1/+1 depending on if it left/right running_list is a reference to one list which is shared by all calls! ''' # check if we are at a leaf if children_left[node_idx] == children_right[node_idx]: self.all_leaf_paths[node_idx] = deepcopy(running_list) else: running_list.append((node_idx, -1)) # assign weight of -1 to left self.calc_all_leaf_paths(children_left[node_idx], n_nodes, children_left, children_right, running_list) running_list.pop() running_list.append((node_idx, +1)) # assign weight of +1 to right self.calc_all_leaf_paths(children_right[node_idx], n_nodes, children_left, children_right, running_list) running_list.pop()
def calc_depths_and_leaves(self, n_nodes, children_left, children_right)
-
calculate numpy arrays representing the depth of each node and whether they are leaves or not
Expand source code
def calc_depths_and_leaves(self, n_nodes, children_left, children_right): ''' calculate numpy arrays representing the depth of each node and whether they are leaves or not ''' # The tree structure can be traversed to compute various properties such # as the depth of each node and whether or not it is a leaf. node_depth = np.zeros(shape=n_nodes, dtype=np.int64) is_leaves = np.zeros(shape=n_nodes, dtype=bool) # calculate node_depth and is_leaves stack = [(0, -1)] # seed is the root node id and its parent depth while len(stack) > 0: node_id, parent_depth = stack.pop() node_depth[node_id] = parent_depth + 1 # If we have a test node if (children_left[node_id] != children_right[node_id]): stack.append((children_left[node_id], parent_depth + 1)) stack.append((children_right[node_id], parent_depth + 1)) else: is_leaves[node_id] = True return node_depth, is_leaves
def extract_util_np(self)
-
Expand source code
def extract_util_np(self): b0 = self.layers[0].bias.data.numpy() idxs0 = self.layers[0].weight.data.argmax(dim=1).numpy() w1 = self.layers[1].weight.data.numpy().T b1 = self.layers[1].bias.data.numpy() num_leaves = self.layers[2].weight.shape[1] idxs2 = np.zeros(num_leaves) # leaf_neuron_num_to_val # iterate over leaves and map to values for leaf_neuron_num, i in enumerate(sorted(self.all_leaf_paths.keys())): idxs2[leaf_neuron_num] = self.values[i, 0, 0] return b0, idxs0, w1, b1, idxs2
def forward(self, x) ‑> Callable[..., Any]
-
Define the computation performed at every call.
Should be overridden by all subclasses.
Note
Although the recipe for forward pass needs to be defined within this function, one should call the :class:
Module
instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.Expand source code
def forward(self, x): # t0 = time.perf_counter() x = x.reshape(x.shape[0], -1) x = self.layers[0](x) t1 = time.perf_counter() x[x < 0] = -1 x[x >= 0] = 1 # t2 = time.perf_counter() x = self.layers[1](x) x = (x == 0).float() x = self.layers[2](x) # t3 = time.perf_counter() # print(f't1: {t1-t0:0.2e}, t2: {t2-t1:0.2e} t3: {t3-t2:0.2e}') return x