Module imodelsx.augtree.embed
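Cache ngram embeddings for a text dataset and expand keywords to their nearest ngrams in embedding space.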

Expand source code
'''Cache ngram embeddings for a text dataset and expand keywords to their
nearest ngrams in embedding space.
'''
import os.path
import pickle as pkl
from os.path import join
from typing import List

import numpy as np
import sklearn.metrics
from tqdm import tqdm

import imodelsx.augtree.utils
import imodelsx.data
import imodelsx.util
from imodelsx.augtree.utils import clean_str


CHECKPOINTS_DICT = {
    'financial_phrasebank': 'ahmedrachid/FinancialBERT-Sentiment-Analysis',
    'rotten_tomatoes': 'textattack/bert-base-uncased-rotten-tomatoes',
    'emotion': 'nateraw/bert-base-uncased-emotion',
    'sst2': 'textattack/bert-base-uncased-SST-2',
}

def pairwise_distances(X: np.ndarray) -> np.ndarray:
    '''Compute the (n, n) matrix of Euclidean distances between all rows of X,
    working one row at a time to bound memory.
    '''
    n = X.shape[0]
    dists = np.zeros((n, n))
    for i in tqdm(range(n)):
        # distances from row i to every row of X
        dists[i] = np.linalg.norm(X - X[i], axis=1)
    return dists


class EmbsManager:
    def __init__(self,
                 save_dir_embs='/home/chansingh/llm-tree/results/embs_cache',
                 dataset_name: str = 'financial_phrasebank',
                 ngrams: int = 2,
                 n_keep: int = 200,
                 n_jobs: int = 60,
                 ):
        '''
        Params
        ------
        save_dir_embs
            Directory for caching the ngram vocab + nearest-neighbor mappings
        dataset_name
            Name of the HuggingFace dataset; must be a key of CHECKPOINTS_DICT
        ngrams
            Maximum ngram length used to build the vocabulary
        n_keep
            Number of nearest neighbors cached per ngram
        n_jobs
            Number of cpus for computing pairwise distances
        '''
        print(locals())
        self.save_dir_embs = save_dir_embs
        self.dataset_name = dataset_name
        self.ngrams = ngrams
        self.checkpoint = CHECKPOINTS_DICT[dataset_name]
        self.n_keep = n_keep
        self.n_jobs = n_jobs

        # set up cache paths and compute mappings if not yet cached
        dir_name_top = join(
            save_dir_embs, f'{clean_str(dataset_name)}___ngrams={ngrams}')
        dir_name_checkpoint = join(dir_name_top, clean_str(self.checkpoint))
        fname_vocab = join(dir_name_top, 'vocab.pkl')
        fname_mappings = join(dir_name_checkpoint, 'mappings_euclidean.npy')
        os.makedirs(dir_name_checkpoint, exist_ok=True)
        if not os.path.exists(fname_mappings):
            print(fname_mappings, 'not found')
            self._compute_mappings(fname_vocab, dir_name_checkpoint)

        # load mappings + vocab
        print('loading from cache...')
        with open(fname_mappings, 'rb') as f:
            self.mappings = np.load(f)
        with open(fname_vocab, 'rb') as f:
            self.ngrams_arr = np.array(pkl.load(f))

    def _compute_mappings(self, fname_vocab: str, dir_name_checkpoint: str):
        # lazy import so torch is only required when (re)building the cache
        import torch.cuda

        # get raw data strings
        dset, dataset_key_text = imodelsx.data.load_huggingface_dataset(
            self.dataset_name, binary_classification=True)
        X = dset['train'][dataset_key_text] + \
            dset['validation'][dataset_key_text]
        tokenizer = imodelsx.augtree.utils.get_spacy_tokenizer()

        # build deduplicated, sorted ngram vocabulary
        ngrams_list = [
            imodelsx.util.generate_ngrams_list(
                x, ngrams=self.ngrams,
                tokenizer_ngrams=tokenizer,
                all_ngrams=True
            )
            for x in X
        ]
        ngrams_list = sum(ngrams_list, [])
        ngrams_list = sorted(set(ngrams_list))
        print(f'ngrams_list {len(ngrams_list)} {ngrams_list[:5]}')

        # compute embeddings
        print('computing embeddings...')
        embs = imodelsx.util.get_embs_llm(ngrams_list, self.checkpoint)
        print('embs.shape', embs.shape)
        torch.cuda.empty_cache()

        # save vocab, then compute pairwise embedding distances
        with open(fname_vocab, 'wb') as f:
            pkl.dump(ngrams_list, f)
        print('computing embedding similarities...')
        # (N, D) -> (N, N)
        pairwise_dists = pairwise_distances(embs)
        # exclude self-matches by setting the diagonal to a huge distance
        np.fill_diagonal(pairwise_dists, 1e10)

        # keep the indexes of the n_keep nearest neighbors of each ngram
        # (N, N) -> (N, n_keep)
        print('computing argsort...')
        args = np.zeros((len(ngrams_list), self.n_keep), dtype=int)
        for i in tqdm(range(len(ngrams_list))):
            args[i] = np.argsort(pairwise_dists[i])[:self.n_keep]

        cache_file = join(dir_name_checkpoint, 'mappings_euclidean.npy')
        with open(cache_file, 'wb') as f:
            np.save(f, args)

    def expand_keyword(self, keyword: str, n_expands=50) -> List[str]:
        '''Expand an ngram into its n_expands nearest ngrams in embedding space.

        Returns an empty list if the keyword is not in the cached vocabulary.
        '''
        # self.ngrams_arr is an array of ngrams;
        # self.mappings holds, per ngram, the indexes of its nearest neighbors

        # find index where keyword occurs in ngrams_arr
        find_keyword = self.ngrams_arr == keyword
        if find_keyword.sum() == 0:
            print(repr(keyword), 'not found')
            return []
        idx_keyword = int(np.argmax(find_keyword))
        idxs_expanded = self.mappings[idx_keyword, :n_expands]
        keywords_expanded = self.ngrams_arr[idxs_expanded]
        return keywords_expanded.tolist()


def test_dists():
    '''Sanity-check pairwise_distances against sklearn.metrics.pairwise_distances
    on a small example matrix.
    '''
    # sample embeddings matrix
    X = np.array([
        [1, 2, 3],
        [1, 2, 3],
        [2, 4, 6],
        [1, 2, 2],
    ])
    dists_eucl = pairwise_distances(X).round(2)
    assert np.min(dists_eucl) >= 0
    dists_eucl_ref = sklearn.metrics.pairwise_distances(
        X, metric='euclidean').round(2)
    print('dists_eucl', dists_eucl)
    print('dists_eucl_ref', dists_eucl_ref)
    assert np.allclose(dists_eucl, dists_eucl_ref)


# if __name__ == '__main__':
    # test_dists()
    # allows calling with args, e.g. python embed.py --dataset_name sst2
    # fire.Fire(EmbsManager)

    # embs = EmbsManager(
    #     dataset_name='financial_phrasebank',
    # ).expand_keyword('great')

Functions

def pairwise_distances(X: numpy.ndarray) ‑> numpy.ndarray
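Compute the (n, n) matrix of Euclidean distances between all rows of X, working one row at a time to bound memory.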
Expand source code
def pairwise_distances(X: np.ndarray) -> np.ndarray:
    '''Compute the (n, n) matrix of Euclidean distances between all rows of X,
    working one row at a time to bound memory.
    '''
    n = X.shape[0]
    dists = np.zeros((n, n))
    for i in tqdm(range(n)):
        # distances from row i to every row of X
        dists[i] = np.linalg.norm(X - X[i], axis=1)
    return dists
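
For intuition, row i of the result holds the Euclidean distance from X[i] to every row of X. A quick sketch checking this against a fully vectorized broadcast version (fine for small n; the broadcast builds an (n, n, d) intermediate, which is exactly what the row-at-a-time loop avoids):

import numpy as np

X = np.random.rand(5, 3)
D = pairwise_distances(X)  # row-at-a-time loop above
# broadcast ||x_i - x_j|| over all pairs at once: (5, 1, 3) - (1, 5, 3) -> (5, 5, 3)
D_ref = np.linalg.norm(X[:, None, :] - X[None, :, :], axis=-1)
assert np.allclose(D, D_ref)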
def test_dists()
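Sanity-check pairwise_distances against sklearn.metrics.pairwise_distances on a small example matrix.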
Expand source code
def test_dists():
    '''Sanity-check pairwise_distances against sklearn.metrics.pairwise_distances
    on a small example matrix.
    '''
    # sample embeddings matrix
    X = np.array([
        [1, 2, 3],
        [1, 2, 3],
        [2, 4, 6],
        [1, 2, 2],
    ])
    dists_eucl = pairwise_distances(X).round(2)
    assert np.min(dists_eucl) >= 0
    dists_eucl_ref = sklearn.metrics.pairwise_distances(
        X, metric='euclidean').round(2)
    print('dists_eucl', dists_eucl)
    print('dists_eucl_ref', dists_eucl_ref)
    assert np.allclose(dists_eucl, dists_eucl_ref)



Classes

class EmbsManager (save_dir_embs='/home/chansingh/llm-tree/results/embs_cache', dataset_name: str = 'financial_phrasebank', ngrams: int = 2, n_keep: int = 200, n_jobs: int = 60)

Params

save_dir_embs Directory for caching the ngram vocab + nearest-neighbor mappings
dataset_name Name of the HuggingFace dataset; must be a key of CHECKPOINTS_DICT
ngrams Maximum ngram length used to build the vocabulary
n_keep Number of nearest neighbors cached per ngram
n_jobs Number of cpus for computing pairwise distances
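
A minimal usage sketch (the cache directory and dataset name here are illustrative; the first call per dataset/checkpoint triggers _compute_mappings, which embeds the full ngram vocabulary and is much slower than later cached runs):

# hypothetical cache path; any writable directory works
embs = EmbsManager(
    save_dir_embs='results/embs_cache',
    dataset_name='sst2',
    ngrams=2,
    n_keep=200,
)
# nearest cached ngrams to 'great' in embedding space
print(embs.expand_keyword('great', n_expands=10))

On disk this creates a dataset-level directory (holding vocab.pkl) with a per-checkpoint subdirectory holding mappings_euclidean.npy.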

Expand source code
class EmbsManager:
    def __init__(self,
                 save_dir_embs='/home/chansingh/llm-tree/results/embs_cache',
                 dataset_name: str = 'financial_phrasebank',
                 ngrams: int = 2,
                 n_keep: int = 200,
                 n_jobs: int = 60,
                 ):
        '''
        Params
        ------
        save_dir_embs
            Directory for caching the ngram vocab + nearest-neighbor mappings
        dataset_name
            Name of the HuggingFace dataset; must be a key of CHECKPOINTS_DICT
        ngrams
            Maximum ngram length used to build the vocabulary
        n_keep
            Number of nearest neighbors cached per ngram
        n_jobs
            Number of cpus for computing pairwise distances
        '''
        print(locals())
        self.save_dir_embs = save_dir_embs
        self.dataset_name = dataset_name
        self.ngrams = ngrams
        self.checkpoint = CHECKPOINTS_DICT[dataset_name]
        self.n_keep = n_keep
        self.n_jobs = n_jobs

        # set up cache paths and compute mappings if not yet cached
        dir_name_top = join(
            save_dir_embs, f'{clean_str(dataset_name)}___ngrams={ngrams}')
        dir_name_checkpoint = join(dir_name_top, clean_str(self.checkpoint))
        fname_vocab = join(dir_name_top, 'vocab.pkl')
        fname_mappings = join(dir_name_checkpoint, 'mappings_euclidean.npy')
        os.makedirs(dir_name_checkpoint, exist_ok=True)
        if not os.path.exists(fname_mappings):
            print(fname_mappings, 'not found')
            self._compute_mappings(fname_vocab, dir_name_checkpoint)

        # load mappings + vocab
        print('loading from cache...')
        with open(fname_mappings, 'rb') as f:
            self.mappings = np.load(f)
        with open(fname_vocab, 'rb') as f:
            self.ngrams_arr = np.array(pkl.load(f))

    def _compute_mappings(self, fname_vocab: str, dir_name_checkpoint: str):
        # lazy import so torch is only required when (re)building the cache
        import torch.cuda

        # get raw data strings
        dset, dataset_key_text = imodelsx.data.load_huggingface_dataset(
            self.dataset_name, binary_classification=True)
        X = dset['train'][dataset_key_text] + \
            dset['validation'][dataset_key_text]
        tokenizer = imodelsx.augtree.utils.get_spacy_tokenizer()

        # build deduplicated, sorted ngram vocabulary
        ngrams_list = [
            imodelsx.util.generate_ngrams_list(
                x, ngrams=self.ngrams,
                tokenizer_ngrams=tokenizer,
                all_ngrams=True
            )
            for x in X
        ]
        ngrams_list = sum(ngrams_list, [])
        ngrams_list = sorted(set(ngrams_list))
        print(f'ngrams_list {len(ngrams_list)} {ngrams_list[:5]}')

        # compute embeddings
        print('computing embeddings...')
        embs = imodelsx.util.get_embs_llm(ngrams_list, self.checkpoint)
        print('embs.shape', embs.shape)
        torch.cuda.empty_cache()

        # save vocab, then compute pairwise embedding distances
        with open(fname_vocab, 'wb') as f:
            pkl.dump(ngrams_list, f)
        print('computing embedding similarities...')
        # (N, D) -> (N, N)
        pairwise_dists = pairwise_distances(embs)
        # exclude self-matches by setting the diagonal to a huge distance
        np.fill_diagonal(pairwise_dists, 1e10)

        # keep the indexes of the n_keep nearest neighbors of each ngram
        # (N, N) -> (N, n_keep)
        print('computing argsort...')
        args = np.zeros((len(ngrams_list), self.n_keep), dtype=int)
        for i in tqdm(range(len(ngrams_list))):
            args[i] = np.argsort(pairwise_dists[i])[:self.n_keep]

        cache_file = join(dir_name_checkpoint, 'mappings_euclidean.npy')
        with open(cache_file, 'wb') as f:
            np.save(f, args)

    def expand_keyword(self, keyword: str, n_expands=50) -> List[str]:
        '''Expand an ngram into its n_expands nearest ngrams in embedding space.

        Returns an empty list if the keyword is not in the cached vocabulary.
        '''
        # self.ngrams_arr is an array of ngrams;
        # self.mappings holds, per ngram, the indexes of its nearest neighbors

        # find index where keyword occurs in ngrams_arr
        find_keyword = self.ngrams_arr == keyword
        if find_keyword.sum() == 0:
            print(repr(keyword), 'not found')
            return []
        idx_keyword = int(np.argmax(find_keyword))
        idxs_expanded = self.mappings[idx_keyword, :n_expands]
        keywords_expanded = self.ngrams_arr[idxs_expanded]
        return keywords_expanded.tolist()

Methods

def expand_keyword(self, keyword: str, n_expands=50) ‑> List[str]

Expand an ngram into its n_expands nearest ngrams in embedding space. Returns an empty list if the keyword is not in the cached vocabulary.

Expand source code
def expand_keyword(self, keyword: str, n_expands=50) -> List[str]:
    '''Expand an ngram into its n_expands nearest ngrams in embedding space.

    Returns an empty list if the keyword is not in the cached vocabulary.
    '''
    # self.ngrams_arr is an array of ngrams;
    # self.mappings holds, per ngram, the indexes of its nearest neighbors

    # find index where keyword occurs in ngrams_arr
    find_keyword = self.ngrams_arr == keyword
    if find_keyword.sum() == 0:
        print(repr(keyword), 'not found')
        return []
    idx_keyword = int(np.argmax(find_keyword))
    idxs_expanded = self.mappings[idx_keyword, :n_expands]
    keywords_expanded = self.ngrams_arr[idxs_expanded]
    return keywords_expanded.tolist()
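
The lookup is an exact string match against the cached vocabulary, so keywords must be cleaned the same way the vocab was built. A short sketch of both outcomes, reusing the embs instance from the earlier (illustrative) example:

hits = embs.expand_keyword('great', n_expands=5)  # up to 5 nearest ngrams
miss = embs.expand_keyword('zzz unseen phrase')   # prints '... not found', returns []
assert isinstance(hits, list) and miss == []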