Original implementation at https://github.com/wangtongada/BOA
Expand source code
'''Original implementation at https://github.com/wangtongada/BOA
'''
import itertools
import operator
import os
import warnings
from os.path import join as oj
from bisect import bisect_left
from collections import defaultdict
from copy import deepcopy
from itertools import combinations
from random import sample
import numpy as np
import pandas as pd
from mlxtend.frequent_patterns import fpgrowth
from numpy.random import random
from pandas import read_csv
from scipy.sparse import csc_matrix
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.ensemble import RandomForestClassifier
from sklearn.utils.multiclass import check_classification_targets
from sklearn.utils.validation import check_X_y, check_is_fitted
from imodels.rule_set.rule_set import RuleSet
from imodels.util.arguments import check_fit_arguments
class BayesianRuleSetClassifier(RuleSet, BaseEstimator, ClassifierMixin):
'''Bayesian or-of-and algorithm.
Generates patterns that satisfy the minimum support and maximum length and then select the Nrules rules that have the highest entropy.
In function SA_patternbased, each local maximum is stored in maps and the best BOA is returned.
Remember here the BOA contains only the index of selected rules from Nrules self.rules_
'''
def __init__(self, n_rules: int = 2000,
supp=5, maxlen: int = 10,
num_iterations=5000, num_chains=3, q=0.1,
alpha_pos=100, beta_pos=1,
alpha_neg=100, beta_neg=1,
alpha_l=None, beta_l=None,
discretization_method='randomforest', random_state=0):
'''
Params
------
n_rules
number of rules to be used in SA_patternbased and also the output of generate_rules
supp
The higher this supp, the 'larger' a pattern is. 5% is a generally good number
maxlen
maximum length of a pattern
num_iterations
number of iterations in each chain
num_chains
number of chains in the simulated annealing search algorithm
q
alpha_pos
$\rho = alpha/(alpha+beta)$. Make sure $\rho$ is close to one when choosing alpha and beta
The alpha and beta parameters alter the prior distributions for different rules
beta_pos
alpha_neg
beta_neg
alpha_l
beta_l
discretization_method
discretization method
'''
self.n_rules = n_rules
self.supp = supp
self.maxlen = maxlen
self.num_iterations = num_iterations
self.num_chains = num_chains
self.q = q
self.alpha_pos = alpha_pos
self.beta_pos = beta_pos
self.alpha_neg = alpha_neg
self.beta_neg = beta_neg
self.discretization_method = discretization_method
self.alpha_l = alpha_l
self.beta_l = beta_l
self.random_state = 0
def fit(self, X, y, feature_names: list = None, init=[], verbose=False):
'''
Parameters
----------
X : array-like, shape = [n_samples, n_features]
Training data
y : array_like, shape = [n_samples]
Labels
feature_names : array_like, shape = [n_features], optional (default: [])
String labels for each feature.
If empty and X is a DataFrame, column labels are used.
If empty and X is not a DataFrame, then features are simply enumerated
'''
# check inputs
self.attr_level_num = defaultdict(int) # any missing value defaults to 0
self.attr_names = []
X, y, feature_names = check_fit_arguments(self, X, y, feature_names)
np.random.seed(self.random_state)
# convert to pandas DataFrame
X = pd.DataFrame(X, columns=feature_names)
for i, name in enumerate(X.columns):
self.attr_level_num[name] += 1
self.attr_names.append(name)
self.attr_names_orig = deepcopy(self.attr_names)
self.attr_names = list(set(self.attr_names))
# set up patterns
self._set_pattern_space()
# parameter checking
if self.alpha_l is None or self.beta_l is None or len(self.alpha_l) != self.maxlen or len(
self.beta_l) != self.maxlen:
if verbose:
print('No or wrong input for alpha_l and beta_l - the model will use default parameters.')
self.C = [1.0 / self.maxlen] * self.maxlen
self.C.insert(0, -1)
self.alpha_l = [10] * (self.maxlen + 1)
self.beta_l = [10 * self.pattern_space[i] / self.C[i] for i in range(self.maxlen + 1)]
else:
self.alpha_l = [1] + list(self.alpha_l)
self.beta_l = [1] + list(self.beta_l)
# setup
self._generate_rules(X, y, verbose)
n_rules_current = len(self.rules_)
self.rules_len_list = [len(rule) for rule in self.rules_]
maps = defaultdict(list)
T0 = 1000 # initial temperature for simulated annealing
split = 0.7 * self.num_iterations
# run simulated annealing
for chain in range(self.num_chains):
# initialize with a random pattern set
if init != []:
rules_curr = init.copy()
else:
assert n_rules_current > 1, f'Only {n_rules_current} potential rules found, change hyperparams to allow for more'
N = sample(range(1, min(8, n_rules_current), 1), 1)[0]
rules_curr = sample(range(n_rules_current), N)
rules_curr_norm = self._normalize(rules_curr)
pt_curr = -100000000000
maps[chain].append(
[-1, [pt_curr / 3, pt_curr / 3, pt_curr / 3], rules_curr, [self.rules_[i] for i in rules_curr]])
for iter in range(self.num_iterations):
if iter >= split:
p = np.array(range(1 + len(maps[chain])))
p = np.array(list(_accumulate(p)))
p = p / p[-1]
index = _find_lt(p, random())
rules_curr = maps[chain][index][2].copy()
rules_curr_norm = maps[chain][index][2].copy()
# propose new rules
rules_new, rules_norm = self._propose(rules_curr.copy(), rules_curr_norm.copy(), self.q, y)
# compute probability of new rules
cfmatrix, prob = self._compute_prob(rules_new, y)
T = T0 ** (1 - iter / self.num_iterations) # temperature for simulated annealing
pt_new = sum(prob)
with warnings.catch_warnings():
if not verbose:
warnings.simplefilter("ignore")
alpha = np.exp(float(pt_new - pt_curr) / T)
if pt_new > sum(maps[chain][-1][1]):
maps[chain].append([iter, prob, rules_new, [self.rules_[i] for i in rules_new]])
if verbose:
print((
'\n** chain = {}, max at iter = {} ** \n accuracy = {}, TP = {},FP = {}, TN = {}, FN = {}'
'\n pt_new is {}, prior_ChsRules={}, likelihood_1 = {}, likelihood_2 = {}\n').format(
chain, iter, (cfmatrix[0] + cfmatrix[2] + 0.0) / len(y), cfmatrix[0], cfmatrix[1],
cfmatrix[2], cfmatrix[3], sum(prob), prob[0], prob[1], prob[2])
)
self._print_rules(rules_new)
print(rules_new)
if random() <= alpha:
rules_curr_norm, rules_curr, pt_curr = rules_norm.copy(), rules_new.copy(), pt_new
pt_max = [sum(maps[chain][-1][1]) for chain in range(self.num_chains)]
index = pt_max.index(max(pt_max))
self.rules_ = maps[index][-1][3]
return self
def __str__(self):
return ' '.join(str(r) for r in self.rules_)
def predict(self, X):
check_is_fitted(self)
if isinstance(X, np.ndarray):
df = pd.DataFrame(X, columns=self.attr_names_orig)
else:
df = X
Z = [[]] * len(self.rules_)
dfn = 1 - df # df has negative associations
dfn.columns = [name.strip() + '_neg' for name in df.columns]
df = pd.concat([df, dfn], axis=1)
for i, rule in enumerate(self.rules_):
Z[i] = (np.sum(df[list(rule)], axis=1) == len(rule)).astype(int)
Yhat = (np.sum(Z, axis=0) > 0).astype(int)
return Yhat
def predict_proba(self, X):
raise Exception('BOA does not support predicted probabilities.')
def _set_pattern_space(self):
"""Compute the rule space from the levels in each attribute
"""
# add feat_neg to each existing feature feat
for item in self.attr_names:
self.attr_level_num[item + '_neg'] = self.attr_level_num[item]
tmp = [item + '_neg' for item in self.attr_names]
self.attr_names.extend(tmp)
# set up pattern_space
self.pattern_space = np.zeros(self.maxlen + 1)
for k in range(1, self.maxlen + 1, 1):
for subset in combinations(self.attr_names, k):
tmp = 1
for i in subset:
tmp = tmp * self.attr_level_num[i]
# print('subset', subset, 'tmp', tmp, 'k', k)
self.pattern_space[k] = self.pattern_space[k] + tmp
def _generate_rules(self, X, y, verbose):
'''This function generates rules that satisfy supp and maxlen using fpgrowth, then it selects the top n_rules rules that make data have the biggest decrease in entropy.
There are two ways to generate rules. fpgrowth can handle cases where the maxlen is small. If maxlen<=3, fpgrowth can generates rules much faster than randomforest.
If maxlen is big, fpgrowth tends to generate too many rules that overflow the memory.
'''
df = 1 - X # df has negative associations
df.columns = [name.strip() + '_neg' for name in X.columns]
df = pd.concat([X, df], axis=1)
if self.discretization_method == 'fpgrowth' and self.maxlen <= 3:
itemMatrix = [[item for item in df.columns if row[item] == 1] for i, row in df.iterrows()]
pindex = np.where(y == 1)[0]
rules = fpgrowth([itemMatrix[i] for i in pindex], supp=self.supp, zmin=1, zmax=self.maxlen)
rules = [tuple(np.sort(rule[0])) for rule in rules]
rules = list(set(rules))
else:
'''todo: replace this with imodels.RFDiscretizer
'''
rules = []
for length in range(1, self.maxlen + 1, 1):
n_estimators = min(pow(df.shape[1], length), 4000)
clf = RandomForestClassifier(n_estimators=n_estimators, max_depth=length)
clf.fit(X, y)
for n in range(n_estimators):
rules.extend(_extract_rules(clf.estimators_[n], df.columns))
rules = [list(x) for x in set(tuple(x) for x in rules)]
self.rules_ = rules
# select the top n_rules rules using secondary criteria, information gain
self._screen_rules(df, y, verbose) # updates self.rules_
self._set_pattern_space()
def _screen_rules(self, df, y, verbose):
'''Screening rules using information gain
'''
item_ind_dict = {}
for i, name in enumerate(df.columns):
item_ind_dict[name] = i
indices = np.array(
list(itertools.chain.from_iterable([[
item_ind_dict[x] for x in rule]
for rule in self.rules_])))
len_rules = [len(rule) for rule in self.rules_]
indptr = list(_accumulate(len_rules))
indptr.insert(0, 0)
indptr = np.array(indptr)
data = np.ones(len(indices))
rule_matrix = csc_matrix((data, indices, indptr),
shape=(len(df.columns),
len(self.rules_)))
mat = df.values @ rule_matrix
print('mat.shape', mat.shape)
len_matrix = np.array([len_rules] * df.shape[0])
Z = (mat == len_matrix).astype(int)
Zpos = [Z[i] for i in np.where(y > 0)][0]
TP = np.sum(Zpos, axis=0)
supp_select = np.where(TP >= self.supp * sum(y) / 100)[0]
FP = np.sum(Z, axis=0) - TP
TN = len(y) - np.sum(y) - FP
FN = np.sum(y) - TP
p1 = TP.astype(float) / (TP + FP)
p2 = FN.astype(float) / (FN + TN)
pp = (TP + FP).astype(float) / (TP + FP + TN + FN)
# p1 = np.clip(p1, a_min=1e-10, a_max=1-1e-10)
print('\n\n\n\np1.shape', p1.shape, 'pp.shape', pp.shape, 'cond_entropy.shape') # , cond_entropy.shape)
with warnings.catch_warnings():
if not verbose:
warnings.simplefilter("ignore") # ignore warnings about invalid values (e.g. log(0))
cond_entropy = -pp * (p1 * np.log(p1) + (1 - p1) * np.log(1 - p1)) - (1 - pp) * (
p2 * np.log(p2) + (1 - p2) * np.log(1 - p2))
cond_entropy[p1 * (1 - p1) == 0] = -((1 - pp) * (p2 * np.log(p2) + (1 - p2) * np.log(1 - p2)))[
p1 * (1 - p1) == 0]
cond_entropy[p2 * (1 - p2) == 0] = -(pp * (p1 * np.log(p1) + (1 - p1) * np.log(1 - p1)))[p2 * (1 - p2) == 0]
cond_entropy[p1 * (1 - p1) * p2 * (1 - p2) == 0] = 0
select = np.argsort(cond_entropy[supp_select])[::-1][-self.n_rules:]
self.rules_ = [self.rules_[i] for i in supp_select[select]]
self.RMatrix = np.array(Z[:, supp_select[select]])
def _propose(self, rules_curr, rules_norm, q, y):
nRules = len(self.rules_)
yhat = (np.sum(self.RMatrix[:, rules_curr], axis=1) > 0).astype(int)
incorr = np.where(y != yhat)[0]
N = len(rules_curr)
if len(incorr) == 0:
# BOA correctly classified all points but there could be redundant patterns, so cleaning is needed
move = ['clean']
else:
ex = sample(incorr.tolist(), 1)[0]
t = random()
if y[ex] == 1 or N == 1:
if t < 1.0 / 2 or N == 1:
move = ['add'] # action: add
else:
move = ['cut', 'add'] # action: replace
else:
if t < 1.0 / 2:
move = ['cut'] # action: cut
else:
move = ['cut', 'add'] # action: replace
if move[0] == 'cut':
""" cut """
if random() < q:
candidate = list(set(np.where(self.RMatrix[ex, :] == 1)[0]).intersection(rules_curr))
if len(candidate) == 0:
candidate = rules_curr
cut_rule = sample(candidate, 1)[0]
else:
p = []
all_sum = np.sum(self.RMatrix[:, rules_curr], axis=1)
for index, rule in enumerate(rules_curr):
yhat = ((all_sum - np.array(self.RMatrix[:, rule])) > 0).astype(int)
TP, FP, TN, FN = _get_confusion_matrix(yhat, y)
p.append(TP.astype(float) / (TP + FP + 1))
p = [x - min(p) for x in p]
p = np.exp(p)
p = np.insert(p, 0, 0)
p = np.array(list(_accumulate(p)))
if p[-1] == 0:
index = sample(range(len(rules_curr)), 1)[0]
else:
p = p / p[-1]
index = _find_lt(p, random())
cut_rule = rules_curr[index]
rules_curr.remove(cut_rule)
rules_norm = self._normalize(rules_curr)
move.remove('cut')
if len(move) > 0 and move[0] == 'add':
""" add """
if random() < q:
add_rule = sample(range(nRules), 1)[0]
else:
Yhat_neg_index = list(np.where(np.sum(self.RMatrix[:, rules_curr], axis=1) < 1)[0])
mat = np.multiply(self.RMatrix[Yhat_neg_index, :].transpose(), y[Yhat_neg_index])
TP = np.sum(mat, axis=1)
FP = np.array((np.sum(self.RMatrix[Yhat_neg_index, :], axis=0) - TP))
p = (TP.astype(float) / (TP + FP + 1))
p[rules_curr] = 0
add_rule = sample(np.where(p == max(p))[0].tolist(), 1)[0]
if add_rule not in rules_curr:
rules_curr.append(add_rule)
rules_norm = self._normalize(rules_curr)
if len(move) > 0 and move[0] == 'clean':
remove = []
for i, rule in enumerate(rules_norm):
yhat = (np.sum(
self.RMatrix[:, [rule for j, rule in enumerate(rules_norm) if (j != i and j not in remove)]],
axis=1) > 0).astype(int)
TP, FP, TN, FN = _get_confusion_matrix(yhat, y)
if TP + FP == 0:
remove.append(i)
for x in remove:
rules_norm.remove(x)
return rules_curr, rules_norm
return rules_curr, rules_norm
def _compute_prob(self, rules, y):
Yhat = (np.sum(self.RMatrix[:, rules], axis=1) > 0).astype(int)
TP, FP, TN, FN = _get_confusion_matrix(Yhat, y)
Kn_count = list(np.bincount([self.rules_len_list[x] for x in rules], minlength=self.maxlen + 1))
prior_ChsRules = sum([_log_betabin(Kn_count[i], self.pattern_space[i], self.alpha_l[i], self.beta_l[i]) for i in
range(1, len(Kn_count), 1)])
likelihood_1 = _log_betabin(TP, TP + FP, self.alpha_pos, self.beta_pos)
likelihood_2 = _log_betabin(TN, FN + TN, self.alpha_neg, self.beta_neg)
return [TP, FP, TN, FN], [prior_ChsRules, likelihood_1, likelihood_2]
def _normalize_add(self, rules_new, rule_index):
rules = rules_new.copy()
for rule in rules_new:
if set(self.rules_[rule]).issubset(self.rules_[rule_index]):
return rules_new.copy()
if set(self.rules_[rule_index]).issubset(self.rules_[rule]):
rules.remove(rule)
rules.append(rule_index)
return rules
def _normalize(self, rules_new):
try:
rules_len = [len(self.rules_[index]) for index in rules_new]
rules = [rules_new[i] for i in np.argsort(rules_len)[::-1][:len(rules_len)]]
p1 = 0
while p1 < len(rules):
for p2 in range(p1 + 1, len(rules), 1):
if set(self.rules_[rules[p2]]).issubset(set(self.rules_[rules[p1]])):
rules.remove(rules[p1])
p1 -= 1
break
p1 += 1
return rules
except:
return rules_new.copy()
def _print_rules(self, rules_max):
for rule_index in rules_max:
print(self.rules_[rule_index])
def _accumulate(iterable, func=operator.add):
'''Return running totals
Ex. _accumulate([1,2,3,4,5]) --> 1 3 6 10 15
Ex. _accumulate([1,2,3,4,5], operator.mul) --> 1 2 6 24 120
'''
it = iter(iterable)
total = next(it)
yield total
for element in it:
total = func(total, element)
yield total
def _find_lt(a, x):
""" Find rightmost value less than x"""
i = bisect_left(a, x)
if i:
return int(i - 1)
print('in _find_lt,{}'.format(a))
raise ValueError
def _log_gampoiss(k, alpha, beta):
import math
k = int(k)
return math.lgamma(k + alpha) + alpha * np.log(beta) - math.lgamma(alpha) - math.lgamma(k + 1) - (
alpha + k) * np.log(1 + beta)
def _log_betabin(k, n, alpha, beta):
import math
try:
const = math.lgamma(alpha + beta) - math.lgamma(alpha) - math.lgamma(beta)
except:
print('alpha = {}, beta = {}'.format(alpha, beta))
if isinstance(k, list) or isinstance(k, np.ndarray):
if len(k) != len(n):
print('length of k is %d and length of n is %d' % (len(k), len(n)))
raise ValueError
lbeta = []
for ki, ni in zip(k, n):
lbeta.append(math.lgamma(ki + alpha) + math.lgamma(ni - ki + beta) - math.lgamma(ni + alpha + beta) + const)
return np.array(lbeta)
else:
return math.lgamma(k + alpha) + math.lgamma(n - k + beta) - math.lgamma(n + alpha + beta) + const
def _get_confusion_matrix(Yhat, Y):
if len(Yhat) != len(Y):
raise NameError('Yhat has different length')
TP = np.dot(np.array(Y), np.array(Yhat))
FP = np.sum(Yhat) - TP
TN = len(Y) - np.sum(Y) - FP
FN = len(Yhat) - np.sum(Yhat) - TN
return TP, FP, TN, FN
def _extract_rules(tree, feature_names):
left = tree.tree_.children_left
right = tree.tree_.children_right
features = [feature_names[i] for i in tree.tree_.feature]
# get ids of child nodes
idx = np.argwhere(left == -1)[:, 0]
def _recurse(left, right, child, lineage=None):
if lineage is None:
lineage = []
if child in left:
parent = np.where(left == child)[0].item()
suffix = '_neg'
else:
parent = np.where(right == child)[0].item()
suffix = ''
lineage.append((features[parent].strip() + suffix))
if parent == 0:
lineage.reverse()
return lineage
else:
return _recurse(left, right, parent, lineage)
rules = []
for child in idx:
rule = []
for node in _recurse(left, right, child):
rule.append(node)
rules.append(rule)
return rules
if __name__ == '__main__':
test_dir = os.path.dirname(os.path.abspath(__file__))
df = read_csv(oj(test_dir, '../../tests/test_data', 'tictactoe_X.txt'), header=0, sep=" ")
Y = np.loadtxt(open(oj(test_dir, '../../tests/test_data', 'tictactoe_Y.txt'), "rb"), delimiter=" ")
lenY = len(Y)
idxs_train = sample(range(lenY), int(0.50 * lenY))
idxs_test = [i for i in range(lenY) if i not in idxs_train]
y_test = Y[idxs_test]
model = BayesianRuleSetClassifier(n_rules=100,
supp=5,
maxlen=3,
num_iterations=100,
num_chains=2,
alpha_pos=500, beta_pos=1,
alpha_neg=500, beta_neg=1,
alpha_l=None, beta_l=None)
# fit and check accuracy
np.random.seed(13)
# random.seed(13)
model.fit(df.iloc[idxs_train], Y[idxs_train])
y_pred = model.predict(df.iloc[idxs_test])
acc1 = np.mean(y_pred == y_test)
assert acc1 > 0.8
# try fitting np version
np.random.seed(13)
# random.seed(13)
model.fit(df.iloc[idxs_train].values, Y[idxs_train])
y_pred = model.predict(df.iloc[idxs_test].values)
y_test = Y[idxs_test]
acc2 = np.mean(y_pred == y_test)
assert acc2 > 0.8
# assert np.abs(acc1 - acc2) < 0.05 # todo: fix seeding
Functions
def random(size=None)
-
Return random floats in the half-open interval [0.0, 1.0). Alias for
random_sample
to ease forward-porting to the new random API.
Classes
class BayesianRuleSetClassifier (n_rules: int = 2000, supp=5, maxlen: int = 10, num_iterations=5000, num_chains=3, q=0.1, alpha_pos=100, beta_pos=1, alpha_neg=100, beta_neg=1, alpha_l=None, beta_l=None, discretization_method='randomforest', random_state=0)
-
Bayesian or-of-and algorithm. Generates patterns that satisfy the minimum support and maximum length and then select the Nrules rules that have the highest entropy. In function SA_patternbased, each local maximum is stored in maps and the best BOA is returned. Remember here the BOA contains only the index of selected rules from Nrules self.rules_
Params
n_rules number of rules to be used in SA_patternbased and also the output of generate_rules supp The higher this supp, the 'larger' a pattern is. 5% is a generally good number maxlen maximum length of a pattern num_iterations number of iterations in each chain num_chains number of chains in the simulated annealing search algorithm q alpha_pos $ ho = alpha/(alpha+beta)$. Make sure $ ho$ is close to one when choosing alpha and beta The alpha and beta parameters alter the prior distributions for different rules beta_pos alpha_neg beta_neg alpha_l beta_l discretization_method discretization method
Expand source code
class BayesianRuleSetClassifier(RuleSet, BaseEstimator, ClassifierMixin): '''Bayesian or-of-and algorithm. Generates patterns that satisfy the minimum support and maximum length and then select the Nrules rules that have the highest entropy. In function SA_patternbased, each local maximum is stored in maps and the best BOA is returned. Remember here the BOA contains only the index of selected rules from Nrules self.rules_ ''' def __init__(self, n_rules: int = 2000, supp=5, maxlen: int = 10, num_iterations=5000, num_chains=3, q=0.1, alpha_pos=100, beta_pos=1, alpha_neg=100, beta_neg=1, alpha_l=None, beta_l=None, discretization_method='randomforest', random_state=0): ''' Params ------ n_rules number of rules to be used in SA_patternbased and also the output of generate_rules supp The higher this supp, the 'larger' a pattern is. 5% is a generally good number maxlen maximum length of a pattern num_iterations number of iterations in each chain num_chains number of chains in the simulated annealing search algorithm q alpha_pos $\rho = alpha/(alpha+beta)$. Make sure $\rho$ is close to one when choosing alpha and beta The alpha and beta parameters alter the prior distributions for different rules beta_pos alpha_neg beta_neg alpha_l beta_l discretization_method discretization method ''' self.n_rules = n_rules self.supp = supp self.maxlen = maxlen self.num_iterations = num_iterations self.num_chains = num_chains self.q = q self.alpha_pos = alpha_pos self.beta_pos = beta_pos self.alpha_neg = alpha_neg self.beta_neg = beta_neg self.discretization_method = discretization_method self.alpha_l = alpha_l self.beta_l = beta_l self.random_state = 0 def fit(self, X, y, feature_names: list = None, init=[], verbose=False): ''' Parameters ---------- X : array-like, shape = [n_samples, n_features] Training data y : array_like, shape = [n_samples] Labels feature_names : array_like, shape = [n_features], optional (default: []) String labels for each feature. If empty and X is a DataFrame, column labels are used. If empty and X is not a DataFrame, then features are simply enumerated ''' # check inputs self.attr_level_num = defaultdict(int) # any missing value defaults to 0 self.attr_names = [] X, y, feature_names = check_fit_arguments(self, X, y, feature_names) np.random.seed(self.random_state) # convert to pandas DataFrame X = pd.DataFrame(X, columns=feature_names) for i, name in enumerate(X.columns): self.attr_level_num[name] += 1 self.attr_names.append(name) self.attr_names_orig = deepcopy(self.attr_names) self.attr_names = list(set(self.attr_names)) # set up patterns self._set_pattern_space() # parameter checking if self.alpha_l is None or self.beta_l is None or len(self.alpha_l) != self.maxlen or len( self.beta_l) != self.maxlen: if verbose: print('No or wrong input for alpha_l and beta_l - the model will use default parameters.') self.C = [1.0 / self.maxlen] * self.maxlen self.C.insert(0, -1) self.alpha_l = [10] * (self.maxlen + 1) self.beta_l = [10 * self.pattern_space[i] / self.C[i] for i in range(self.maxlen + 1)] else: self.alpha_l = [1] + list(self.alpha_l) self.beta_l = [1] + list(self.beta_l) # setup self._generate_rules(X, y, verbose) n_rules_current = len(self.rules_) self.rules_len_list = [len(rule) for rule in self.rules_] maps = defaultdict(list) T0 = 1000 # initial temperature for simulated annealing split = 0.7 * self.num_iterations # run simulated annealing for chain in range(self.num_chains): # initialize with a random pattern set if init != []: rules_curr = init.copy() else: assert n_rules_current > 1, f'Only {n_rules_current} potential rules found, change hyperparams to allow for more' N = sample(range(1, min(8, n_rules_current), 1), 1)[0] rules_curr = sample(range(n_rules_current), N) rules_curr_norm = self._normalize(rules_curr) pt_curr = -100000000000 maps[chain].append( [-1, [pt_curr / 3, pt_curr / 3, pt_curr / 3], rules_curr, [self.rules_[i] for i in rules_curr]]) for iter in range(self.num_iterations): if iter >= split: p = np.array(range(1 + len(maps[chain]))) p = np.array(list(_accumulate(p))) p = p / p[-1] index = _find_lt(p, random()) rules_curr = maps[chain][index][2].copy() rules_curr_norm = maps[chain][index][2].copy() # propose new rules rules_new, rules_norm = self._propose(rules_curr.copy(), rules_curr_norm.copy(), self.q, y) # compute probability of new rules cfmatrix, prob = self._compute_prob(rules_new, y) T = T0 ** (1 - iter / self.num_iterations) # temperature for simulated annealing pt_new = sum(prob) with warnings.catch_warnings(): if not verbose: warnings.simplefilter("ignore") alpha = np.exp(float(pt_new - pt_curr) / T) if pt_new > sum(maps[chain][-1][1]): maps[chain].append([iter, prob, rules_new, [self.rules_[i] for i in rules_new]]) if verbose: print(( '\n** chain = {}, max at iter = {} ** \n accuracy = {}, TP = {},FP = {}, TN = {}, FN = {}' '\n pt_new is {}, prior_ChsRules={}, likelihood_1 = {}, likelihood_2 = {}\n').format( chain, iter, (cfmatrix[0] + cfmatrix[2] + 0.0) / len(y), cfmatrix[0], cfmatrix[1], cfmatrix[2], cfmatrix[3], sum(prob), prob[0], prob[1], prob[2]) ) self._print_rules(rules_new) print(rules_new) if random() <= alpha: rules_curr_norm, rules_curr, pt_curr = rules_norm.copy(), rules_new.copy(), pt_new pt_max = [sum(maps[chain][-1][1]) for chain in range(self.num_chains)] index = pt_max.index(max(pt_max)) self.rules_ = maps[index][-1][3] return self def __str__(self): return ' '.join(str(r) for r in self.rules_) def predict(self, X): check_is_fitted(self) if isinstance(X, np.ndarray): df = pd.DataFrame(X, columns=self.attr_names_orig) else: df = X Z = [[]] * len(self.rules_) dfn = 1 - df # df has negative associations dfn.columns = [name.strip() + '_neg' for name in df.columns] df = pd.concat([df, dfn], axis=1) for i, rule in enumerate(self.rules_): Z[i] = (np.sum(df[list(rule)], axis=1) == len(rule)).astype(int) Yhat = (np.sum(Z, axis=0) > 0).astype(int) return Yhat def predict_proba(self, X): raise Exception('BOA does not support predicted probabilities.') def _set_pattern_space(self): """Compute the rule space from the levels in each attribute """ # add feat_neg to each existing feature feat for item in self.attr_names: self.attr_level_num[item + '_neg'] = self.attr_level_num[item] tmp = [item + '_neg' for item in self.attr_names] self.attr_names.extend(tmp) # set up pattern_space self.pattern_space = np.zeros(self.maxlen + 1) for k in range(1, self.maxlen + 1, 1): for subset in combinations(self.attr_names, k): tmp = 1 for i in subset: tmp = tmp * self.attr_level_num[i] # print('subset', subset, 'tmp', tmp, 'k', k) self.pattern_space[k] = self.pattern_space[k] + tmp def _generate_rules(self, X, y, verbose): '''This function generates rules that satisfy supp and maxlen using fpgrowth, then it selects the top n_rules rules that make data have the biggest decrease in entropy. There are two ways to generate rules. fpgrowth can handle cases where the maxlen is small. If maxlen<=3, fpgrowth can generates rules much faster than randomforest. If maxlen is big, fpgrowth tends to generate too many rules that overflow the memory. ''' df = 1 - X # df has negative associations df.columns = [name.strip() + '_neg' for name in X.columns] df = pd.concat([X, df], axis=1) if self.discretization_method == 'fpgrowth' and self.maxlen <= 3: itemMatrix = [[item for item in df.columns if row[item] == 1] for i, row in df.iterrows()] pindex = np.where(y == 1)[0] rules = fpgrowth([itemMatrix[i] for i in pindex], supp=self.supp, zmin=1, zmax=self.maxlen) rules = [tuple(np.sort(rule[0])) for rule in rules] rules = list(set(rules)) else: '''todo: replace this with imodels.RFDiscretizer ''' rules = [] for length in range(1, self.maxlen + 1, 1): n_estimators = min(pow(df.shape[1], length), 4000) clf = RandomForestClassifier(n_estimators=n_estimators, max_depth=length) clf.fit(X, y) for n in range(n_estimators): rules.extend(_extract_rules(clf.estimators_[n], df.columns)) rules = [list(x) for x in set(tuple(x) for x in rules)] self.rules_ = rules # select the top n_rules rules using secondary criteria, information gain self._screen_rules(df, y, verbose) # updates self.rules_ self._set_pattern_space() def _screen_rules(self, df, y, verbose): '''Screening rules using information gain ''' item_ind_dict = {} for i, name in enumerate(df.columns): item_ind_dict[name] = i indices = np.array( list(itertools.chain.from_iterable([[ item_ind_dict[x] for x in rule] for rule in self.rules_]))) len_rules = [len(rule) for rule in self.rules_] indptr = list(_accumulate(len_rules)) indptr.insert(0, 0) indptr = np.array(indptr) data = np.ones(len(indices)) rule_matrix = csc_matrix((data, indices, indptr), shape=(len(df.columns), len(self.rules_))) mat = df.values @ rule_matrix print('mat.shape', mat.shape) len_matrix = np.array([len_rules] * df.shape[0]) Z = (mat == len_matrix).astype(int) Zpos = [Z[i] for i in np.where(y > 0)][0] TP = np.sum(Zpos, axis=0) supp_select = np.where(TP >= self.supp * sum(y) / 100)[0] FP = np.sum(Z, axis=0) - TP TN = len(y) - np.sum(y) - FP FN = np.sum(y) - TP p1 = TP.astype(float) / (TP + FP) p2 = FN.astype(float) / (FN + TN) pp = (TP + FP).astype(float) / (TP + FP + TN + FN) # p1 = np.clip(p1, a_min=1e-10, a_max=1-1e-10) print('\n\n\n\np1.shape', p1.shape, 'pp.shape', pp.shape, 'cond_entropy.shape') # , cond_entropy.shape) with warnings.catch_warnings(): if not verbose: warnings.simplefilter("ignore") # ignore warnings about invalid values (e.g. log(0)) cond_entropy = -pp * (p1 * np.log(p1) + (1 - p1) * np.log(1 - p1)) - (1 - pp) * ( p2 * np.log(p2) + (1 - p2) * np.log(1 - p2)) cond_entropy[p1 * (1 - p1) == 0] = -((1 - pp) * (p2 * np.log(p2) + (1 - p2) * np.log(1 - p2)))[ p1 * (1 - p1) == 0] cond_entropy[p2 * (1 - p2) == 0] = -(pp * (p1 * np.log(p1) + (1 - p1) * np.log(1 - p1)))[p2 * (1 - p2) == 0] cond_entropy[p1 * (1 - p1) * p2 * (1 - p2) == 0] = 0 select = np.argsort(cond_entropy[supp_select])[::-1][-self.n_rules:] self.rules_ = [self.rules_[i] for i in supp_select[select]] self.RMatrix = np.array(Z[:, supp_select[select]]) def _propose(self, rules_curr, rules_norm, q, y): nRules = len(self.rules_) yhat = (np.sum(self.RMatrix[:, rules_curr], axis=1) > 0).astype(int) incorr = np.where(y != yhat)[0] N = len(rules_curr) if len(incorr) == 0: # BOA correctly classified all points but there could be redundant patterns, so cleaning is needed move = ['clean'] else: ex = sample(incorr.tolist(), 1)[0] t = random() if y[ex] == 1 or N == 1: if t < 1.0 / 2 or N == 1: move = ['add'] # action: add else: move = ['cut', 'add'] # action: replace else: if t < 1.0 / 2: move = ['cut'] # action: cut else: move = ['cut', 'add'] # action: replace if move[0] == 'cut': """ cut """ if random() < q: candidate = list(set(np.where(self.RMatrix[ex, :] == 1)[0]).intersection(rules_curr)) if len(candidate) == 0: candidate = rules_curr cut_rule = sample(candidate, 1)[0] else: p = [] all_sum = np.sum(self.RMatrix[:, rules_curr], axis=1) for index, rule in enumerate(rules_curr): yhat = ((all_sum - np.array(self.RMatrix[:, rule])) > 0).astype(int) TP, FP, TN, FN = _get_confusion_matrix(yhat, y) p.append(TP.astype(float) / (TP + FP + 1)) p = [x - min(p) for x in p] p = np.exp(p) p = np.insert(p, 0, 0) p = np.array(list(_accumulate(p))) if p[-1] == 0: index = sample(range(len(rules_curr)), 1)[0] else: p = p / p[-1] index = _find_lt(p, random()) cut_rule = rules_curr[index] rules_curr.remove(cut_rule) rules_norm = self._normalize(rules_curr) move.remove('cut') if len(move) > 0 and move[0] == 'add': """ add """ if random() < q: add_rule = sample(range(nRules), 1)[0] else: Yhat_neg_index = list(np.where(np.sum(self.RMatrix[:, rules_curr], axis=1) < 1)[0]) mat = np.multiply(self.RMatrix[Yhat_neg_index, :].transpose(), y[Yhat_neg_index]) TP = np.sum(mat, axis=1) FP = np.array((np.sum(self.RMatrix[Yhat_neg_index, :], axis=0) - TP)) p = (TP.astype(float) / (TP + FP + 1)) p[rules_curr] = 0 add_rule = sample(np.where(p == max(p))[0].tolist(), 1)[0] if add_rule not in rules_curr: rules_curr.append(add_rule) rules_norm = self._normalize(rules_curr) if len(move) > 0 and move[0] == 'clean': remove = [] for i, rule in enumerate(rules_norm): yhat = (np.sum( self.RMatrix[:, [rule for j, rule in enumerate(rules_norm) if (j != i and j not in remove)]], axis=1) > 0).astype(int) TP, FP, TN, FN = _get_confusion_matrix(yhat, y) if TP + FP == 0: remove.append(i) for x in remove: rules_norm.remove(x) return rules_curr, rules_norm return rules_curr, rules_norm def _compute_prob(self, rules, y): Yhat = (np.sum(self.RMatrix[:, rules], axis=1) > 0).astype(int) TP, FP, TN, FN = _get_confusion_matrix(Yhat, y) Kn_count = list(np.bincount([self.rules_len_list[x] for x in rules], minlength=self.maxlen + 1)) prior_ChsRules = sum([_log_betabin(Kn_count[i], self.pattern_space[i], self.alpha_l[i], self.beta_l[i]) for i in range(1, len(Kn_count), 1)]) likelihood_1 = _log_betabin(TP, TP + FP, self.alpha_pos, self.beta_pos) likelihood_2 = _log_betabin(TN, FN + TN, self.alpha_neg, self.beta_neg) return [TP, FP, TN, FN], [prior_ChsRules, likelihood_1, likelihood_2] def _normalize_add(self, rules_new, rule_index): rules = rules_new.copy() for rule in rules_new: if set(self.rules_[rule]).issubset(self.rules_[rule_index]): return rules_new.copy() if set(self.rules_[rule_index]).issubset(self.rules_[rule]): rules.remove(rule) rules.append(rule_index) return rules def _normalize(self, rules_new): try: rules_len = [len(self.rules_[index]) for index in rules_new] rules = [rules_new[i] for i in np.argsort(rules_len)[::-1][:len(rules_len)]] p1 = 0 while p1 < len(rules): for p2 in range(p1 + 1, len(rules), 1): if set(self.rules_[rules[p2]]).issubset(set(self.rules_[rules[p1]])): rules.remove(rules[p1]) p1 -= 1 break p1 += 1 return rules except: return rules_new.copy() def _print_rules(self, rules_max): for rule_index in rules_max: print(self.rules_[rule_index])
Ancestors
- RuleSet
- sklearn.base.BaseEstimator
- sklearn.utils._estimator_html_repr._HTMLDocumentationLinkMixin
- sklearn.utils._metadata_requests._MetadataRequester
- sklearn.base.ClassifierMixin
Methods
def fit(self, X, y, feature_names: list = None, init=[], verbose=False)
-
Parameters
X
:array-like, shape = [n_samples, n_features]
- Training data
y
:array_like, shape = [n_samples]
- Labels
feature_names
:array_like, shape = [n_features]
, optional(default: [])
- String labels for each feature. If empty and X is a DataFrame, column labels are used. If empty and X is not a DataFrame, then features are simply enumerated
Expand source code
def fit(self, X, y, feature_names: list = None, init=[], verbose=False): ''' Parameters ---------- X : array-like, shape = [n_samples, n_features] Training data y : array_like, shape = [n_samples] Labels feature_names : array_like, shape = [n_features], optional (default: []) String labels for each feature. If empty and X is a DataFrame, column labels are used. If empty and X is not a DataFrame, then features are simply enumerated ''' # check inputs self.attr_level_num = defaultdict(int) # any missing value defaults to 0 self.attr_names = [] X, y, feature_names = check_fit_arguments(self, X, y, feature_names) np.random.seed(self.random_state) # convert to pandas DataFrame X = pd.DataFrame(X, columns=feature_names) for i, name in enumerate(X.columns): self.attr_level_num[name] += 1 self.attr_names.append(name) self.attr_names_orig = deepcopy(self.attr_names) self.attr_names = list(set(self.attr_names)) # set up patterns self._set_pattern_space() # parameter checking if self.alpha_l is None or self.beta_l is None or len(self.alpha_l) != self.maxlen or len( self.beta_l) != self.maxlen: if verbose: print('No or wrong input for alpha_l and beta_l - the model will use default parameters.') self.C = [1.0 / self.maxlen] * self.maxlen self.C.insert(0, -1) self.alpha_l = [10] * (self.maxlen + 1) self.beta_l = [10 * self.pattern_space[i] / self.C[i] for i in range(self.maxlen + 1)] else: self.alpha_l = [1] + list(self.alpha_l) self.beta_l = [1] + list(self.beta_l) # setup self._generate_rules(X, y, verbose) n_rules_current = len(self.rules_) self.rules_len_list = [len(rule) for rule in self.rules_] maps = defaultdict(list) T0 = 1000 # initial temperature for simulated annealing split = 0.7 * self.num_iterations # run simulated annealing for chain in range(self.num_chains): # initialize with a random pattern set if init != []: rules_curr = init.copy() else: assert n_rules_current > 1, f'Only {n_rules_current} potential rules found, change hyperparams to allow for more' N = sample(range(1, min(8, n_rules_current), 1), 1)[0] rules_curr = sample(range(n_rules_current), N) rules_curr_norm = self._normalize(rules_curr) pt_curr = -100000000000 maps[chain].append( [-1, [pt_curr / 3, pt_curr / 3, pt_curr / 3], rules_curr, [self.rules_[i] for i in rules_curr]]) for iter in range(self.num_iterations): if iter >= split: p = np.array(range(1 + len(maps[chain]))) p = np.array(list(_accumulate(p))) p = p / p[-1] index = _find_lt(p, random()) rules_curr = maps[chain][index][2].copy() rules_curr_norm = maps[chain][index][2].copy() # propose new rules rules_new, rules_norm = self._propose(rules_curr.copy(), rules_curr_norm.copy(), self.q, y) # compute probability of new rules cfmatrix, prob = self._compute_prob(rules_new, y) T = T0 ** (1 - iter / self.num_iterations) # temperature for simulated annealing pt_new = sum(prob) with warnings.catch_warnings(): if not verbose: warnings.simplefilter("ignore") alpha = np.exp(float(pt_new - pt_curr) / T) if pt_new > sum(maps[chain][-1][1]): maps[chain].append([iter, prob, rules_new, [self.rules_[i] for i in rules_new]]) if verbose: print(( '\n** chain = {}, max at iter = {} ** \n accuracy = {}, TP = {},FP = {}, TN = {}, FN = {}' '\n pt_new is {}, prior_ChsRules={}, likelihood_1 = {}, likelihood_2 = {}\n').format( chain, iter, (cfmatrix[0] + cfmatrix[2] + 0.0) / len(y), cfmatrix[0], cfmatrix[1], cfmatrix[2], cfmatrix[3], sum(prob), prob[0], prob[1], prob[2]) ) self._print_rules(rules_new) print(rules_new) if random() <= alpha: rules_curr_norm, rules_curr, pt_curr = rules_norm.copy(), rules_new.copy(), pt_new pt_max = [sum(maps[chain][-1][1]) for chain in range(self.num_chains)] index = pt_max.index(max(pt_max)) self.rules_ = maps[index][-1][3] return self
def predict(self, X)
-
Expand source code
def predict(self, X): check_is_fitted(self) if isinstance(X, np.ndarray): df = pd.DataFrame(X, columns=self.attr_names_orig) else: df = X Z = [[]] * len(self.rules_) dfn = 1 - df # df has negative associations dfn.columns = [name.strip() + '_neg' for name in df.columns] df = pd.concat([df, dfn], axis=1) for i, rule in enumerate(self.rules_): Z[i] = (np.sum(df[list(rule)], axis=1) == len(rule)).astype(int) Yhat = (np.sum(Z, axis=0) > 0).astype(int) return Yhat
def predict_proba(self, X)
-
Expand source code
def predict_proba(self, X): raise Exception('BOA does not support predicted probabilities.')
def set_fit_request(self: BayesianRuleSetClassifier, *, feature_names: Union[bool, ForwardRef(None), str] = '$UNCHANGED$', init: Union[bool, ForwardRef(None), str] = '$UNCHANGED$', verbose: Union[bool, ForwardRef(None), str] = '$UNCHANGED$') ‑> BayesianRuleSetClassifier
-
Request metadata passed to the
fit
method.Note that this method is only relevant if
enable_metadata_routing=True
(see :func:sklearn.set_config
). Please see :ref:User Guide <metadata_routing>
on how the routing mechanism works.The options for each parameter are:
-
True
: metadata is requested, and passed tofit
if provided. The request is ignored if metadata is not provided. -
False
: metadata is not requested and the meta-estimator will not pass it tofit
. -
None
: metadata is not requested, and the meta-estimator will raise an error if the user provides it. -
str
: metadata should be passed to the meta-estimator with this given alias instead of the original name.
The default (
sklearn.utils.metadata_routing.UNCHANGED
) retains the existing request. This allows you to change the request for some parameters and not others.Added in version: 1.3
Note
This method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. used inside a :class:
~sklearn.pipeline.Pipeline
. Otherwise it has no effect.Parameters
feature_names
:str, True, False,
orNone
, default=sklearn.utils.metadata_routing.UNCHANGED
- Metadata routing for
feature_names
parameter infit
. init
:str, True, False,
orNone
, default=sklearn.utils.metadata_routing.UNCHANGED
- Metadata routing for
init
parameter infit
. verbose
:str, True, False,
orNone
, default=sklearn.utils.metadata_routing.UNCHANGED
- Metadata routing for
verbose
parameter infit
.
Returns
self
:object
- The updated object.
Expand source code
def func(*args, **kw): """Updates the request for provided parameters This docstring is overwritten below. See REQUESTER_DOC for expected functionality """ if not _routing_enabled(): raise RuntimeError( "This method is only available when metadata routing is enabled." " You can enable it using" " sklearn.set_config(enable_metadata_routing=True)." ) if self.validate_keys and (set(kw) - set(self.keys)): raise TypeError( f"Unexpected args: {set(kw) - set(self.keys)} in {self.name}. " f"Accepted arguments are: {set(self.keys)}" ) # This makes it possible to use the decorated method as an unbound method, # for instance when monkeypatching. # https://github.com/scikit-learn/scikit-learn/issues/28632 if instance is None: _instance = args[0] args = args[1:] else: _instance = instance # Replicating python's behavior when positional args are given other than # `self`, and `self` is only allowed if this method is unbound. if args: raise TypeError( f"set_{self.name}_request() takes 0 positional argument but" f" {len(args)} were given" ) requests = _instance._get_metadata_request() method_metadata_request = getattr(requests, self.name) for prop, alias in kw.items(): if alias is not UNCHANGED: method_metadata_request.add_request(param=prop, alias=alias) _instance._metadata_request = requests return _instance
-
def set_score_request(self: BayesianRuleSetClassifier, *, sample_weight: Union[bool, ForwardRef(None), str] = '$UNCHANGED$') ‑> BayesianRuleSetClassifier
-
Request metadata passed to the
score
method.Note that this method is only relevant if
enable_metadata_routing=True
(see :func:sklearn.set_config
). Please see :ref:User Guide <metadata_routing>
on how the routing mechanism works.The options for each parameter are:
-
True
: metadata is requested, and passed toscore
if provided. The request is ignored if metadata is not provided. -
False
: metadata is not requested and the meta-estimator will not pass it toscore
. -
None
: metadata is not requested, and the meta-estimator will raise an error if the user provides it. -
str
: metadata should be passed to the meta-estimator with this given alias instead of the original name.
The default (
sklearn.utils.metadata_routing.UNCHANGED
) retains the existing request. This allows you to change the request for some parameters and not others.Added in version: 1.3
Note
This method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. used inside a :class:
~sklearn.pipeline.Pipeline
. Otherwise it has no effect.Parameters
sample_weight
:str, True, False,
orNone
, default=sklearn.utils.metadata_routing.UNCHANGED
- Metadata routing for
sample_weight
parameter inscore
.
Returns
self
:object
- The updated object.
Expand source code
def func(*args, **kw): """Updates the request for provided parameters This docstring is overwritten below. See REQUESTER_DOC for expected functionality """ if not _routing_enabled(): raise RuntimeError( "This method is only available when metadata routing is enabled." " You can enable it using" " sklearn.set_config(enable_metadata_routing=True)." ) if self.validate_keys and (set(kw) - set(self.keys)): raise TypeError( f"Unexpected args: {set(kw) - set(self.keys)} in {self.name}. " f"Accepted arguments are: {set(self.keys)}" ) # This makes it possible to use the decorated method as an unbound method, # for instance when monkeypatching. # https://github.com/scikit-learn/scikit-learn/issues/28632 if instance is None: _instance = args[0] args = args[1:] else: _instance = instance # Replicating python's behavior when positional args are given other than # `self`, and `self` is only allowed if this method is unbound. if args: raise TypeError( f"set_{self.name}_request() takes 0 positional argument but" f" {len(args)} were given" ) requests = _instance._get_metadata_request() method_metadata_request = getattr(requests, self.name) for prop, alias in kw.items(): if alias is not UNCHANGED: method_metadata_request.add_request(param=prop, alias=alias) _instance._metadata_request = requests return _instance
-