Module imodelsx.augtree.embed
Functions
def pairwise_distances(X: numpy.ndarray) -> numpy.ndarray
def pairwise_distances(X: np.ndarray) -> np.ndarray:
    n = X.shape[0]
    dists = np.zeros((n, n))
    for i in tqdm(range(n)):
        # Euclidean distance from row i to every row of X
        vec_i = X[i]
        dists[i] = np.linalg.norm(X - vec_i, axis=1)
    return dists
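A minimal usage sketch (only numpy is assumed beyond the module itself; note that the function materializes the full dense n × n distance matrix, so memory grows quadratically in the number of embeddings):

import numpy as np
from imodelsx.augtree.embed import pairwise_distances

X = np.random.rand(5, 3)           # 5 toy embeddings of dimension 3
D = pairwise_distances(X)          # (5, 5): D[i, j] = ||X[i] - X[j]||_2
assert np.allclose(np.diag(D), 0)  # each vector is at distance 0 from itself
assert np.allclose(D, D.T)         # Euclidean distance is symmetric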
def test_dists()
def test_dists():
    # sample embeddings matrix
    X = np.array([
        [1, 2, 3],
        [1, 2, 3],
        [2, 4, 6],
        [1, 2, 2],
    ])
    dists_eucl = pairwise_distances(X).round(2)
    assert np.min(dists_eucl) >= 0
    # compare against sklearn's reference implementation
    dists_eucl_ref = sklearn.metrics.pairwise_distances(
        X, metric='euclidean').round(2)
    print('dists_eucl', dists_eucl)
    print('dists_eucl_ref', dists_eucl_ref)
    assert np.allclose(dists_eucl, dists_eucl_ref)
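For reference, the matrix the test checks is easy to verify by hand: rows 0 and 1 of X are duplicates, and, e.g., the distance from [1, 2, 3] to [2, 4, 6] is sqrt(1 + 4 + 9) = sqrt(14) ≈ 3.74:

# expected dists_eucl for the sample X above (rounded to 2 decimals):
# [[0.   0.   3.74 1.  ]
#  [0.   0.   3.74 1.  ]
#  [3.74 3.74 0.   4.58]
#  [1.   1.   4.58 0.  ]]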
Classes
class EmbsManager (save_dir_embs='/home/chansingh/llm-tree/results/embs_cache',
                   dataset_name: str = 'financial_phrasebank',
                   ngrams: int = 2,
                   n_keep: int = 200,
                   n_jobs: int = 60)
class EmbsManager:
    def __init__(self,
                 save_dir_embs='/home/chansingh/llm-tree/results/embs_cache',
                 dataset_name: str = 'financial_phrasebank',
                 ngrams: int = 2,
                 n_keep: int = 200,
                 n_jobs: int = 60,
                 ):
        '''
        Params
        ------
        n_jobs
            Number of CPUs for computing pairwise distances
        '''
        print(locals())
        self.save_dir_embs = save_dir_embs
        self.dataset_name = dataset_name
        self.ngrams = ngrams
        self.checkpoint = CHECKPOINTS_DICT[dataset_name]
        self.n_keep = n_keep
        self.n_jobs = n_jobs

        # cache embeddings
        dir_name_top = join(
            save_dir_embs, f'{clean_str(dataset_name)}___ngrams={ngrams}')
        dir_name_checkpoint = join(dir_name_top, clean_str(self.checkpoint))
        fname_vocab = join(dir_name_top, 'vocab.pkl')
        fname_mappings = join(dir_name_checkpoint, 'mappings_euclidean.npy')
        os.makedirs(dir_name_checkpoint, exist_ok=True)
        if not os.path.exists(fname_mappings):
            print(fname_mappings, 'not found')
            self._compute_mappings(fname_vocab, dir_name_checkpoint)

        # load mappings + vocab
        print('loading from cache...')
        with open(fname_mappings, 'rb') as f:
            self.mappings = np.load(f)
        with open(fname_vocab, 'rb') as f:
            self.ngrams_arr = np.array(pkl.load(f))

    def _compute_mappings(self, fname_vocab: str, dir_name_checkpoint: str):
        import torch.cuda

        # get raw data strings
        dset, dataset_key_text = imodelsx.data.load_huggingface_dataset(
            self.dataset_name, binary_classification=True)
        X = dset['train'][dataset_key_text] + \
            dset['validation'][dataset_key_text]
        tokenizer = imodelsx.augtree.utils.get_spacy_tokenizer()

        # build the deduplicated, sorted ngram vocabulary
        ngrams_list = [
            imodelsx.util.generate_ngrams_list(
                x,
                ngrams=self.ngrams,
                tokenizer_ngrams=tokenizer,
                all_ngrams=True,
            )
            for x in X
        ]
        ngrams_list = sum(ngrams_list, [])
        ngrams_list = sorted(list(set(ngrams_list)))
        print(f'ngrams_list {len(ngrams_list)} {ngrams_list[:5]}')

        # compute embeddings
        print('computing embeddings...')
        embs = imodelsx.util.get_embs_llm(ngrams_list, self.checkpoint)
        print('embs.shape', embs.shape)
        torch.cuda.empty_cache()

        # save vocab, then compute embedding distances: (N, D) -> (N, N)
        with open(fname_vocab, 'wb') as f:
            pkl.dump(ngrams_list, f)
        print('computing embedding similarities...')
        pairwise_dists = pairwise_distances(embs)
        # mask the diagonal (boolean, not integer, indexing) so an ngram
        # is never returned as its own nearest neighbor
        pairwise_dists[np.eye(pairwise_dists.shape[0], dtype=bool)] = 1e10

        # argsort each row, keeping the n_keep nearest: (N, N) -> (N, n_keep)
        print('computing argsort...')
        args = np.zeros((len(ngrams_list), self.n_keep), dtype=int)
        for i in tqdm(range(len(ngrams_list))):
            args[i] = np.argsort(pairwise_dists[i])[:self.n_keep]
        cache_file = join(dir_name_checkpoint, 'mappings_euclidean.npy')
        with open(cache_file, 'wb') as f:
            np.save(f, args)

    def expand_keyword(self, keyword: str, n_expands=50) -> List[str]:
        '''Expand ngram using similar keywords
        '''
        # self.ngrams_arr is an array of ngrams;
        # self.mappings[i] holds indices of the ngrams nearest to ngrams_arr[i]
        # find the index where keyword occurs in ngrams_arr
        find_keyword = self.ngrams_arr == keyword
        if find_keyword.sum() == 0:
            print(repr(keyword), 'not found')
            return []
        idx_keyword = int(np.argmax(find_keyword))
        idxs_expanded = self.mappings[idx_keyword, :n_expands]
        keywords_expanded = self.ngrams_arr[idxs_expanded]
        return keywords_expanded
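A hedged usage sketch, adapted from the commented-out driver code at the bottom of the module source (which also suggests a CLI entry point via fire.Fire(EmbsManager), e.g. python embed.py --dataset_name sst2). save_dir_embs below is an assumed writable path; the first call is slow because it embeds every ngram in the dataset and caches the result:

from imodelsx.augtree.embed import EmbsManager

embs_manager = EmbsManager(
    save_dir_embs='/tmp/embs_cache',      # assumption: any writable cache dir
    dataset_name='financial_phrasebank',  # checkpoint is looked up in CHECKPOINTS_DICT
    ngrams=2,
)
# the 50 ngrams nearest to 'great' in embedding space
print(embs_manager.expand_keyword('great'))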
Params
n_jobs
    Number of CPUs for computing pairwise distances
Methods
def expand_keyword(self, keyword: str, n_expands=50) -> List[str]
def expand_keyword(self, keyword: str, n_expands=50) -> List[str]:
    '''Expand ngram using similar keywords
    '''
    # self.ngrams_arr is an array of ngrams;
    # self.mappings[i] holds indices of the ngrams nearest to ngrams_arr[i]
    # find the index where keyword occurs in ngrams_arr
    find_keyword = self.ngrams_arr == keyword
    if find_keyword.sum() == 0:
        print(repr(keyword), 'not found')
        return []
    idx_keyword = int(np.argmax(find_keyword))
    idxs_expanded = self.mappings[idx_keyword, :n_expands]
    keywords_expanded = self.ngrams_arr[idxs_expanded]
    return keywords_expanded
Expand ngram using similar keywords
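To make the lookup concrete, here is a self-contained sketch of the same indexing pattern on a toy vocabulary (the ngrams and neighbor indices below are made up for illustration):

import numpy as np

# toy stand-ins for self.ngrams_arr / self.mappings
ngrams_arr = np.array(['bad', 'good', 'great', 'terrible'])
# row i: indices of the ngrams nearest to ngrams_arr[i], nearest first
mappings = np.array([
    [3, 1, 2],  # 'bad'      -> 'terrible', 'good', 'great'
    [2, 0, 3],  # 'good'     -> 'great', 'bad', 'terrible'
    [1, 0, 3],  # 'great'    -> 'good', 'bad', 'terrible'
    [0, 1, 2],  # 'terrible' -> 'bad', 'good', 'great'
])

idx = int(np.argmax(ngrams_arr == 'great'))  # position of the keyword
print(ngrams_arr[mappings[idx, :2]])         # -> ['good' 'bad']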