Module imodelsx.auggam.embed
Source code
from transformers import BertModel, DistilBertModel
from transformers import AutoModelForCausalLM
from os.path import join as oj
from datasets import Dataset
from tqdm import tqdm
import torch
import numpy as np
from torch.utils.data import DataLoader
import imodelsx.util

def get_model(checkpoint):
    if "distilbert" in checkpoint.lower():
        model = DistilBertModel.from_pretrained(checkpoint)
    elif "bert-base" in checkpoint.lower() or "BERT" in checkpoint:
        model = BertModel.from_pretrained(checkpoint)
    elif "gpt" in checkpoint.lower():
        model = AutoModelForCausalLM.from_pretrained(
            checkpoint, output_hidden_states=True
        )
    # note: checkpoints outside these families (e.g. instructor models) are not handled here
    try:
        model = model.cuda()
    except Exception:
        # fall back to CPU if no GPU is available
        pass
    return model

def preprocess_gpt_token_batch(seqs, tokenizer_embeddings):
    """Preprocess a batch of token strings of different lengths:
    pad to the longest sequence and build the matching attention mask.
    """
    token_ids = [
        tokenizer_embeddings.encode(s, add_special_tokens=False) for s in seqs
    ]
    prompt_lengths = [len(s) for s in token_ids]
    max_prompt_len = max(prompt_lengths)
    # use 0 as the padding id; it is masked out by the attention mask anyway
    # (snippet from https://github.com/huggingface/transformers/issues/3021)
    padded_tokens = [
        tok_ids + [0] * (max_prompt_len - len(tok_ids)) for tok_ids in token_ids
    ]
    input_ids = torch.LongTensor(padded_tokens)
    attn_mask = torch.zeros(input_ids.shape).long()
    for ix, tok_ids in enumerate(token_ids):
        attn_mask[ix][: len(tok_ids)] = 1
    return {"input_ids": input_ids, "attention_mask": attn_mask}

def embed_and_sum_function(
    example,
    model,
    ngrams: int,
    tokenizer_embeddings,
    tokenizer_ngrams,
    checkpoint: str,
    dataset_key_text: str = None,
    layer: str = "last_hidden_state",
    padding: str = "max_length",
    batch_size: int = 8,
    parsing: str = "",
    nlp_chunks=None,
    all_ngrams: bool = False,
    fit_with_ngram_decomposition: bool = True,
    instructor_prompt: str = None,
    sum_embeddings=True,
):
    """Get summed embeddings for a single example.

    Params
    ------
    ngrams: int
        Order of ngrams to use (1 for unigrams, 2 for bigrams, ...)
    dataset_key_text: str
        Key under which the text is stored in the example, e.g. "sentence" for sst2
    tokenizer_embeddings
        Tokenizer for the embedding model
    tokenizer_ngrams
        Tokenizer used to split the text into ngrams (word-based tokenization is more interpretable)
    parsing: str
        Whether to use parsing rather than extracting all ngrams
    nlp_chunks
        If parsing is not the empty string, a parser that extracts specific ngrams
    fit_with_ngram_decomposition
        Whether to decompose the example into ngrams (if False, just embed the full sentence)
    instructor_prompt: str
        If using an Instructor model, the prompt to use
    """
    if dataset_key_text is not None:
        sentence = example[dataset_key_text]
    else:
        sentence = example

    if fit_with_ngram_decomposition:
        seqs = imodelsx.util.generate_ngrams_list(
            sentence,
            ngrams=ngrams,
            tokenizer_ngrams=tokenizer_ngrams,
            parsing=parsing,
            nlp_chunks=nlp_chunks,
            all_ngrams=all_ngrams,
        )
    elif isinstance(sentence, list):
        seqs = sentence
    elif isinstance(sentence, str):
        seqs = [sentence]
    else:
        raise ValueError("sentence must be a string or list of strings")

    seq_len = len(seqs)
    if seq_len == 0:
        # the embedding is multiplied by 0 below, so its value doesn't matter,
        # but we still need something to embed to get the right output shape
        seqs = ["dummy"]

    # models without a pad token (e.g. GPT-2) need one for batched tokenization
    if not checkpoint.startswith("hkunlp/instructor") and (
        not hasattr(tokenizer_embeddings, "pad_token")
        or tokenizer_embeddings.pad_token is None
    ):
        tokenizer_embeddings.pad_token = tokenizer_embeddings.eos_token
    embs = []
    if "bert" in checkpoint.lower():
        # output has up to two keys: 'last_hidden_state', 'pooler_output'
        tokens = tokenizer_embeddings(
            seqs, padding=padding, truncation=True, return_tensors="pt"
        )
        ds = Dataset.from_dict(tokens).with_format("torch")
        for batch in DataLoader(ds, batch_size=batch_size, shuffle=False):
            batch = {k: v.to(model.device) for k, v in batch.items()}
            with torch.no_grad():
                output = model(**batch)
            torch.cuda.empty_cache()
            if layer == "pooler_output":
                emb = output["pooler_output"].cpu().detach().numpy()
            elif layer in ("last_hidden_state", "last_hidden_state_mean"):
                # mean-pool over the sequence dimension
                emb = output["last_hidden_state"].cpu().detach().numpy()
                emb = emb.mean(axis=1)
            embs.append(emb)
        embs = np.concatenate(embs)
elif "gpt" in checkpoint.lower() or "llama" in checkpoint.lower():
tokens = preprocess_gpt_token_batch(seqs, tokenizer_embeddings)
ds = Dataset.from_dict(tokens).with_format("torch")
for batch in DataLoader(ds, batch_size=batch_size, shuffle=False):
batch = {k: v.to(model.device) for k, v in batch.items()}
with torch.no_grad():
output = model(**batch)
torch.cuda.empty_cache()
if "hidden_states" in output.keys():
# extract (layer x (batch_size, seq_len, hidden_size))
h = output["hidden_states"]
# convert to (batch_size, seq_len, hidden_size)
emb = h[0].cpu().detach().numpy()
# convert to (batch_size, hidden_size)
emb = emb.mean(axis=1)
elif "last_hidden_state" in output.keys():
# extract (batch_size, seq_len, hidden_size)
h = output["last_hidden_state"]
# convert to np
emb = h.cpu().detach().numpy()
# convert to (batch_size, hidden_size)
emb = emb.mean(axis=1)
else:
raise Exception(f"keys: {output.keys()}")
embs.append(emb)
embs = np.concatenate(embs)
    elif checkpoint.startswith("hkunlp/instructor"):
        if instructor_prompt is None:
            instructor_prompt = (
                "Represent the short phrase for sentiment classification: "
            )
        embs = model.encode(
            [[instructor_prompt, x_i] for x_i in seqs], batch_size=batch_size
        )
    else:
        raise Exception(f"Unknown model checkpoint {checkpoint}")

    # sum the ngram embeddings into a single vector for the example
    if sum_embeddings:
        embs = embs.sum(axis=0).reshape(1, -1)
    if seq_len == 0:
        embs *= 0

    return {"embs": embs, "seq_len": len(seqs)}
Functions
def embed_and_sum_function(example, model, ngrams: int, tokenizer_embeddings, tokenizer_ngrams, checkpoint: str, dataset_key_text: str = None, layer: str = 'last_hidden_state', padding: str = 'max_length', batch_size: int = 8, parsing: str = '', nlp_chunks=None, all_ngrams: bool = False, fit_with_ngram_decomposition: bool = True, instructor_prompt: str = None, sum_embeddings=True)
Get summed embeddings for a single example.

Params
------
ngrams: int
    Order of ngrams to use (1 for unigrams, 2 for bigrams, ...)
dataset_key_text: str
    Key under which the text is stored in the example, e.g. "sentence" for sst2
tokenizer_embeddings
    Tokenizer for the embedding model
tokenizer_ngrams
    Tokenizer used to split the text into ngrams (word-based tokenization is more interpretable)
parsing: str
    Whether to use parsing rather than extracting all ngrams
nlp_chunks
    If parsing is not the empty string, a parser that extracts specific ngrams
fit_with_ngram_decomposition
    Whether to decompose the example into ngrams (if False, just embed the full sentence)
instructor_prompt: str
    If using an Instructor model, the prompt to use
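A minimal usage sketch (not part of the module): embed the bigrams of a short phrase with DistilBERT and sum them into one feature vector. The checkpoint name and the str.split ngram tokenizer are assumptions for illustration; the AugGAM pipeline wires in its own ngram tokenizer.

from transformers import AutoTokenizer
from imodelsx.auggam.embed import embed_and_sum_function, get_model

checkpoint = "distilbert-base-uncased"
model = get_model(checkpoint)
tokenizer_embeddings = AutoTokenizer.from_pretrained(checkpoint)

out = embed_and_sum_function(
    "this movie was not good",
    model=model,
    ngrams=2,
    tokenizer_embeddings=tokenizer_embeddings,
    tokenizer_ngrams=str.split,  # stand-in word-level tokenizer (an assumption)
    checkpoint=checkpoint,
    all_ngrams=True,
)
print(out["embs"].shape)  # (1, hidden_size) since sum_embeddings=True by default
print(out["seq_len"])     # number of ngrams that were embedded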
def get_model(checkpoint)
Load the embedding model for checkpoint: DistilBertModel for DistilBERT checkpoints, BertModel for BERT checkpoints, and a causal LM (with output_hidden_states=True) for GPT-style checkpoints. The model is moved to the GPU when one is available.
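A short usage sketch (the checkpoint names are just examples): load a BERT-style backbone and a GPT-style causal LM; each stays on CPU if no GPU is available.

from imodelsx.auggam.embed import get_model

bert_model = get_model("distilbert-base-uncased")  # DistilBertModel
gpt_model = get_model("gpt2")                      # causal LM with output_hidden_states=True
print(bert_model.device, gpt_model.device)         # cuda:0 if a GPU is available, else cpu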
def preprocess_gpt_token_batch(seqs, tokenizer_embeddings)
Preprocess a batch of token strings of different lengths: encode each string without special tokens, pad every sequence to the longest one in the batch, and build the matching attention mask.
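A small illustration (the "gpt2" checkpoint is an assumption): encode two phrases of different lengths and inspect the padded batch.

from transformers import AutoTokenizer
from imodelsx.auggam.embed import preprocess_gpt_token_batch

tok = AutoTokenizer.from_pretrained("gpt2")
batch = preprocess_gpt_token_batch(
    ["a short phrase", "a somewhat longer example phrase"], tok
)
print(batch["input_ids"].shape)    # (2, max_prompt_len)
print(batch["attention_mask"])     # 1s over real tokens, 0s over the zero-padding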