Module acd.agglomeration.agg_1d

Source code
import numpy as np
import torch
from skimage import measure

from ..scores import score_funcs
from ..util import tiling_1d as tiling


def agglomerate(model, batch, percentile_include, method, sweep_dim,
                label, num_iters=5, subtract=True, absolute=True, device='cuda'):
    '''Agglomerative sweep: black out the units selected so far and re-sweep over the entire sequence

    Returns
    -------
    r: dict
        r['comps_list']       - arrays of components, a different integer for each component
        r['comp_scores_list'] - dicts with the score for each component
        r['mask_list']        - boolean arrays of selected units
        r['scores_list']      - arrays of scores (NaN for selected units)
        r['score_orig']       - original score
    '''
    # get original text and score
    text_orig = batch.text.data.cpu().numpy()
    score_orig = score_funcs.get_scores_1d(batch, model, method, label, only_one=True,
                                           score_orig=None, text_orig=text_orig, subtract=subtract, device=device)[0]

    # get scores
    texts = tiling.gen_tiles(text_orig, method=method, sweep_dim=sweep_dim)
    texts = texts.transpose()
    batch.text.data = torch.LongTensor(texts).to(device)
    scores = score_funcs.get_scores_1d(batch, model, method, label, only_one=False,
                                       score_orig=score_orig, text_orig=text_orig, subtract=subtract, device=device)

    # threshold scores
    mask = threshold_scores(scores, percentile_include, absolute=absolute)

    # initialize lists
    scores_list = [np.copy(scores)]
    mask_list = [mask]
    comps_list = []
    comp_scores_list = [{0: score_orig}]

    # iterate
    for step in range(num_iters):
        # find connected components for regions
        comps = np.copy(measure.label(mask_list[-1], background=0, connectivity=1))

        # loop over components
        comp_scores_dict = {}
        for comp_num in range(1, np.max(comps) + 1):

            # make component tile
            comp_tile_bool = (comps == comp_num)
            comp_tile = tiling.gen_tile_from_comp(text_orig, comp_tile_bool, method)

            # make neighboring tiles around component
            border_tiles = tiling.gen_tiles_around_baseline(text_orig, comp_tile_bool,
                                                            method=method,
                                                            sweep_dim=sweep_dim)

            # predict for all tiles
            # format tiles into batch
            tiles_concat = np.hstack((comp_tile, np.squeeze(border_tiles[0]).transpose()))
            batch.text.data = torch.LongTensor(tiles_concat).to(device)

            # get scores for this component tile (index 0) and border tiles (indexes 1 onwards)
            scores_all = score_funcs.get_scores_1d(batch, model, method, label, only_one=False,
                                                   score_orig=score_orig, text_orig=text_orig, subtract=subtract,
                                                   device=device)
            score_comp = np.copy(scores_all[0])
            scores_border_tiles = np.copy(scores_all[1:])

            # store the predicted class scores
            comp_scores_dict[comp_num] = np.copy(score_comp)

            # update each border unit's score to its marginal gain:
            # score of (component + unit) minus score of the component alone
            tiles_idxs = border_tiles[1]
            for i, idx in enumerate(tiles_idxs):
                scores[idx] = scores_border_tiles[i] - score_comp

        # get class preds and thresholded image
        scores[mask_list[-1]] = np.nan
        mask = threshold_scores(scores, percentile_include, absolute=absolute)

        # add to lists
        scores_list.append(np.copy(scores))
        mask_list.append(mask_list[-1] + mask)
        comps_list.append(comps)
        comp_scores_list.append(comp_scores_dict)

        if np.sum(mask) == 0:
            break

    # prepend an all-zero component map for the initial (no components) state
    comps_list = [np.zeros(text_orig.size, dtype=int)] + comps_list

    return {
        'comps_list': comps_list,  # arrays of components, a different integer for each component
        'scores_list': scores_list,  # arrays of scores (NaN for selected units)
        'mask_list': mask_list,  # boolean arrays of selected units
        'comp_scores_list': comp_scores_list,  # dicts with the score for each component
        'score_orig': score_orig  # original score
    }


def threshold_scores(scores, percentile_include, absolute):
    '''Threshold scores at a given percentile

    Returns
    -------
    mask: np.ndarray
        Boolean mask which is True where scores should be kept
    '''
    # whether to threshold based on abs value
    if absolute:
        scores = np.absolute(scores)

    # judgement call: once five or fewer units remain, override the percentile
    # so roughly two units are selected per step, and the final unit is always selected
    num_left = scores.size - np.sum(np.isnan(scores))
    if num_left <= 5:
        if num_left == 5:
            percentile_include = 59
        elif num_left == 4:
            percentile_include = 49
        elif num_left == 3:
            percentile_include = 59
        elif num_left == 2:
            percentile_include = 49
        elif num_left == 1:
            percentile_include = 0
    thresh = np.nanpercentile(scores, percentile_include)
    mask = scores >= thresh
    return mask


def collapse_tree(lists):
    '''Removes redundant joins from the final hierarchy

    Parameters
    ----------
    lists: dict
        Dictionary of lists output by agglomerate

    Returns
    -------
    lists: dict
        Dictionary of lists with redundant joins removed,
        i.e. each join is moved to the earliest level possible without skipping a merge step
    '''
    num_iters = len(lists['comps_list'])
    num_words = len(lists['comps_list'][0])

    # need to update comp_scores_list, comps_list
    comps_list = [np.zeros(num_words, dtype=int) for i in range(num_iters)]
    comp_scores_list = [{0: 0} for _ in range(num_iters)]
    comp_levels_list = [{0: 0} for _ in range(num_iters)]  # use this to determine what level to put things at

    # initialize first level
    comps_list[0] = np.arange(num_words)
    comp_levels_list[0] = {i: 0 for i in range(num_words)}

    # iterate over levels
    for i in range(1, num_iters):
        comps = lists['comps_list'][i]
        comps_old = lists['comps_list'][i - 1]
        comp_scores = lists['comp_scores_list'][i]

        # iterate over number of components
        for comp_num in range(1, np.max(comps) + 1):
            comp = comps == comp_num
            comp_size = np.sum(comp)
            if comp_size == 1:
                comp_levels_list[i][comp_num] = 0  # set level to 0
            else:
                # check for matches
                matches = np.unique(comps_old[comp])
                num_matches = matches.size

                # if 0 matches, level is 1
                if num_matches == 0:
                    level = 1

                # if 1 match, maintain level
                elif num_matches == 1:
                    level = comp_levels_list[i - 1][matches[0]]

                # if >1 match, take highest level + 1
                else:
                    level = np.max([comp_levels_list[i - 1][match] for match in matches]) + 1

                comp_levels_list[i][comp_num] = level
                new_comp_num = int(np.max(comps_list[level]) + 1)
                comps_list[level][comp] = new_comp_num  # update comp
                comp_scores_list[level][new_comp_num] = comp_scores[comp_num]  # update comp score

    # remove unnecessary iters
    num_iters = 0
    while num_iters < len(comps_list) and np.sum(comps_list[num_iters] > 0):
        num_iters += 1

    # populate lists
    lists['comps_list'] = comps_list[:num_iters]
    lists['comp_scores_list'] = comp_scores_list[:num_iters]
    return lists

Functions

def agglomerate(model, batch, percentile_include, method, sweep_dim, label, num_iters=5, subtract=True, absolute=True, device='cuda')

Agglomerative sweep: black out the units selected so far and re-sweep over the entire sequence

Returns

r : dict
    r['comps_list']       - arrays of components, a different integer for each component
    r['comp_scores_list'] - dicts with the score for each component
    r['mask_list']        - boolean arrays of selected units
    r['scores_list']      - arrays of scores (NaN for selected units)
    r['score_orig']       - original score
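A minimal usage sketch (not from the source): build_model and get_batch are hypothetical placeholders, and method='cd' assumes the contextual-decomposition scorer; see score_funcs.get_scores_1d for the method strings it actually accepts.

from acd.agglomeration import agg_1d

# hypothetical setup: build_model() and get_batch() are placeholders,
# not part of this module
model = build_model().eval()
batch = get_batch()  # torchtext-style batch; batch.text.data holds token ids

lists = agg_1d.agglomerate(model, batch,
                           percentile_include=90,  # keep the top 10% of scores per sweep
                           method='cd',            # assumed scoring method (see score_funcs)
                           sweep_dim=1,            # score one token at a time
                           label=0,                # class index whose score is decomposed
                           num_iters=5,
                           device='cpu')
lists = agg_1d.collapse_tree(lists)  # drop redundant join levels
print(lists['score_orig'], len(lists['comps_list']))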
def collapse_tree(lists)

Removes redundant joins from the final hierarchy.

Parameters

lists : dict
    Dictionary of lists output by agglomerate

Returns

lists : dict
    Dictionary of lists with redundant joins removed,
    i.e. each join is moved to the earliest level possible without skipping a merge step
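A toy illustration with hand-built, hypothetical data (no model run): a four-word hierarchy whose first sweep selects two isolated words. That step contributes no join, so collapse_tree returns one fewer level than it was given.

import numpy as np

from acd.agglomeration import agg_1d

# hand-built agglomerate-style output for a 4-word input;
# the leading all-zero array is the padding agglomerate prepends
lists = {
    'comps_list': [np.zeros(4, dtype=int),   # pad
                   np.array([1, 0, 0, 2]),   # iter 0: words 0 and 3 selected alone
                   np.array([1, 1, 0, 2]),   # iter 1: word 1 joins component 1
                   np.array([1, 1, 1, 1])],  # iter 2: everything merges
    'comp_scores_list': [{0: 0.2},
                         {1: 0.5, 2: -0.4},
                         {1: 0.7, 2: -0.4},
                         {1: 0.9}],
}
lists = agg_1d.collapse_tree(lists)
print(lists['comps_list'])
# [array([0, 1, 2, 3]), array([1, 1, 0, 0]), array([1, 1, 1, 1])]
# four input levels collapse to three: level 0 is the single words,
# level 1 joins words 0-1, level 2 merges everything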
def threshold_scores(scores, percentile_include, absolute)

Threshold scores at a given percentile.

Returns

mask : np.ndarray
    Boolean mask which is True where scores should be kept
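A self-contained demonstration of the percentile override (toy numbers, no model needed): with five non-NaN scores remaining and absolute=True, the requested percentile is replaced by 59, so roughly the two largest magnitudes are selected.

import numpy as np

from acd.agglomeration.agg_1d import threshold_scores

scores = np.array([0.9, 0.1, np.nan, 0.5, -0.7, 0.2])  # NaN marks already-selected units
mask = threshold_scores(scores, percentile_include=90, absolute=True)
print(mask)
# [ True False False False  True False]
# five scores remain, so the percentile is overridden to 59 and the two
# largest magnitudes (0.9 and -0.7) are kept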