from typing import List, Tuple
from warnings import warn

import pandas as pd
import numpy as np
from sklearn.utils._mask import indices_to_mask
from sklearn.linear_model import Lasso, LogisticRegression
from sklearn.linear_model._coordinate_descent import _alpha_grid
from sklearn.model_selection import cross_val_score

from imodels.util.rule import Rule


def score_precision_recall(X,
                           y,
                           rules: List[List[str]],
                           samples: List[List[int]],
                           features: List[List[int]],
                           feature_names: List[str],
                           oob: bool = True) -> List[Rule]:
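    # For each bagged estimator, score its candidate rules on the rows it did not
    # see during fitting (out-of-bag) and attach (precision, recall) to each Rule.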

    scored_rules = []

    for curr_rules, curr_samples, curr_features in zip(rules, samples, features):

        # Create mask for OOB samples
        mask = ~indices_to_mask(curr_samples, X.shape[0])
        if sum(mask) == 0:
            if oob:
                warn(
                    "OOB evaluation not possible: doing it in-bag. Performance evaluation is "
                    "likely to be wrong (overfitting) and selected rules are likely to not "
                    "perform well! Please use max_samples < 1."
                )
            mask = curr_samples

        # TODO: compute the same scores without constructing a DataFrame

        curr_features_to_use = np.unique(curr_features)
        X_oob = pd.DataFrame(
            (X[mask, :])[:, curr_features_to_use],
            columns=np.array(feature_names)[curr_features_to_use]
        )

        if X_oob.shape[1] <= 1:  # otherwise pandas bug (cf. issue #16363)
            return []

        y_oob = y[mask]
        y_oob = np.array((y_oob != 0))

        # Add OOB performances to rules:
        scored_rules += [
            Rule(r, args=_eval_rule_perf(r, X_oob, y_oob))
            for r in set(curr_rules)
        ]

    return scored_rules


def _eval_rule_perf(rule: str, X, y) -> Tuple[float, float]:
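    # Precision and recall of a single rule: precision is the mean label among the
    # samples the rule selects, recall is the share of all positives it captures.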
    detected_index = list(X.query(rule).index)
    if len(detected_index) <= 1:
        return (0, 0)
    y_detected = y[detected_index]
    true_pos = y_detected[y_detected > 0].sum()
    if true_pos == 0:
        return (0, 0)
    pos = y[y > 0].sum()
    return y_detected.mean(), float(true_pos) / pos


def score_linear(X, y, rules: List[str],
                 penalty='l1',
                 prediction_task='regression',
                 max_rules=30,
                 alpha=None,
                 cv=True,
                 random_state=None) -> Tuple[List[Rule], List[float], float]:
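    # Fit a sparse linear model (Lasso or penalized logistic regression) on the
    # combined feature/rule matrix and keep only rules with non-negligible weights.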

    if alpha is not None:
        final_alpha = alpha
        if max_rules is not None:
            warn("Ignoring max_rules parameter since alpha passed explicitly")

    elif max_rules is not None:
        final_alpha = get_best_alpha_under_max_rules(X, y, rules,
                                                     penalty=penalty,
                                                     prediction_task=prediction_task,
                                                     max_rules=max_rules,
                                                     cv=cv,
                                                     random_state=random_state)
    else:
        raise ValueError("Invalid alpha and max_rules passed")

    if prediction_task == 'regression':
        lin_model = Lasso(alpha=final_alpha,
                          random_state=random_state, max_iter=2000)
    else:
        lin_model = LogisticRegression(
            penalty=penalty, C=(1 / final_alpha), solver='liblinear',
            random_state=random_state, max_iter=200)

    lin_model.fit(X, y)

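    # X is assumed to stack the original features first and one indicator column per
    # rule last, so the trailing len(rules) coefficients and columns belong to the rules.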
    coef_ = lin_model.coef_.flatten()
    coefs = list(coef_[:coef_.shape[0] - len(rules)])
    support = np.sum(X[:, -len(rules):], axis=0) / X.shape[0]

    nonzero_rules = []
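    # treat coefficients below this data-scaled tolerance as zero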
    coef_zero_threshold = 1e-6 / np.mean(np.abs(y))
    for r, w, s in zip(rules, coef_[-len(rules):], support):
        if abs(w) > coef_zero_threshold:
            nonzero_rules.append(Rule(r, args=[w], support=s))
            coefs.append(w)

    return nonzero_rules, coefs, lin_model.intercept_


def get_best_alpha_under_max_rules(X, y, rules: List[str],
                                   penalty='l1',
                                   prediction_task='regression',
                                   max_rules=30,
                                   cv=True,
                                   random_state=None) -> float:
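    # Walk the regularization path from strongest to weakest and return an alpha
    # whose fitted model keeps at most max_rules non-zero coefficients.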
    coef_zero_threshold = 1e-6 / np.mean(np.abs(y))
    alpha_scores = []

    if prediction_task == 'regression':
        alphas = _alpha_grid(X, y)
    elif prediction_task == 'classification':
        # LogisticRegression takes C, the inverse of regularization strength, so the
        # grid is flipped to run from strongest to weakest regularization
        alphas = np.flip(np.logspace(-4, 4, num=100, base=10))

    # alphas are sorted from highest to lowest regularization
    for i, alpha in enumerate(alphas):

        if prediction_task == 'regression':
            m = Lasso(alpha=alpha, random_state=random_state, max_iter=2000)
            cv_scoring = 'neg_mean_squared_error'
        else:
            m = LogisticRegression(
                penalty=penalty, C=(1 / alpha), solver='liblinear', random_state=random_state)
            cv_scoring = 'accuracy'

        m.fit(X, y)
        rule_coefs = m.coef_.flatten()
        rule_count = np.sum(np.abs(rule_coefs) > coef_zero_threshold)

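        # stop once regularization is weak enough that the rule budget is exceeded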
        if rule_count > max_rules:
            break

        if cv:
            fold_scores = cross_val_score(m, X, y, cv=5, scoring=cv_scoring)
            alpha_scores.append(np.mean(fold_scores))

    if cv and len(alpha_scores) > 0 and not np.all(np.array(alpha_scores) == alpha_scores[0]):
        final_alpha = alphas[np.argmax(alpha_scores)]
    else:
        # rare case: no CV scores were collected, or different alphas all led to
        # identical scores, so fall back to the last alpha under the rule budget
        final_alpha = alphas[i - 1]

    return final_alpha

Functions

def get_best_alpha_under_max_rules(X, y, rules: List[str], penalty='l1', prediction_task='regression', max_rules=30, cv=True, random_state=None) ‑> float
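
A minimal usage sketch, assuming the function is imported from this module; the data and rule strings below are synthetic and purely illustrative. The helper walks a regularization path from strongest to weakest and returns an alpha whose fitted model keeps at most max_rules non-zero coefficients, ranking the admissible alphas by 5-fold cross-validation when cv=True. The rules argument is accepted for interface consistency, but the search itself only inspects X and y.

import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(200, 5))
y = X[:, 0] - 2 * X[:, 1] + 0.1 * rng.normal(size=200)
rules = ["feat_0 > 0", "feat_1 <= 0"]  # placeholder rule strings

alpha = get_best_alpha_under_max_rules(
    X, y, rules, prediction_task='regression', max_rules=3, random_state=0)
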
def score_linear(X, y, rules: List[str], penalty='l1', prediction_task='regression', max_rules=30, alpha=None, cv=True, random_state=None) ‑> Tuple[List[Rule], List[float], float]
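
A hedged usage sketch, assuming the convention visible in the source: X stacks the original features first and one indicator column per rule last, in the same order as rules. All names and data below are invented for illustration.

import numpy as np

rng = np.random.default_rng(0)
n = 200
feats = rng.normal(size=(n, 3))
rule_cols = (feats[:, :2] > 0).astype(float)   # pretend rule-activation columns
X = np.hstack([feats, rule_cols])              # rule columns must come last
y = feats[:, 0] + 2 * rule_cols[:, 1] + 0.1 * rng.normal(size=n)
rules = ["feat_0 > 0", "feat_1 > 0"]           # placeholder rule strings

kept_rules, coefs, intercept = score_linear(
    X, y, rules, prediction_task='regression', max_rules=4, random_state=0)

Each returned Rule carries its fitted weight in args and the fraction of samples satisfying it in support; coefs lists the feature coefficients followed by the coefficients of the kept rules.
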
def score_precision_recall(X, y, rules: List[List[str]], samples: List[List[int]], features: List[List[int]], feature_names: List[str], oob: bool = True) ‑> List[Rule]
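
A hedged sketch for a single bagged estimator, assuming the function is imported from this module; the feature names, rule string, and sampling below are made up. Each inner list corresponds to one estimator: samples holds its in-bag row indices, features the column indices it was fit on, and rules the pandas-query strings it produced. Precision and recall are then estimated on the out-of-bag rows.

import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(100, 3))
y = (X[:, 0] > 0).astype(int)
feature_names = ['age', 'weight', 'height']                # invented names

samples = [list(rng.choice(100, size=80, replace=True))]   # in-bag indices of one estimator
features = [[0, 1]]                                        # columns that estimator used
rules = [['age > 0 and weight <= 1']]                      # pandas-query rule strings

scored = score_precision_recall(X, y, rules, samples, features, feature_names)
# each returned Rule carries (precision, recall) measured on the OOB rows in args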