from typing import Iterable, Tuple, List

import numpy as np
import pandas as pd
from mlxtend import frequent_patterns as mlx
from sklearn.ensemble import BaggingRegressor, GradientBoostingRegressor, RandomForestRegressor, \
    GradientBoostingClassifier, RandomForestClassifier
from sklearn.tree import DecisionTreeRegressor
from sklearn.utils.validation import check_array
import inspect
from imodels.util import rule, convert


def extract_fpgrowth(X,
                     minsupport=0.1,
                     maxcardinality=2,
                     verbose=False) -> List[Tuple]:
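    """Mine frequent itemsets from the one-hot encoded DataFrame X with
    FP-Growth (via mlxtend), returning each itemset as a tuple of column names.
    """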

    itemsets_df = mlx.fpgrowth(
        X, min_support=minsupport, max_len=maxcardinality)
    itemsets_indices = [tuple(s[1]) for s in itemsets_df.values]
    itemsets = [np.array(X.columns)[list(inds)] for inds in itemsets_indices]
    itemsets = list(map(tuple, itemsets))
    if verbose:
        print(len(itemsets), 'itemsets mined')

    return itemsets


def extract_rulefit(X, y, feature_names,
                    n_estimators=10,
                    tree_size=4,
                    memory_par=0.01,
                    tree_generator=None,
                    exp_rand_tree_size=True,
                    random_state=None) -> List[str]:
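    """Extract rules for RuleFit: fit a tree ensemble (a gradient-boosted one
    by default, optionally with randomized tree sizes), convert every tree
    into rule strings, and return the deduplicated rules.
    """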
    if tree_generator is None:
        sample_fract_ = min(0.5, (100 + 6 * np.sqrt(X.shape[0])) / X.shape[0])

        tree_generator = GradientBoostingRegressor(n_estimators=n_estimators,
                                                   max_leaf_nodes=tree_size,
                                                   learning_rate=memory_par,
                                                   subsample=sample_fract_,
                                                   random_state=random_state,
                                                   max_depth=100)

    if type(tree_generator) not in [GradientBoostingClassifier, GradientBoostingRegressor,
                                    RandomForestRegressor, RandomForestClassifier]:
        raise ValueError(
            "RuleFit only works with GradientBoostingClassifier(), GradientBoostingRegressor(), "
            "RandomForestRegressor() or RandomForestClassifier()")

    # fit tree generator
    if not exp_rand_tree_size:  # simply fit with constant tree size
        tree_generator.fit(X, y)
    else:  # randomise tree size as per Friedman 2005 Sec 3.3
        np.random.seed(random_state)
        tree_sizes = np.random.exponential(
            scale=tree_size - 2, size=n_estimators)
        tree_sizes = np.asarray([2 + np.floor(tree_sizes[i_])
                                for i_ in np.arange(len(tree_sizes))], dtype=int)
        tree_generator.set_params(warm_start=True)
        curr_est_ = 0
        for i_size in np.arange(len(tree_sizes)):
            size = tree_sizes[i_size]
            tree_generator.set_params(n_estimators=curr_est_ + 1)
            tree_generator.set_params(max_leaf_nodes=size)
            random_state_add = random_state if random_state else 0
            # warm_start=True seems to reset random_state, such that the trees
            # are highly correlated unless we manually change random_state here
            tree_generator.set_params(
                random_state=i_size + random_state_add)
            tree_generator.fit(np.copy(X, order='C'), np.copy(y, order='C'))
            curr_est_ = curr_est_ + 1
        tree_generator.set_params(warm_start=False)

    if isinstance(tree_generator, (RandomForestRegressor, RandomForestClassifier)):
        estimators_ = [[x] for x in tree_generator.estimators_]
    else:
        estimators_ = tree_generator.estimators_

    seen_rules = set()
    extracted_rules = []
    for estimator in estimators_:
        for rule_value_pair in convert.tree_to_rules(estimator[0], np.array(feature_names), prediction_values=True):

            rule_obj = rule.Rule(rule_value_pair[0])

            if rule_obj not in seen_rules:
                extracted_rules.append(rule_value_pair)
                seen_rules.add(rule_obj)

    extracted_rules = sorted(extracted_rules, key=lambda x: x[1])
    extracted_rules = list(map(lambda x: x[0], extracted_rules))
    return extracted_rules


def extract_skope(X, y, feature_names,
                  sample_weight=None,
                  n_estimators=10,
                  max_samples=.8,
                  max_samples_features=1.,
                  bootstrap=False,
                  bootstrap_features=False,
                  max_depths=[3],
                  max_features=1.,
                  min_samples_split=2,
                  n_jobs=1,
                  random_state=None,
                  verbose=0) -> Tuple[List[str], List[np.array], List[np.array]]:
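    """Extract rules for SkopeRules: fit one bagged tree ensemble per entry
    of max_depths, convert each fitted tree into rule strings, and return the
    rules together with the samples and features each estimator was fit on.
    """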
    ensembles = []
    if not isinstance(max_depths, Iterable):
        max_depths = [max_depths]

    for max_depth in max_depths:

        # pass a different key based on the sklearn version:
        # sklearn >= 1.2 renamed BaggingRegressor's 'base_estimator' to 'estimator'
        estimator = DecisionTreeRegressor(
            max_depth=max_depth,
            max_features=max_features,
            min_samples_split=min_samples_split,
        )
        init_signature = inspect.signature(BaggingRegressor.__init__)
        estimator_key = ('estimator' if 'estimator' in init_signature.parameters
                         else 'base_estimator')
        kwargs = {estimator_key: estimator}
        bagging_clf = BaggingRegressor(
            n_estimators=n_estimators,
            max_samples=max_samples,
            max_features=max_samples_features,
            bootstrap=bootstrap,
            bootstrap_features=bootstrap_features,
            # oob_score could be added if selection on tree performance is needed
            # warm_start could be added to improve computation performance
            n_jobs=n_jobs,
            random_state=random_state,
            verbose=verbose,
            **kwargs
        )
        ensembles.append(bagging_clf)

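    # when sample weights are given, replace the binary labels with a smooth
    # regression target (as in skope-rules): positives are up-weighted by
    # sqrt(weight) / contamination, negatives pushed down by the mean weight,
    # and the result is squashed through a sigmoid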
    y_reg = y
    if sample_weight is not None:
        sample_weight = check_array(sample_weight, ensure_2d=False)
        weights = sample_weight - sample_weight.min()
        contamination = float(sum(y)) / len(y)
        y_reg = (
            pow(weights, 0.5) * 0.5 / contamination * (y > 0) -
            pow((weights).mean(), 0.5) * (y == 0)
        )
        y_reg = 1. / (1 + np.exp(-y_reg))  # sigmoid

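    # fit half of the ensembles on the raw labels and the other half on the
    # (possibly reweighted) regression target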
    for e in ensembles[:len(ensembles) // 2]:
        e.fit(X, y)

    for e in ensembles[len(ensembles) // 2:]:
        e.fit(X, y_reg)

    estimators_, estimators_samples_, estimators_features_ = [], [], []
    for ensemble in ensembles:
        estimators_ += ensemble.estimators_
        estimators_samples_ += ensemble.estimators_samples_
        estimators_features_ += ensemble.estimators_features_

    extracted_rules = []
    for estimator, features in zip(estimators_, estimators_features_):
        extracted_rules.append(convert.tree_to_rules(
            estimator, np.array(feature_names)[features]))

    return extracted_rules, estimators_samples_, estimators_features_


def extract_marginal_curves(clf, X, max_evals=100):
    """Uses predict_proba to compute marginal curves.
    Assumes clf is a classifier with a predict_proba method and that classifier is additive across features
    For GAM, this returns the shape functions

    Params
    ------
    clf : classifier
        A classifier with a predict_proba method
    X : array-like
        The data to compute the marginal curves on (used to calculate unique feature vals)
    max_evals : int
        The maximum number of evaluations to make for each feature

    Returns
    -------
    feature_vals_list : list of arrays
        The values of each feature for which the shape function is evaluated.
    shape_function_vals_list : list of arrays
        The shape function evaluated at each value of the corresponding feature.
    """
    p = X.shape[1]
    dummy_input = np.zeros((1, p))
    base = clf.predict_proba(dummy_input)[:, 1][0]
    feature_vals_list = []
    shape_function_vals_list = []
    for feat_num in range(p):
        feature_vals = sorted(np.unique(X[:, feat_num]))
        while len(feature_vals) > max_evals:
            feature_vals = feature_vals[::2]
        dummy_input = np.zeros((len(feature_vals), p))
        dummy_input[:, feat_num] = feature_vals
        shape_function_vals = clf.predict_proba(dummy_input)[:, 1] - base
        feature_vals_list.append(feature_vals)
        shape_function_vals_list.append(shape_function_vals.tolist())
    return feature_vals_list, shape_function_vals_list


if __name__ == '__main__':
    init_signature = inspect.signature(BaggingRegressor.__init__)
    print('estimator' in init_signature.parameters.keys())

Functions

def extract_fpgrowth(X, minsupport=0.1, maxcardinality=2, verbose=False) -> List[Tuple]
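A minimal usage sketch (the data and import path are illustrative, not part of this module); mlxtend's fpgrowth expects a one-hot encoded boolean DataFrame:

import pandas as pd
from imodels.util.extract import extract_fpgrowth  # assumed import path

X_onehot = pd.DataFrame({
    'age>50':  [True, False, True, True],
    'bp_high': [True, True, True, False],
    'smoker':  [False, False, True, True],
})
itemsets = extract_fpgrowth(X_onehot, minsupport=0.5, maxcardinality=2)
# -> a list of tuples of column names, e.g. [('age>50',), ('age>50', 'bp_high'), ...]
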
def extract_marginal_curves(clf, X, max_evals=100)

Uses predict_proba to compute marginal curves. Assumes clf is a classifier with a predict_proba method and that the classifier is additive across features. For a GAM, this returns the shape functions.

Params

clf : classifier
A classifier with a predict_proba method
X : array-like
The data to compute the marginal curves on (used to calculate unique feature vals)
max_evals : int
The maximum number of evaluations to make for each feature

Returns

feature_vals_list : list of arrays
The values of each feature for which the shape function is evaluated.
shape_function_vals_list : list of arrays
The shape function evaluated at each value of the corresponding feature.
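A sketch using LogisticRegression, which is additive across features in the log-odds; the synthetic data is illustrative:

import numpy as np
from sklearn.linear_model import LogisticRegression

rng = np.random.RandomState(0)
X = rng.randn(200, 3)
y = (X[:, 0] + 0.5 * X[:, 1] > 0).astype(int)
clf = LogisticRegression().fit(X, y)

feature_vals_list, shape_function_vals_list = extract_marginal_curves(clf, X, max_evals=50)
# shape_function_vals_list[0] traces the effect of feature 0 relative to the
# all-zeros baseline input
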
def extract_rulefit(X, y, feature_names, n_estimators=10, tree_size=4, memory_par=0.01, tree_generator=None, exp_rand_tree_size=True, random_state=None) -> List[str]
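A minimal sketch on synthetic regression data (names and values illustrative); with tree_generator=None, a GradientBoostingRegressor is constructed internally:

import numpy as np

rng = np.random.RandomState(0)
X = rng.randn(100, 4)
y = X[:, 0] + 0.1 * rng.randn(100)

rules = extract_rulefit(X, y, feature_names=['x0', 'x1', 'x2', 'x3'],
                        n_estimators=5, tree_size=4, random_state=0)
# rules is a list of rule strings, e.g. 'x0 <= 0.25 and x2 > -1.1'
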
def extract_skope(X, y, feature_names, sample_weight=None, n_estimators=10, max_samples=0.8, max_samples_features=1.0, bootstrap=False, bootstrap_features=False, max_depths=[3], max_features=1.0, min_samples_split=2, n_jobs=1, random_state=None, verbose=0) -> Tuple[List[str], List[np.array], List[np.array]]
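A minimal sketch on a synthetic binary task (data illustrative). Two ensembles are built here, one per max_depth; the first half are fit on the raw labels and the second half on the (optionally reweighted) regression target:

import numpy as np

rng = np.random.RandomState(0)
X = rng.randn(200, 3)
y = (X[:, 0] > 0).astype(int)

rule_lists, samples_, features_ = extract_skope(
    X, y, feature_names=['f0', 'f1', 'f2'],
    n_estimators=4, max_depths=[2, 3], random_state=0)
# rule_lists holds one list of rule strings per fitted tree; samples_ and
# features_ record the row and column subsets each tree was trained on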