Shared transforms between different interpretable models

Expand source code
'''Shared transforms between different interpretable models
'''

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd


class Winsorizer():
    """Performs Winsorization l->l*: clips each column to quantile limits.

    ``train`` learns one lower and one upper limit per column and ``trim``
    replaces every value outside those limits with the nearest limit.

    Warning: this class should not be used directly.

    Parameters
    ----------
    trim_quantile : float, default=0.0
        Fraction cut from each tail when the limits are learned. With a
        value <= 0 the limits stay at -inf/+inf, so ``trim`` changes nothing.
    """

    def __init__(self, trim_quantile=0.0):
        self.trim_quantile = trim_quantile
        # Array of shape (2, n_columns) once trained: row 0 = lower, row 1 = upper.
        self.winsor_lims = None

    def train(self, X):
        """Learn the per-column limits from the 2-D array ``X``.

        Stores the result in ``self.winsor_lims``; returns None.
        """
        n_cols = X.shape[1]
        lims = np.empty([2, n_cols])
        lims[0, :] = -np.inf
        lims[1, :] = np.inf
        if self.trim_quantile > 0 and n_cols > 0:
            lower_pct = self.trim_quantile * 100
            # One vectorised call yields both percentiles for every column;
            # writing into ``lims`` keeps the float64 dtype of the limits.
            lims[:] = np.percentile(X, [lower_pct, 100 - lower_pct], axis=0)
        self.winsor_lims = lims

    def trim(self, X):
        """Return a new array with each column of ``X`` clipped to its limits.

        ``X`` itself is not modified. ``train`` must have been called first.
        """
        lower, upper = self.winsor_lims
        # Same nested-where ordering as a manual clip (upper bound checked
        # first); the per-column limits broadcast across the rows of X.
        return np.where(X > upper, upper, np.where(X < lower, lower, X))


class FriedScale():
    """Performs scaling of linear variables according to Friedman et al. 2005 Sec 5

    Each variable is first Winsorized l->l*, then standardised as 0.4 x l* / std(l*)
    Columns with at most two distinct values are left unscaled (multiplier 1).
    Warning: this class should not be used directly.

    Parameters
    ----------
    winsorizer : object or None, default=None
        Optional object exposing ``trim(X)``; when given, data is passed
        through it before the standard deviation is measured and before
        scaling is applied.
    """

    def __init__(self, winsorizer=None):
        # One multiplier per column; filled in by ``train``.
        self.scale_multipliers = None
        self.winsorizer = winsorizer

    def _winsorize(self, X):
        """Return ``X`` trimmed by the winsorizer, or ``X`` itself if there is none."""
        # Identity check: ``!= None`` would defer to a custom ``__ne__``.
        if self.winsorizer is not None:
            return self.winsorizer.trim(X)
        return X

    def train(self, X):
        """Compute the per-column multipliers from the 2-D array ``X``.

        Stores the result in ``self.scale_multipliers``; returns None.
        """
        X_trimmed = self._winsorize(X)

        scale_multipliers = np.ones(X.shape[1])
        for i_col in range(X.shape[1]):
            # Distinct values are counted on the raw column, the spread is
            # measured on the trimmed one.
            num_uniq_vals = len(np.unique(X[:, i_col]))
            if num_uniq_vals > 2:  # don't scale binary variables which are effectively already rules
                # The tiny constant avoids dividing by zero for a constant column.
                scale_multipliers[i_col] = 0.4 / \
                    (1.0e-12 + np.std(X_trimmed[:, i_col]))
        self.scale_multipliers = scale_multipliers

    def scale(self, X):
        """Return ``X`` (trimmed if a winsorizer is set) times the learned multipliers."""
        return self._winsorize(X) * self.scale_multipliers


class CorrelationScreenTransformer(BaseEstimator, TransformerMixin):
    '''Finds correlated features above a magnitude threshold
    and zeros out all but the first of them

    Parameters
    ----------
    threshold : float, default=1.0
        Two features are grouped when the absolute value of their
        correlation (as computed by ``DataFrame.corr``) is >= threshold.
    '''

    def __init__(self, threshold=1.0):
        # Initialize with a correlation threshold
        self.threshold = threshold
        self.correlated_feature_sets = []

    def fit(self, X, y=None):
        '''Group correlated columns of X and record which ones to zero out.

        Sets ``correlated_feature_sets`` (list of sets of column positions)
        and ``to_keep_remove`` (list of ``(kept, [removed, ...])`` tuples).
        ``y`` is ignored. Returns self.
        '''
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)

        # Magnitudes only, so a single ">= threshold" test covers both signs.
        corr = X.corr().abs().values

        # Start from scratch so that refitting does not inherit old groups.
        groups = []
        for i in range(corr.shape[0]):
            for j in range(i):
                # NaN correlations (e.g. constant columns) compare False.
                if corr[i, j] >= self.threshold:
                    # NOTE: the pair joins the first group containing either
                    # index; two existing groups are never merged.
                    group = next(
                        (g for g in groups if i in g or j in g), None)
                    if group is None:
                        groups.append({i, j})
                    else:
                        group.update((i, j))
        self.correlated_feature_sets = groups

        # Sort each group so the kept feature is the lowest column position
        # (set iteration order is not guaranteed).
        self.to_keep_remove = []
        for group in groups:
            ordered = sorted(group)
            self.to_keep_remove.append((ordered[0], ordered[1:]))

        return self

    def transform(self, X):
        '''Return a copy of X with the redundant correlated columns set to 0.

        A numpy array input gives a numpy array back; any other input gives
        a DataFrame. X itself is not modified.
        '''
        return_array = isinstance(X, np.ndarray)
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)

        X_transformed = X.copy()
        for _, to_remove in self.to_keep_remove:
            X_transformed.iloc[:, to_remove] = 0

        if return_array:
            # Hand back the same container type that was passed in.
            return X_transformed.values
        return X_transformed


if __name__ == '__main__':
    # Demo: a 5x5 random matrix whose first two columns are identical,
    # printed before and after correlation screening.
    duplicated_column = [1, 1, 0, 1, 1]
    X = np.random.randn(5, 5)
    for col in (0, 1):
        X[:, col] = duplicated_column
    print(X)

    transformer = CorrelationScreenTransformer()
    X_transformed = transformer.fit_transform(X)
    print(X_transformed)

Classes

class CorrelationScreenTransformer (threshold=1.0)

Finds correlated features above a magnitude threshold and zeros out all but the first of them

Expand source code
class CorrelationScreenTransformer(BaseEstimator, TransformerMixin):
    '''Finds correlated features above a magnitude threshold
    and zeros out all but the first of them

    Parameters
    ----------
    threshold : float, default=1.0
        Two features are grouped when the absolute value of their
        correlation (as computed by ``DataFrame.corr``) is >= threshold.
    '''

    def __init__(self, threshold=1.0):
        # Initialize with a correlation threshold
        self.threshold = threshold
        self.correlated_feature_sets = []

    def fit(self, X, y=None):
        '''Group correlated columns of X and record which ones to zero out.

        Sets ``correlated_feature_sets`` (list of sets of column positions)
        and ``to_keep_remove`` (list of ``(kept, [removed, ...])`` tuples).
        ``y`` is ignored. Returns self.
        '''
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)

        # Magnitudes only, so a single ">= threshold" test covers both signs.
        corr = X.corr().abs().values

        # Start from scratch so that refitting does not inherit old groups.
        groups = []
        for i in range(corr.shape[0]):
            for j in range(i):
                # NaN correlations (e.g. constant columns) compare False.
                if corr[i, j] >= self.threshold:
                    # NOTE: the pair joins the first group containing either
                    # index; two existing groups are never merged.
                    group = next(
                        (g for g in groups if i in g or j in g), None)
                    if group is None:
                        groups.append({i, j})
                    else:
                        group.update((i, j))
        self.correlated_feature_sets = groups

        # Sort each group so the kept feature is the lowest column position
        # (set iteration order is not guaranteed).
        self.to_keep_remove = []
        for group in groups:
            ordered = sorted(group)
            self.to_keep_remove.append((ordered[0], ordered[1:]))

        return self

    def transform(self, X):
        '''Return a copy of X with the redundant correlated columns set to 0.

        A numpy array input gives a numpy array back; any other input gives
        a DataFrame. X itself is not modified.
        '''
        return_array = isinstance(X, np.ndarray)
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)

        X_transformed = X.copy()
        for _, to_remove in self.to_keep_remove:
            X_transformed.iloc[:, to_remove] = 0

        if return_array:
            # Hand back the same container type that was passed in.
            return X_transformed.values
        return X_transformed

Ancestors

  • sklearn.base.BaseEstimator
  • sklearn.utils._estimator_html_repr._HTMLDocumentationLinkMixin
  • sklearn.utils._metadata_requests._MetadataRequester
  • sklearn.base.TransformerMixin
  • sklearn.utils._set_output._SetOutputMixin

Methods

def fit(self, X, y=None)
Expand source code
def fit(self, X, y=None):
    '''Group correlated columns of X and record which ones to zero out.

    Two columns are grouped when the absolute value of their correlation
    (from ``DataFrame.corr``) is >= ``self.threshold``. Sets
    ``self.correlated_feature_sets`` (list of sets of column positions) and
    ``self.to_keep_remove`` (list of ``(kept, [removed, ...])`` tuples).
    ``y`` is ignored. Returns self.
    '''
    if not isinstance(X, pd.DataFrame):
        X = pd.DataFrame(X)

    # Magnitudes only, so a single ">= threshold" test covers both signs.
    corr = X.corr().abs().values

    # Start from scratch so that refitting does not inherit old groups.
    groups = []
    for i in range(corr.shape[0]):
        for j in range(i):
            # NaN correlations (e.g. constant columns) compare False.
            if corr[i, j] >= self.threshold:
                # NOTE: the pair joins the first group containing either
                # index; two existing groups are never merged.
                group = next((g for g in groups if i in g or j in g), None)
                if group is None:
                    groups.append({i, j})
                else:
                    group.update((i, j))
    self.correlated_feature_sets = groups

    # Sort each group so the kept feature is the lowest column position
    # (set iteration order is not guaranteed).
    self.to_keep_remove = []
    for group in groups:
        ordered = sorted(group)
        self.to_keep_remove.append((ordered[0], ordered[1:]))

    return self
def transform(self, X)
Expand source code
def transform(self, X):
    '''Return a copy of X with the redundant correlated columns set to 0.

    The columns to zero are read from ``self.to_keep_remove`` (a list of
    ``(kept, [removed, ...])`` tuples of column positions). A numpy array
    input gives a numpy array back; any other input gives a DataFrame.
    X itself is not modified.
    '''
    return_array = isinstance(X, np.ndarray)
    if not isinstance(X, pd.DataFrame):
        X = pd.DataFrame(X)

    # Set the identified correlated features (except the first) to 0
    X_transformed = X.copy()
    for _, to_remove in self.to_keep_remove:
        X_transformed.iloc[:, to_remove] = 0

    if return_array:
        # Hand back the same container type that was passed in.
        return X_transformed.values
    return X_transformed
class FriedScale (winsorizer=None)

Performs scaling of linear variables according to Friedman et al. 2005, Sec. 5

Each variable is first Winsorized l -> l*, then standardised as 0.4 x l* / std(l*). Warning: this class should not be used directly.

Expand source code
class FriedScale():
    """Performs scaling of linear variables according to Friedman et al. 2005 Sec 5

    Each variable is first Winsorized l->l*, then standardised as 0.4 x l* / std(l*)
    Columns with at most two distinct values are left unscaled (multiplier 1).
    Warning: this class should not be used directly.

    Parameters
    ----------
    winsorizer : object or None, default=None
        Optional object exposing ``trim(X)``; when given, data is passed
        through it before the standard deviation is measured and before
        scaling is applied.
    """

    def __init__(self, winsorizer=None):
        # One multiplier per column; filled in by ``train``.
        self.scale_multipliers = None
        self.winsorizer = winsorizer

    def _winsorize(self, X):
        """Return ``X`` trimmed by the winsorizer, or ``X`` itself if there is none."""
        # Identity check: ``!= None`` would defer to a custom ``__ne__``.
        if self.winsorizer is not None:
            return self.winsorizer.trim(X)
        return X

    def train(self, X):
        """Compute the per-column multipliers from the 2-D array ``X``.

        Stores the result in ``self.scale_multipliers``; returns None.
        """
        X_trimmed = self._winsorize(X)

        scale_multipliers = np.ones(X.shape[1])
        for i_col in range(X.shape[1]):
            # Distinct values are counted on the raw column, the spread is
            # measured on the trimmed one.
            num_uniq_vals = len(np.unique(X[:, i_col]))
            if num_uniq_vals > 2:  # don't scale binary variables which are effectively already rules
                # The tiny constant avoids dividing by zero for a constant column.
                scale_multipliers[i_col] = 0.4 / \
                    (1.0e-12 + np.std(X_trimmed[:, i_col]))
        self.scale_multipliers = scale_multipliers

    def scale(self, X):
        """Return ``X`` (trimmed if a winsorizer is set) times the learned multipliers."""
        return self._winsorize(X) * self.scale_multipliers

Methods

def scale(self, X)
Expand source code
def scale(self, X):
    """Return ``X`` multiplied column-wise by ``self.scale_multipliers``.

    If ``self.winsorizer`` is set, ``X`` is first passed through its
    ``trim`` method.
    """
    # Identity check: ``!= None`` would defer to a custom ``__ne__``.
    if self.winsorizer is not None:
        X = self.winsorizer.trim(X)
    return X * self.scale_multipliers
def train(self, X)
Expand source code
def train(self, X):
    """Compute one scale multiplier per column of the 2-D array ``X``.

    Columns with more than two distinct values get 0.4 / std of the
    (optionally winsorized) column; all others keep a multiplier of 1.
    Stores the result in ``self.scale_multipliers``; returns None.
    """
    # Identity check: ``!= None`` would defer to a custom ``__ne__``.
    if self.winsorizer is not None:
        X_trimmed = self.winsorizer.trim(X)
    else:
        X_trimmed = X

    scale_multipliers = np.ones(X.shape[1])
    for i_col in range(X.shape[1]):
        # Distinct values are counted on the raw column, the spread is
        # measured on the trimmed one.
        num_uniq_vals = len(np.unique(X[:, i_col]))
        if num_uniq_vals > 2:  # don't scale binary variables which are effectively already rules
            # The tiny constant avoids dividing by zero for a constant column.
            scale_multipliers[i_col] = 0.4 / \
                (1.0e-12 + np.std(X_trimmed[:, i_col]))
    self.scale_multipliers = scale_multipliers
class Winsorizer (trim_quantile=0.0)

Performs Winsorization l -> l*

Warning: this class should not be used directly.

Expand source code
class Winsorizer():
    """Performs Winsorization l->l*: clips each column to quantile limits.

    ``train`` learns one lower and one upper limit per column and ``trim``
    replaces every value outside those limits with the nearest limit.

    Warning: this class should not be used directly.

    Parameters
    ----------
    trim_quantile : float, default=0.0
        Fraction cut from each tail when the limits are learned. With a
        value <= 0 the limits stay at -inf/+inf, so ``trim`` changes nothing.
    """

    def __init__(self, trim_quantile=0.0):
        self.trim_quantile = trim_quantile
        # Array of shape (2, n_columns) once trained: row 0 = lower, row 1 = upper.
        self.winsor_lims = None

    def train(self, X):
        """Learn the per-column limits from the 2-D array ``X``.

        Stores the result in ``self.winsor_lims``; returns None.
        """
        n_cols = X.shape[1]
        lims = np.empty([2, n_cols])
        lims[0, :] = -np.inf
        lims[1, :] = np.inf
        if self.trim_quantile > 0 and n_cols > 0:
            lower_pct = self.trim_quantile * 100
            # One vectorised call yields both percentiles for every column;
            # writing into ``lims`` keeps the float64 dtype of the limits.
            lims[:] = np.percentile(X, [lower_pct, 100 - lower_pct], axis=0)
        self.winsor_lims = lims

    def trim(self, X):
        """Return a new array with each column of ``X`` clipped to its limits.

        ``X`` itself is not modified. ``train`` must have been called first.
        """
        lower, upper = self.winsor_lims
        # Same nested-where ordering as a manual clip (upper bound checked
        # first); the per-column limits broadcast across the rows of X.
        return np.where(X > upper, upper, np.where(X < lower, lower, X))

Methods

def train(self, X)
Expand source code
def train(self, X):
    """Learn per-column winsorization limits from the 2-D array ``X``.

    Stores an array of shape (2, n_columns) in ``self.winsor_lims``: row 0
    holds the lower limits and row 1 the upper limits. When
    ``self.trim_quantile`` is <= 0 the limits stay at -inf/+inf.
    Returns None.
    """
    n_cols = X.shape[1]
    lims = np.empty([2, n_cols])
    lims[0, :] = -np.inf
    lims[1, :] = np.inf
    if self.trim_quantile > 0 and n_cols > 0:
        lower_pct = self.trim_quantile * 100
        # One vectorised call yields both percentiles for every column;
        # writing into ``lims`` keeps the float64 dtype of the limits.
        lims[:] = np.percentile(X, [lower_pct, 100 - lower_pct], axis=0)
    self.winsor_lims = lims
def trim(self, X)
Expand source code
def trim(self, X):
    """Return a new array with each column of ``X`` clipped to its limits.

    Limits are read from ``self.winsor_lims`` (row 0 = lower, row 1 =
    upper, one entry per column). ``X`` itself is not modified.
    """
    lower, upper = self.winsor_lims
    # np.where already allocates the result, so no defensive copy is needed;
    # the per-column limits broadcast across the rows of X.
    return np.where(X > upper, upper, np.where(X < lower, lower, X))