Expand source code
from typing import List
from collections import Counter

from imodels.util.rule import Rule


def prune_mins(rules: List[Rule], precision_min: float, recall_min: float) -> List[Rule]:
    # Factorize rules before semantic tree filtering
    rules_ = [tuple(rule) for rule in rules]
    rules_dict = {}

    # keep only rules verifying precision_min and recall_min:
    for rule, score in rules_:
        if score[0] >= precision_min and score[1] >= recall_min:
            if rule in rules_dict:
                # update the score to the new mean
                c = rules_dict[rule][2] + 1
                b = rules_dict[rule][1] + 1. / c * (
                        score[1] - rules_dict[rule][1])
                a = rules_dict[rule][0] + 1. / c * (
                        score[0] - rules_dict[rule][0])

                rules_dict[rule] = (a, b, c)
            else:
                rules_dict[rule] = (score[0], score[1], 1)

    rule_tuple_list = sorted(rules_dict.items(), key=lambda x: (x[1][0], x[1][1]), reverse=True)
    return [Rule(rule, args=scores) for rule, scores in rule_tuple_list]


def deduplicate(rules: List[Rule], max_depth_dup: int) -> List[Rule]:
    if max_depth_dup is not None:
        rules = [max(rules_set, key=f1_score) for rules_set in find_similar_rulesets(rules, max_depth_dup)]
    return sorted(rules, key=lambda x: - f1_score(x))


def f1_score(rule: Rule) -> float:
    return 2 * rule.args[0] * rule.args[1] / \
           (rule.args[0] + rule.args[1]) if (rule.args[0] + rule.args[1]) > 0 else 0


def find_similar_rulesets(rules: List[Rule], max_depth_duplication: int = None) -> List[List[Rule]]:
    """Create clusters of rules using a decision tree based
    on the terms of the rules

    Parameters
    ----------
    rules : List, List of rules
            The rules that should be splitted in subsets of similar rules

    Returns
    -------
    rules : List of list of rules
            The different set of rules. Each set should be homogeneous

    """

    def split_with_best_feature(rules, depth, exceptions=[]):
        """
        Method to find a split of rules given most represented feature
        """
        if depth == 0:
            return rules

        rulelist = [rule.split(' and ') for rule, score in rules]
        terms = [t.split(' ')[0] for term in rulelist for t in term]
        counter = Counter(terms)
        # Drop exception list
        for exception in exceptions:
            del counter[exception]

        if len(counter) == 0:
            return rules

        most_represented_term = counter.most_common()[0][0]

        # Proceed to split
        rules_splitted = [[], [], []]
        for rule in rules:
            if (most_represented_term + ' <=') in rule.rule:
                rules_splitted[0].append(rule)
            elif (most_represented_term + ' >') in rule.rule:
                rules_splitted[1].append(rule)
            else:
                rules_splitted[2].append(rule)
        new_exceptions = exceptions + [most_represented_term]

        # Choose best term
        return [split_with_best_feature(ruleset,
                                        depth - 1,
                                        exceptions=new_exceptions)
                for ruleset in rules_splitted]

    def breadth_first_search(rules, leaves=None):
        if len(rules) == 0 or not isinstance(rules[0], list):
            if len(rules) > 0:
                return leaves.append(rules)
        else:
            for rules_child in rules:
                breadth_first_search(rules_child, leaves=leaves)
        return leaves

    leaves = []
    res = split_with_best_feature(rules, max_depth_duplication)
    breadth_first_search(res, leaves=leaves)
    return leaves

Functions

def deduplicate(rules: List[Rule], max_depth_dup: int) ‑> List[Rule]
Expand source code
def deduplicate(rules: List[Rule], max_depth_dup: int) -> List[Rule]:
    if max_depth_dup is not None:
        rules = [max(rules_set, key=f1_score) for rules_set in find_similar_rulesets(rules, max_depth_dup)]
    return sorted(rules, key=lambda x: - f1_score(x))
def f1_score(rule: Rule) ‑> float
Expand source code
def f1_score(rule: Rule) -> float:
    return 2 * rule.args[0] * rule.args[1] / \
           (rule.args[0] + rule.args[1]) if (rule.args[0] + rule.args[1]) > 0 else 0
def find_similar_rulesets(rules: List[Rule], max_depth_duplication: int = None) ‑> List[List[Rule]]

Create clusters of rules using a decision tree based on the terms of the rules

Parameters

rules : List, List of rules
The rules that should be splitted in subsets of similar rules

Returns

rules : List of list of rules
The different set of rules. Each set should be homogeneous
Expand source code
def find_similar_rulesets(rules: List[Rule], max_depth_duplication: int = None) -> List[List[Rule]]:
    """Create clusters of rules using a decision tree based
    on the terms of the rules

    Parameters
    ----------
    rules : List, List of rules
            The rules that should be splitted in subsets of similar rules

    Returns
    -------
    rules : List of list of rules
            The different set of rules. Each set should be homogeneous

    """

    def split_with_best_feature(rules, depth, exceptions=[]):
        """
        Method to find a split of rules given most represented feature
        """
        if depth == 0:
            return rules

        rulelist = [rule.split(' and ') for rule, score in rules]
        terms = [t.split(' ')[0] for term in rulelist for t in term]
        counter = Counter(terms)
        # Drop exception list
        for exception in exceptions:
            del counter[exception]

        if len(counter) == 0:
            return rules

        most_represented_term = counter.most_common()[0][0]

        # Proceed to split
        rules_splitted = [[], [], []]
        for rule in rules:
            if (most_represented_term + ' <=') in rule.rule:
                rules_splitted[0].append(rule)
            elif (most_represented_term + ' >') in rule.rule:
                rules_splitted[1].append(rule)
            else:
                rules_splitted[2].append(rule)
        new_exceptions = exceptions + [most_represented_term]

        # Choose best term
        return [split_with_best_feature(ruleset,
                                        depth - 1,
                                        exceptions=new_exceptions)
                for ruleset in rules_splitted]

    def breadth_first_search(rules, leaves=None):
        if len(rules) == 0 or not isinstance(rules[0], list):
            if len(rules) > 0:
                return leaves.append(rules)
        else:
            for rules_child in rules:
                breadth_first_search(rules_child, leaves=leaves)
        return leaves

    leaves = []
    res = split_with_best_feature(rules, max_depth_duplication)
    breadth_first_search(res, leaves=leaves)
    return leaves
def prune_mins(rules: List[Rule], precision_min: float, recall_min: float) ‑> List[Rule]
Expand source code
def prune_mins(rules: List[Rule], precision_min: float, recall_min: float) -> List[Rule]:
    # Factorize rules before semantic tree filtering
    rules_ = [tuple(rule) for rule in rules]
    rules_dict = {}

    # keep only rules verifying precision_min and recall_min:
    for rule, score in rules_:
        if score[0] >= precision_min and score[1] >= recall_min:
            if rule in rules_dict:
                # update the score to the new mean
                c = rules_dict[rule][2] + 1
                b = rules_dict[rule][1] + 1. / c * (
                        score[1] - rules_dict[rule][1])
                a = rules_dict[rule][0] + 1. / c * (
                        score[0] - rules_dict[rule][0])

                rules_dict[rule] = (a, b, c)
            else:
                rules_dict[rule] = (score[0], score[1], 1)

    rule_tuple_list = sorted(rules_dict.items(), key=lambda x: (x[1][0], x[1][1]), reverse=True)
    return [Rule(rule, args=scores) for rule, scores in rule_tuple_list]