Module imodelsx.llm

Expand source code
from copy import deepcopy
import json
from transformers import (
    T5ForConditionalGeneration,
)
from datasets import Dataset
import transformers
from transformers import AutoConfig, AutoModel, AutoTokenizer, AutoModelForCausalLM
import re
from transformers import LlamaForCausalLM, LlamaTokenizer
from typing import Any, Dict, List, Mapping, Optional, Union
import numpy as np
import os.path
from os.path import join, dirname
import os
import pickle as pkl
from scipy.special import softmax
import hashlib
import torch
import time
from tqdm import tqdm

# Hugging Face access token used when downloading tokenizers / models.
# Resolution order: the HF_TOKEN environment variable, then the contents of
# ~/.HF_TOKEN (whitespace-stripped), otherwise None.
HF_TOKEN = os.environ.get("HF_TOKEN")
if HF_TOKEN is None:
    # '~' must be expanded explicitly: os.path.exists does not do it
    _hf_token_file = os.path.expanduser('~/.HF_TOKEN')
    if os.path.exists(_hf_token_file):
        with open(_hf_token_file, 'r') as _hf_token_fh:
            HF_TOKEN = _hf_token_fh.read().strip()
'''
Example usage:
# gpt-4, gpt-35-turbo, meta-llama/Llama-2-70b-hf, mistralai/Mistral-7B-v0.1
checkpoint = 'meta-llama/Llama-2-7b-hf'
llm = imodelsx.llm.get_llm(checkpoint)
llm('may the force be') # returns ' with you'
'''

# change these settings before using these classes!
#   LLM_REPEAT_DELAY: how long to wait before recalling a failed llm call (can set to None)
#   CACHE_DIR: path to save cached llm outputs
#   LLAMA_DIR: path to extracted llama weights
LLM_CONFIG = dict(
    LLM_REPEAT_DELAY=None,
    CACHE_DIR=os.path.join(os.path.expanduser("~"), "clin/CACHE_OPENAI"),
    LLAMA_DIR=os.path.join(os.path.expanduser("~"), "llama"),
)


def get_llm(
    checkpoint,
    seed=1,
    role: str = None,
    repeat_delay: Optional[float] = None,
    CACHE_DIR=LLM_CONFIG["CACHE_DIR"],
    LLAMA_DIR=LLM_CONFIG["LLAMA_DIR"],
):
    """Get an LLM with a call function and caching capabilities

    Params
    ------
    checkpoint: str
        Model name; its prefix / contents select the wrapper class:
        "text-da*" -> LLM_OpenAI, "gpt-3*" / "gpt-4*" -> LLM_Chat,
        names containing both 'Meta-Llama-3' and 'Instruct' -> LLM_HF_Pipeline,
        anything else -> LLM_HF
    seed: int
        Forwarded to the wrapper, where it becomes part of the cache directory name
        (not forwarded to LLM_HF_Pipeline)
    role: str
        System prompt used by LLM_Chat when it is called with a plain string
    repeat_delay: Optional[float]
        If not None, overwrites the module-wide LLM_CONFIG["LLM_REPEAT_DELAY"]
    CACHE_DIR: str
        Root directory for cached outputs
    LLAMA_DIR: str
        Forwarded to LLM_HF
    """
    if repeat_delay is not None:
        # note: this mutates global config shared by every llm instance
        LLM_CONFIG["LLM_REPEAT_DELAY"] = repeat_delay

    if checkpoint.startswith("text-da"):
        return LLM_OpenAI(checkpoint, seed=seed, CACHE_DIR=CACHE_DIR)
    if checkpoint.startswith(("gpt-3", "gpt-4")):
        return LLM_Chat(checkpoint, seed, role, CACHE_DIR)
    if 'Meta-Llama-3' in checkpoint and 'Instruct' in checkpoint:
        return LLM_HF_Pipeline(checkpoint, CACHE_DIR)
    return LLM_HF(checkpoint, seed=seed, CACHE_DIR=CACHE_DIR, LLAMA_DIR=LLAMA_DIR)


def repeatedly_call_with_delay(llm_call):
    """Decorator that re-invokes `llm_call` until it returns something other than None.

    Behavior on an exception raised by `llm_call`:
      - the message is printed
      - if the message marks the request as unrecoverable (missing deployment,
        prompt too long, content filter), the wrapper returns None
      - if LLM_CONFIG["LLM_REPEAT_DELAY"] is None, the original exception is re-raised
      - otherwise the wrapper sleeps for that many seconds and tries again

    Note: a call that returns None without raising is retried immediately.
    """
    from functools import wraps  # local import keeps this block self-contained

    # substrings of error messages for which retrying cannot help
    unrecoverable_markers = (
        "does not exist",
        "maximum context length",
        'content management policy',
    )

    @wraps(llm_call)
    def wrapper(*args, **kwargs):
        # Number of seconds to wait between calls (None will not repeat)
        delay = LLM_CONFIG["LLM_REPEAT_DELAY"]
        response = None
        while response is None:
            try:
                response = llm_call(*args, **kwargs)
            except Exception as e:
                message = str(e)
                print(message)
                if any(marker in message for marker in unrecoverable_markers):
                    return None
                if delay is None:
                    # re-raise the original exception (raising the str would be a TypeError)
                    raise
                time.sleep(delay)
        return response

    return wrapper


class LLM_OpenAI:
    """Single-prompt wrapper around an Azure OpenAI deployment with on-disk caching.

    Responses are pickled under
    `<CACHE_DIR>/cache_openai/<checkpoint>___<seed>/<sha256(prompt)>__num_tok=<max_new_tokens>.pkl`.
    Only the prompt and max_new_tokens are part of the cache key.
    """

    def __init__(self, checkpoint, seed, CACHE_DIR):
        self.cache_dir = join(
            CACHE_DIR, "cache_openai", f'{checkpoint.replace("/", "_")}___{seed}'
        )
        self.checkpoint = checkpoint

    @repeatedly_call_with_delay
    def __call__(
        self,
        prompt: str,
        max_new_tokens=250,
        frequency_penalty=0.25,  # maximum is 2
        temperature=0.1,
        do_sample=True,
        stop=None,
        return_str=True,
    ):
        """Return the completion for `prompt`, reading from / writing to the cache.

        Params
        ------
        prompt: str
            Sent as a single user message
        max_new_tokens: int
            Passed to the API as max_tokens
        frequency_penalty, temperature, stop
            Passed through to the API
        do_sample
            Unused; kept for interface compatibility
        return_str: bool
            If True return the message text, otherwise the raw response object
        """
        # cache
        os.makedirs(self.cache_dir, exist_ok=True)
        hash_str = hashlib.sha256(prompt.encode()).hexdigest()
        cache_file = join(
            self.cache_dir, f"{hash_str}__num_tok={max_new_tokens}.pkl")
        if os.path.exists(cache_file):
            # NOTE: only unpickle cache files from a trusted cache directory
            with open(cache_file, "rb") as f:
                cached = pkl.load(f)
            # a cached None would make the retry decorator loop on the cache forever
            if cached is not None:
                return cached

        from openai import AzureOpenAI
        api_key = os.getenv("OPENAI_API_KEY")  # need to fill this in
        client = AzureOpenAI(
            azure_endpoint="https://healthcare-ai.openai.azure.com/",
            api_version="2024-02-01",
            api_key=api_key,
        )

        response = client.chat.completions.create(
            # model is the deployment name chosen when the model was deployed
            model=self.checkpoint,
            # the chat endpoint takes a list of role/content messages, not a bare string
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature,
            max_tokens=max_new_tokens,
            top_p=1,
            frequency_penalty=frequency_penalty,
            presence_penalty=0,
            stop=stop,
        )

        if return_str:
            response = response.choices[0].message.content

        if response is not None:
            with open(cache_file, "wb") as f:
                pkl.dump(response, f)
        return response


class LLM_Chat:
    """Chat models take a different format: https://platform.openai.com/docs/guides/chat/introduction

    Wraps an Azure OpenAI chat deployment and pickles every non-None response on disk.
    """

    def __init__(self, checkpoint, seed, role, CACHE_DIR):
        self.cache_dir = join(
            CACHE_DIR, "cache_openai", f'{checkpoint.replace("/", "_")}___{seed}'
        )
        self.checkpoint = checkpoint
        self.role = role
        from openai import AzureOpenAI
        api_key = os.getenv("OPENAI_API_KEY")  # need to fill this in
        self.client = AzureOpenAI(
            azure_endpoint="https://healthcare-ai.openai.azure.com/",
            api_version="2024-02-01",
            api_key=api_key,
        )

    # @repeatedly_call_with_delay
    def __call__(
        self,
        prompts_list: List[Dict[str, str]],
        max_new_tokens=250,
        stop=None,
        functions: List[Dict] = None,
        return_str=True,
        verbose=True,
        temperature=0.1,
        frequency_penalty=0.25,
        use_cache=True,
    ):
        """
        prompts_list: list of dicts, each dict has keys 'role' and 'content'
            Example: [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "Who won the world series in 2020?"},
                {"role": "assistant",
                    "content": "The Los Angeles Dodgers won the World Series in 2020."},
                {"role": "user", "content": "Where was it played?"}
            ]
        prompts_list: str
            Alternatively, string which gets formatted into basic prompts_list:
            messages = [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": <<<<<prompts_list>>>>},
            ]
            (the system content is self.role when that is not None)
        max_new_tokens: int
            Passed to the API as max_tokens; also part of the cache file name
        stop, functions, temperature, frequency_penalty
            Passed through to the API (functions only when not None)
        return_str: bool
            If True return the message text, otherwise the raw response object
        verbose: bool
            Print whether the result came from the cache
        use_cache: bool
            If False, an existing cache entry is ignored; the fresh response
            is still written to the cache

        Returns the response (or its text); None responses are never cached.
        """
        if isinstance(prompts_list, str):
            role = self.role
            if role is None:
                role = "You are a helpful assistant."
            prompts_list = [
                {"role": "system", "content": role},
                {"role": "user", "content": prompts_list},
            ]

        assert isinstance(prompts_list, list), prompts_list

        # cache key: the messages plus any setting that differs from the historical defaults
        # (keeps old cache files for gpt-3.5-turbo / low temperature valid)
        os.makedirs(self.cache_dir, exist_ok=True)
        prompts_list_dict = {
            str(i): sorted(v.items()) for i, v in enumerate(prompts_list)
        }
        if not self.checkpoint == "gpt-3.5-turbo":
            prompts_list_dict["checkpoint"] = self.checkpoint
        if functions is not None:
            prompts_list_dict["functions"] = functions
        if temperature > 0.1:
            prompts_list_dict["temperature"] = temperature
        dict_as_str = json.dumps(prompts_list_dict, sort_keys=True)
        hash_str = hashlib.sha256(dict_as_str.encode()).hexdigest()
        cache_file = join(
            self.cache_dir,
            f"chat__{hash_str}__num_tok={max_new_tokens}.pkl",
        )
        if os.path.exists(cache_file) and use_cache:
            if verbose:
                print("cached!")
            # NOTE: only unpickle cache files from a trusted cache directory
            with open(cache_file, "rb") as f:
                response = pkl.load(f)
            if response is not None:
                return response
        if verbose:
            print("not cached")

        kwargs = dict(
            model=self.checkpoint,
            messages=prompts_list,
            max_tokens=max_new_tokens,
            temperature=temperature,
            top_p=1,
            frequency_penalty=frequency_penalty,  # maximum is 2
            presence_penalty=0,
            stop=stop,
        )
        if functions is not None:
            kwargs["functions"] = functions

        response = self.client.chat.completions.create(
            **kwargs,
        )

        if return_str:
            response = response.choices[0].message.content

        if response is not None:
            with open(cache_file, "wb") as f:
                pkl.dump(response, f)

        return response


def load_tokenizer(checkpoint: str) -> transformers.PreTrainedTokenizer:
    """Load the left-padding tokenizer for `checkpoint`.

    If the tokenizer has no pad token id, the eos token id is used instead.
    """
    # settings shared by every branch
    shared = {"padding_side": 'left', "token": HF_TOKEN}

    if "facebook/opt" in checkpoint:
        # opt can't use fast tokenizer
        tok = AutoTokenizer.from_pretrained(
            checkpoint, use_fast=False, **shared)
    elif "PMC_LLAMA" in checkpoint:
        # always loaded from the fixed PMC_LLAMA_7B repo
        tok = transformers.LlamaTokenizer.from_pretrained(
            "chaoyi-wu/PMC_LLAMA_7B", **shared)
    else:
        tok = AutoTokenizer.from_pretrained(
            checkpoint, use_fast=True, **shared)

    if tok.pad_token_id is None:
        tok.pad_token_id = tok.eos_token_id
    return tok


def load_hf_model(checkpoint: str, LLAMA_DIR: Optional[str] = None) -> transformers.PreTrainedModel:
    """Load the HuggingFace model for `checkpoint`, with per-family loading options.

    Params
    ------
    checkpoint: str
        Model name (or, for names containing "llama_", a sub-directory of LLAMA_DIR)
    LLAMA_DIR: Optional[str]
        Directory holding local llama weights; defaults to LLM_CONFIG["LLAMA_DIR"].
        Only used for checkpoints containing "llama_".
    """
    if "google/flan" in checkpoint:
        return T5ForConditionalGeneration.from_pretrained(
            checkpoint, device_map="auto", torch_dtype=torch.float16
        )
    elif checkpoint == "EleutherAI/gpt-j-6B":
        # checkpoint is passed positionally only: also passing it as
        # pretrained_model_name_or_path would be a duplicate-argument TypeError
        return AutoModelForCausalLM.from_pretrained(
            checkpoint,
            revision="float16",
            torch_dtype=torch.float16,
            output_hidden_states=False,
            low_cpu_mem_usage=True,
        )
    elif "llama-2" in checkpoint.lower():
        return AutoModelForCausalLM.from_pretrained(
            checkpoint,
            torch_dtype=torch.float16,
            device_map="auto",
            token=HF_TOKEN,
            offload_folder="offload",
        )
    elif "llama_" in checkpoint:
        # locally extracted weights live under the configured llama directory
        llama_dir = LLAMA_DIR if LLAMA_DIR is not None else LLM_CONFIG["LLAMA_DIR"]
        return transformers.LlamaForCausalLM.from_pretrained(
            join(llama_dir, checkpoint),
            device_map="auto",
            torch_dtype=torch.float16,
        )
    elif 'microsoft/phi' in checkpoint or checkpoint == "gpt-xl":
        # loaded with library defaults (no device_map / dtype override)
        return AutoModelForCausalLM.from_pretrained(checkpoint)
    else:
        return AutoModelForCausalLM.from_pretrained(
            checkpoint, device_map="auto", torch_dtype=torch.float16,
            token=HF_TOKEN,
        )


class LLM_HF_Pipeline:
    """Text generation through a `transformers.pipeline`, with on-disk caching.

    Cache files are written directly into CACHE_DIR, keyed by the sha256 of
    str(prompt) and max_new_tokens.
    """

    def __init__(self, checkpoint, CACHE_DIR):

        self.pipeline_ = transformers.pipeline(
            "text-generation",
            model=checkpoint,
            model_kwargs={'torch_dtype': torch.float16},
            device_map="auto"
        )
        # pad with eos on the left so batched generation continues from the prompt end
        self.pipeline_.tokenizer.pad_token_id = self.pipeline_.tokenizer.eos_token_id
        self.pipeline_.tokenizer.padding_side = 'left'
        self.cache_dir = join(CACHE_DIR)

    def __call__(
        self,
        prompt: Union[str, List[str]],
        max_new_tokens=20,
        use_cache=True,
        verbose=False,
        batch_size=64,
    ):
        """Generate (greedily) a continuation for one prompt or a list of prompts.

        Returns a str for a str prompt and a list of str for a list of prompts;
        in both cases the leading len(prompt) characters of the generated text
        are dropped.
        """
        if use_cache:
            os.makedirs(self.cache_dir, exist_ok=True)
            hash_str = hashlib.sha256(str(prompt).encode()).hexdigest()
            cache_file = join(
                self.cache_dir, f"{hash_str}__num_tok={max_new_tokens}.pkl"
            )

            if os.path.exists(cache_file):
                if verbose:
                    print("cached!")
                try:
                    # NOTE: only unpickle cache files from a trusted cache directory
                    with open(cache_file, "rb") as f:
                        return pkl.load(f)
                except Exception:
                    # unreadable / corrupt cache entry: fall through and regenerate
                    print('failed to load cache so rerunning...')
            if verbose:
                print("not cached...")

        outputs = self.pipeline_(
            prompt,
            max_new_tokens=max_new_tokens,
            batch_size=batch_size,
            do_sample=False,
        )
        if isinstance(prompt, str):
            texts = outputs[0]["generated_text"][len(prompt):]
        else:
            texts = [
                output[0]['generated_text'][len(prompt_i):]
                for output, prompt_i in zip(outputs, prompt)
            ]

        if use_cache:
            with open(cache_file, "wb") as f:
                pkl.dump(texts, f)
        return texts


class LLM_HF:
    """HuggingFace model + tokenizer exposed as a prompt -> text callable with on-disk caching.

    Cache files live in `<CACHE_DIR>/cache_hf/<checkpoint>___<seed>/` and are keyed
    only by the sha256 of str(prompt) and max_new_tokens.
    """

    def __init__(self, checkpoint, seed, CACHE_DIR, LLAMA_DIR=None):
        # LLAMA_DIR is accepted for interface compatibility; it is not used here
        self.tokenizer_ = load_tokenizer(checkpoint)
        self.model_ = load_hf_model(checkpoint)
        self.checkpoint = checkpoint
        self.cache_dir = join(
            CACHE_DIR, "cache_hf", f'{checkpoint.replace("/", "_")}___{seed}'
        )
        self.seed = seed

    def __call__(
        self,
        prompt: Union[str, List[str]],
        stop: str = None,
        max_new_tokens=20,
        do_sample=False,
        use_cache=True,
        verbose=False,
        return_next_token_prob_scores=False,
        target_token_strs: List[str] = None,
        return_top_target_token_str: bool = False,
        batch_size=1,
    ) -> Union[str, List[str]]:
        """Generate text for a prompt (or batch of prompts), or score the next token.

        Be careful, caching can take up a lot of memory....
        `stop` and `batch_size` are accepted but not applied by this method.

        Example mistral-instruct prompt: "<s>[INST]'Input text: {example}\\nQuestion: {question} Answer yes or no.[/INST]"

        NOTE(review): the cache key ignores target_token_strs /
        return_top_target_token_str, so a cached generation and a cached
        top-target-token for the same prompt share one cache file.

        Params
        ------
        prompt: Union[str, List[str]]
            A str returns a str; a list returns a list with one entry per prompt
        return_next_token_prob_scores: bool
            If this is true (and target_token_strs is None), return the array of
            next-token probabilities with shape (num_prompts, vocab_size)
        target_token_strs: List[str]
            If given, restrict next-token scoring to these tokens (each must be a single token).
            The output will be a list of dictionaries in this case List[Dict[str, float]],
            one dictionary per prompt
        return_top_target_token_str: bool
            If true and target_token_strs is given, then just return the most probable
            target token per prompt (a single str when there is exactly one prompt)
            This is a way to constrain the output (but only for 1 token)
            This setting caches but the other two (which do not return strings) do not cache
        """
        input_is_str = isinstance(prompt, str)
        with torch.no_grad():
            # cache
            if use_cache:
                os.makedirs(self.cache_dir, exist_ok=True)
                hash_str = hashlib.sha256(str(prompt).encode()).hexdigest()
                cache_file = join(
                    self.cache_dir, f"{hash_str}__num_tok={max_new_tokens}.pkl"
                )

                if os.path.exists(cache_file):
                    if verbose:
                        print("cached!")
                    try:
                        # NOTE: only unpickle cache files from a trusted cache directory
                        with open(cache_file, "rb") as f:
                            return pkl.load(f)
                    except Exception:
                        # unreadable / corrupt cache entry: fall through and regenerate
                        print('failed to load cache so rerunning...')
                if verbose:
                    print("not cached...")

            inputs = self.tokenizer_(
                prompt, return_tensors="pt",
                return_attention_mask=True,
                padding=True,
                truncation=False,
            ).to(self.model_.device)

            if return_next_token_prob_scores or target_token_strs or return_top_target_token_str:
                # generate a single token just to obtain its logits
                outputs = self.model_.generate(
                    **inputs,
                    max_new_tokens=1,
                    pad_token_id=self.tokenizer_.pad_token_id,
                    output_logits=True,
                    return_dict_in_generate=True,
                )
                next_token_logits = outputs['logits'][0]
                next_token_probs = next_token_logits.softmax(
                    axis=-1).detach().cpu().numpy()

                if target_token_strs is None:
                    return next_token_probs

                if isinstance(target_token_strs, str):
                    target_token_strs = [target_token_strs]
                target_token_ids = self._check_target_token_strs(
                    target_token_strs)
                # ids come back as one single-element list per target: flatten to 1-D so
                # the indexed result is always (num_prompts, num_targets), including for
                # a single prompt or a single target
                flat_token_ids = np.array(target_token_ids).reshape(-1)
                target_probs = next_token_probs[:, flat_token_ids]

                if return_top_target_token_str:
                    best_idxs = target_probs.argmax(axis=-1)
                    out_strs = [target_token_strs[idx] for idx in best_idxs]
                    if len(out_strs) == 1:
                        out_strs = out_strs[0]
                    if use_cache:
                        with open(cache_file, "wb") as f:
                            pkl.dump(out_strs, f)
                    return out_strs

                # one dict per prompt row (not per character of a str prompt)
                return [
                    {
                        token_str: float(target_probs[row, col])
                        for col, token_str in enumerate(target_token_strs)
                    }
                    for row in range(target_probs.shape[0])
                ]

            outputs = self.model_.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                do_sample=do_sample,
                pad_token_id=self.tokenizer_.pad_token_id,
            )

            # instruct checkpoints decode the prompt differently from how it was
            # written, so the prompt prefix is cut with checkpoint-specific offsets
            is_mistral_instruct = 'mistral' in self.checkpoint and 'Instruct' in self.checkpoint
            is_llama3_instruct = 'Meta-Llama-3' in self.checkpoint and 'Instruct' in self.checkpoint

            if input_is_str:
                out_str = self.tokenizer_.decode(
                    outputs[0], skip_special_tokens=True)
                if is_mistral_instruct:
                    out_str = out_str[len(prompt) - 2:]
                elif is_llama3_instruct:
                    # NOTE(review): magic offset, presumably tuned to one prompt template
                    out_str = out_str[len(prompt) - 145:]
                else:
                    out_str = out_str[len(prompt):]

                if use_cache:
                    with open(cache_file, "wb") as f:
                        pkl.dump(out_str, f)
                return out_str

            out_strs = []
            for i in range(outputs.shape[0]):
                out_str = self.tokenizer_.decode(
                    outputs[i], skip_special_tokens=True)
                if is_mistral_instruct:
                    out_str = out_str[len(prompt[i]) - 2:]
                elif is_llama3_instruct:
                    # NOTE(review): len(prompt) is the number of prompts here, not
                    # len(prompt[i]); kept as-is because the offset is a tuned magic number
                    out_str = out_str[len(prompt) + 187:]
                else:
                    out_str = out_str[len(prompt[i]):]
                out_strs.append(out_str)
            if use_cache:
                with open(cache_file, "wb") as f:
                    pkl.dump(out_strs, f)
            return out_strs

    def _check_target_token_strs(self, target_token_strs, override_token_with_first_token_id=False):
        """Tokenize each target string and check that it maps to a single token.

        Returns
        -------
        If override_token_with_first_token_id is True: a list with the first token id
        of each target string. Otherwise: a list with one single-element list of
        token ids per target string.

        Raises
        ------
        ValueError
            If override_token_with_first_token_id is False and a target string
            does not tokenize to exactly one token.
        """
        if isinstance(target_token_strs, str):
            target_token_strs = [target_token_strs]

        target_token_ids = [self.tokenizer_(target_token_str, add_special_tokens=False)["input_ids"]
                            for target_token_str in target_token_strs]

        if override_token_with_first_token_id:
            # Get first token id in target_token_str
            return [token_ids[0] for token_ids in target_token_ids]

        # Check that each target is exactly one token in the vocab
        for token_str, token_ids in zip(target_token_strs, target_token_ids):
            if len(token_ids) > 1:
                raise ValueError(
                    f"target_token_str {token_str} has multiple tokens: " +
                    str([self.tokenizer_.decode(token_id)
                        for token_id in token_ids]))
            if len(token_ids) == 0:
                raise ValueError(
                    f"target_token_str {token_str!r} does not map to any token")
        return target_token_ids


class LLMEmbs:
    """Extract last-token hidden-state embeddings from a HuggingFace model."""

    def __init__(self, checkpoint):
        self.tokenizer_ = load_tokenizer(checkpoint)
        self.model_ = AutoModel.from_pretrained(
            checkpoint, output_hidden_states=True,
            device_map="auto",
            torch_dtype=torch.float16,)

    def __call__(self, texts: List[str], layer_idx: int = 18, batch_size=16):
        '''Returns embeddings

        Params
        ------
        texts: List[str]
            Texts to embed; processed in chunks of batch_size
        layer_idx: int
            Index into the model's hidden_states tuple
        batch_size: int
            Number of texts per forward pass

        Returns
        -------
        np.ndarray with one row per text: the hidden state at the last token
        position of the padded batch.

        Raises
        ------
        ValueError
            From np.concatenate when texts is empty.
        '''
        embs = []
        # inference only: skip autograd bookkeeping
        with torch.no_grad():
            for i in tqdm(range(0, len(texts), batch_size)):
                inputs = self.tokenizer_(
                    texts[i:i + batch_size], return_tensors='pt', padding=True).to(self.model_.device)
                hidden_states = self.model_(**inputs).hidden_states

                # hidden_states: layers x batch x tokens x features
                # slice the last token before moving to host so only that slice is copied
                last_token = hidden_states[layer_idx][:, -1, :]

                # .copy() detaches the result from the full hidden-state buffer
                embs.append(last_token.detach().cpu().numpy().copy())
        embs = np.concatenate(embs)
        return embs


if __name__ == "__main__":
    # Earlier manual checks, kept for reference:
    #
    # llm = get_llm("text-davinci-003")
    # text = llm("What do these have in common? Horse, ")
    # print("text", text)
    #
    # llm = get_llm("gpt2")
    # text = llm("""Continue this list\n- apple\n- banana\n-""")
    # print("text", text)
    #
    # tokenizer = transformers.LlamaTokenizer.from_pretrained("chaoyi-wu/PMC_LLAMA_7B")
    # model = transformers.LlamaForCausalLM.from_pretrained("chaoyi-wu/PMC_LLAMA_7B")
    # llm = get_llm("chaoyi-wu/PMC_LLAMA_7B")
    #
    # llm = get_llm("llama_65b")
    # text = llm("""Continue this list\n- red\n- orange\n- yellow\n- green\n-""", use_cache=False)
    # print(text)
    # print("\n\n")
    # print(repr(text))
    #
    # GET LOGITS ###################################
    # llm = get_llm("gpt2")
    # prompts = ['roses are red, violets are', 'may the force be with']
    # target_token_strs = [' blue', ' you']
    # ans = llm(prompts, return_next_token_prob_scores=True,
    #           use_cache=False, target_token_strs=target_token_strs)

    # FORCE WORDS ##########
    # pick, for every prompt, the most likely next token among a fixed set
    target_token_strs = [' green', ' you', 'orange']
    prompts = [
        'roses are red, violets are',
        'may the force be with',
        'trees are usually',
    ]
    llm = get_llm("gpt2")
    # raises if any target is not a single token
    llm._check_target_token_strs(target_token_strs)
    call_kwargs = {
        "use_cache": False,
        "return_next_token_prob_scores": True,
        "target_token_strs": target_token_strs,
        "return_top_target_token_str": True,
    }
    ans = llm(prompts, **call_kwargs)
    print('ans', ans)

Functions

def get_llm(checkpoint, seed=1, role: str = None, repeat_delay: Optional[float] = None, CACHE_DIR='/home/chansingh/clin/CACHE_OPENAI', LLAMA_DIR='/home/chansingh/llama')
Expand source code
def get_llm(
    checkpoint,
    seed=1,
    role: str = None,
    repeat_delay: Optional[float] = None,
    CACHE_DIR=LLM_CONFIG["CACHE_DIR"],
    LLAMA_DIR=LLM_CONFIG["LLAMA_DIR"],
):
    """Get an LLM with a call function and caching capabilities

    Params
    ------
    checkpoint: str
        Model name; its prefix / contents select the wrapper class:
        "text-da*" -> LLM_OpenAI, "gpt-3*" / "gpt-4*" -> LLM_Chat,
        names containing both 'Meta-Llama-3' and 'Instruct' -> LLM_HF_Pipeline,
        anything else -> LLM_HF
    seed: int
        Forwarded to the wrapper, where it becomes part of the cache directory name
        (not forwarded to LLM_HF_Pipeline)
    role: str
        System prompt used by LLM_Chat when it is called with a plain string
    repeat_delay: Optional[float]
        If not None, overwrites the module-wide LLM_CONFIG["LLM_REPEAT_DELAY"]
    CACHE_DIR: str
        Root directory for cached outputs
    LLAMA_DIR: str
        Forwarded to LLM_HF
    """
    if repeat_delay is not None:
        # note: this mutates global config shared by every llm instance
        LLM_CONFIG["LLM_REPEAT_DELAY"] = repeat_delay

    if checkpoint.startswith("text-da"):
        return LLM_OpenAI(checkpoint, seed=seed, CACHE_DIR=CACHE_DIR)
    if checkpoint.startswith(("gpt-3", "gpt-4")):
        return LLM_Chat(checkpoint, seed, role, CACHE_DIR)
    if 'Meta-Llama-3' in checkpoint and 'Instruct' in checkpoint:
        return LLM_HF_Pipeline(checkpoint, CACHE_DIR)
    return LLM_HF(checkpoint, seed=seed, CACHE_DIR=CACHE_DIR, LLAMA_DIR=LLAMA_DIR)
def load_hf_model(checkpoint: str) ‑> transformers.modeling_utils.PreTrainedModel
Expand source code
def load_hf_model(checkpoint: str, LLAMA_DIR: Optional[str] = None) -> transformers.PreTrainedModel:
    """Load the HuggingFace model for `checkpoint`, with per-family loading options.

    Params
    ------
    checkpoint: str
        Model name (or, for names containing "llama_", a sub-directory of LLAMA_DIR)
    LLAMA_DIR: Optional[str]
        Directory holding local llama weights; defaults to LLM_CONFIG["LLAMA_DIR"].
        Only used for checkpoints containing "llama_".
    """
    if "google/flan" in checkpoint:
        return T5ForConditionalGeneration.from_pretrained(
            checkpoint, device_map="auto", torch_dtype=torch.float16
        )
    elif checkpoint == "EleutherAI/gpt-j-6B":
        # checkpoint is passed positionally only: also passing it as
        # pretrained_model_name_or_path would be a duplicate-argument TypeError
        return AutoModelForCausalLM.from_pretrained(
            checkpoint,
            revision="float16",
            torch_dtype=torch.float16,
            output_hidden_states=False,
            low_cpu_mem_usage=True,
        )
    elif "llama-2" in checkpoint.lower():
        return AutoModelForCausalLM.from_pretrained(
            checkpoint,
            torch_dtype=torch.float16,
            device_map="auto",
            token=HF_TOKEN,
            offload_folder="offload",
        )
    elif "llama_" in checkpoint:
        # locally extracted weights live under the configured llama directory
        llama_dir = LLAMA_DIR if LLAMA_DIR is not None else LLM_CONFIG["LLAMA_DIR"]
        return transformers.LlamaForCausalLM.from_pretrained(
            join(llama_dir, checkpoint),
            device_map="auto",
            torch_dtype=torch.float16,
        )
    elif 'microsoft/phi' in checkpoint or checkpoint == "gpt-xl":
        # loaded with library defaults (no device_map / dtype override)
        return AutoModelForCausalLM.from_pretrained(checkpoint)
    else:
        return AutoModelForCausalLM.from_pretrained(
            checkpoint, device_map="auto", torch_dtype=torch.float16,
            token=HF_TOKEN,
        )
def load_tokenizer(checkpoint: str) ‑> transformers.tokenization_utils.PreTrainedTokenizer
Expand source code
def load_tokenizer(checkpoint: str) -> transformers.PreTrainedTokenizer:
    """Load the left-padding tokenizer for `checkpoint`.

    If the tokenizer has no pad token id, the eos token id is used instead.
    """
    # settings shared by every branch
    shared = {"padding_side": 'left', "token": HF_TOKEN}

    if "facebook/opt" in checkpoint:
        # opt can't use fast tokenizer
        tok = AutoTokenizer.from_pretrained(
            checkpoint, use_fast=False, **shared)
    elif "PMC_LLAMA" in checkpoint:
        # always loaded from the fixed PMC_LLAMA_7B repo
        tok = transformers.LlamaTokenizer.from_pretrained(
            "chaoyi-wu/PMC_LLAMA_7B", **shared)
    else:
        tok = AutoTokenizer.from_pretrained(
            checkpoint, use_fast=True, **shared)

    if tok.pad_token_id is None:
        tok.pad_token_id = tok.eos_token_id
    return tok
def repeatedly_call_with_delay(llm_call)
Expand source code
def repeatedly_call_with_delay(llm_call):
    """Decorator that re-invokes `llm_call` until it returns something other than None.

    Behavior on an exception raised by `llm_call`:
      - the message is printed
      - if the message marks the request as unrecoverable (missing deployment,
        prompt too long, content filter), the wrapper returns None
      - if LLM_CONFIG["LLM_REPEAT_DELAY"] is None, the original exception is re-raised
      - otherwise the wrapper sleeps for that many seconds and tries again

    Note: a call that returns None without raising is retried immediately.
    """
    from functools import wraps  # local import keeps this block self-contained

    # substrings of error messages for which retrying cannot help
    unrecoverable_markers = (
        "does not exist",
        "maximum context length",
        'content management policy',
    )

    @wraps(llm_call)
    def wrapper(*args, **kwargs):
        # Number of seconds to wait between calls (None will not repeat)
        delay = LLM_CONFIG["LLM_REPEAT_DELAY"]
        response = None
        while response is None:
            try:
                response = llm_call(*args, **kwargs)
            except Exception as e:
                message = str(e)
                print(message)
                if any(marker in message for marker in unrecoverable_markers):
                    return None
                if delay is None:
                    # re-raise the original exception (raising the str would be a TypeError)
                    raise
                time.sleep(delay)
        return response

    return wrapper

Classes

class LLMEmbs (checkpoint)
Expand source code
class LLMEmbs:
    """Extract last-token hidden-state embeddings from a HuggingFace model."""

    def __init__(self, checkpoint):
        self.tokenizer_ = load_tokenizer(checkpoint)
        self.model_ = AutoModel.from_pretrained(
            checkpoint, output_hidden_states=True,
            device_map="auto",
            torch_dtype=torch.float16,)

    def __call__(self, texts: List[str], layer_idx: int = 18, batch_size=16):
        '''Returns embeddings

        Params
        ------
        texts: List[str]
            Texts to embed; processed in chunks of batch_size
        layer_idx: int
            Index into the model's hidden_states tuple
        batch_size: int
            Number of texts per forward pass

        Returns
        -------
        np.ndarray with one row per text: the hidden state at the last token
        position of the padded batch.

        Raises
        ------
        ValueError
            From np.concatenate when texts is empty.
        '''
        embs = []
        # inference only: skip autograd bookkeeping
        with torch.no_grad():
            for i in tqdm(range(0, len(texts), batch_size)):
                inputs = self.tokenizer_(
                    texts[i:i + batch_size], return_tensors='pt', padding=True).to(self.model_.device)
                hidden_states = self.model_(**inputs).hidden_states

                # hidden_states: layers x batch x tokens x features
                # slice the last token before moving to host so only that slice is copied
                last_token = hidden_states[layer_idx][:, -1, :]

                # .copy() detaches the result from the full hidden-state buffer
                embs.append(last_token.detach().cpu().numpy().copy())
        embs = np.concatenate(embs)
        return embs
class LLM_Chat (checkpoint, seed, role, CACHE_DIR)
Expand source code
class LLM_Chat:
    """Chat models take a different format: https://platform.openai.com/docs/guides/chat/introduction

    Wraps an Azure OpenAI chat deployment and pickles every non-None response on disk.
    """

    def __init__(self, checkpoint, seed, role, CACHE_DIR):
        self.cache_dir = join(
            CACHE_DIR, "cache_openai", f'{checkpoint.replace("/", "_")}___{seed}'
        )
        self.checkpoint = checkpoint
        self.role = role
        from openai import AzureOpenAI
        api_key = os.getenv("OPENAI_API_KEY")  # need to fill this in
        self.client = AzureOpenAI(
            azure_endpoint="https://healthcare-ai.openai.azure.com/",
            api_version="2024-02-01",
            api_key=api_key,
        )

    # @repeatedly_call_with_delay
    def __call__(
        self,
        prompts_list: List[Dict[str, str]],
        max_new_tokens=250,
        stop=None,
        functions: List[Dict] = None,
        return_str=True,
        verbose=True,
        temperature=0.1,
        frequency_penalty=0.25,
        use_cache=True,
    ):
        """
        prompts_list: list of dicts, each dict has keys 'role' and 'content'
            Example: [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "Who won the world series in 2020?"},
                {"role": "assistant",
                    "content": "The Los Angeles Dodgers won the World Series in 2020."},
                {"role": "user", "content": "Where was it played?"}
            ]
        prompts_list: str
            Alternatively, string which gets formatted into basic prompts_list:
            messages = [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": <<<<<prompts_list>>>>},
            ]
            (the system content is self.role when that is not None)
        max_new_tokens: int
            Passed to the API as max_tokens; also part of the cache file name
        stop, functions, temperature, frequency_penalty
            Passed through to the API (functions only when not None)
        return_str: bool
            If True return the message text, otherwise the raw response object
        verbose: bool
            Print whether the result came from the cache
        use_cache: bool
            If False, an existing cache entry is ignored; the fresh response
            is still written to the cache

        Returns the response (or its text); None responses are never cached.
        """
        if isinstance(prompts_list, str):
            role = self.role
            if role is None:
                role = "You are a helpful assistant."
            prompts_list = [
                {"role": "system", "content": role},
                {"role": "user", "content": prompts_list},
            ]

        assert isinstance(prompts_list, list), prompts_list

        # cache key: the messages plus any setting that differs from the historical defaults
        # (keeps old cache files for gpt-3.5-turbo / low temperature valid)
        os.makedirs(self.cache_dir, exist_ok=True)
        prompts_list_dict = {
            str(i): sorted(v.items()) for i, v in enumerate(prompts_list)
        }
        if not self.checkpoint == "gpt-3.5-turbo":
            prompts_list_dict["checkpoint"] = self.checkpoint
        if functions is not None:
            prompts_list_dict["functions"] = functions
        if temperature > 0.1:
            prompts_list_dict["temperature"] = temperature
        dict_as_str = json.dumps(prompts_list_dict, sort_keys=True)
        hash_str = hashlib.sha256(dict_as_str.encode()).hexdigest()
        cache_file = join(
            self.cache_dir,
            f"chat__{hash_str}__num_tok={max_new_tokens}.pkl",
        )
        if os.path.exists(cache_file) and use_cache:
            if verbose:
                print("cached!")
            # NOTE: only unpickle cache files from a trusted cache directory
            with open(cache_file, "rb") as f:
                response = pkl.load(f)
            if response is not None:
                return response
        if verbose:
            print("not cached")

        kwargs = dict(
            model=self.checkpoint,
            messages=prompts_list,
            max_tokens=max_new_tokens,
            temperature=temperature,
            top_p=1,
            frequency_penalty=frequency_penalty,  # maximum is 2
            presence_penalty=0,
            stop=stop,
        )
        if functions is not None:
            kwargs["functions"] = functions

        response = self.client.chat.completions.create(
            **kwargs,
        )

        if return_str:
            response = response.choices[0].message.content

        if response is not None:
            with open(cache_file, "wb") as f:
                pkl.dump(response, f)

        return response
class LLM_HF (checkpoint, seed, CACHE_DIR, LLAMA_DIR=None)
Expand source code
class LLM_HF:
    """Callable wrapper around a Hugging Face causal LM with a pickle cache.

    The tokenizer and model are loaded with the module's ``load_tokenizer``
    and ``load_hf_model`` helpers. Cached outputs are stored under
    ``<CACHE_DIR>/cache_hf/<checkpoint with "/" replaced by "_">___<seed>``.
    """

    def __init__(self, checkpoint, seed, CACHE_DIR, LLAMA_DIR=None):
        # LLAMA_DIR is accepted for signature compatibility but is not read.
        self.tokenizer_ = load_tokenizer(checkpoint)
        self.model_ = load_hf_model(checkpoint)
        self.checkpoint = checkpoint
        self.cache_dir = join(
            CACHE_DIR, "cache_hf", f'{checkpoint.replace("/", "_")}___{seed}'
        )
        self.seed = seed

    def __call__(
        self,
        prompt: Union[str, List[str]],
        stop: str = None,
        max_new_tokens=20,
        do_sample=False,
        use_cache=True,
        verbose=False,
        return_next_token_prob_scores=False,
        target_token_strs: List[str] = None,
        return_top_target_token_str: bool = False,
        batch_size=1,
    ) -> Union[str, List[str]]:
        """Generate a continuation for ``prompt`` or score its next token.

        Be careful, caching can take up a lot of disk space.

        Example mistral-instruct prompt:
        "<s>[INST]'Input text: {example}\\nQuestion: {question} Answer yes or no.[/INST]"

        Params
        ------
        prompt: str or List[str]
            One prompt, or a batch of prompts that is tokenized with padding.
        stop: str
            Accepted for interface compatibility; this method does not apply
            it, neither during generation nor afterwards.
        max_new_tokens: int
            Number of tokens to generate in generation mode.
        do_sample: bool
            Forwarded to ``model.generate`` in generation mode.
        use_cache: bool
            Read/write pickled results in ``self.cache_dir``. Only outputs
            that are strings are cached (generation mode and
            ``return_top_target_token_str`` mode).
        verbose: bool
            Print whether the cache was hit.
        return_next_token_prob_scores: bool
            If true, run a single generation step and work with the softmax
            of the next-token logits instead of generating text.
        target_token_strs: List[str]
            Single-token strings to restrict the next-token scores to. A
            non-empty value also switches to scoring mode. A bare ``str`` is
            treated as a one-element list.
        return_top_target_token_str: bool
            If true (and ``target_token_strs`` is given), return only the
            most probable target string per prompt. This is a way to
            constrain the output (but only for 1 token).
        batch_size: int
            Accepted for interface compatibility; not used by this method.

        Returns
        -------
        - generation mode: the decoded continuation (``str`` for a ``str``
          prompt, ``List[str]`` for a list of prompts)
        - scoring mode without ``target_token_strs``: NumPy array with one
          row of next-token probabilities per prompt
        - scoring mode with ``target_token_strs``: one dict per prompt mapping
          each target string to the probabilities selected by its token-id
          list (a length-1 array for a single-token target)
        - ``return_top_target_token_str``: the best target string per prompt
          (a bare ``str`` when there is exactly one prompt)

        Raises
        ------
        ValueError
            If a target string is tokenized into more than one token.
        """
        input_is_str = isinstance(prompt, str)
        if isinstance(target_token_strs, str):
            # Otherwise the string would be indexed character by character.
            target_token_strs = [target_token_strs]

        scoring_mode = bool(
            return_next_token_prob_scores
            or target_token_strs
            or return_top_target_token_str
        )
        top_token_mode = (
            scoring_mode
            and target_token_strs is not None
            and return_top_target_token_str
        )

        # Only string outputs are cached. Probability outputs must never be
        # served from (or shadowed by) a cached generation for the same prompt.
        cache_file = None
        if use_cache and (top_token_mode or not scoring_mode):
            os.makedirs(self.cache_dir, exist_ok=True)
            cache_file = self._cache_path(
                prompt,
                max_new_tokens,
                target_token_strs if top_token_mode else None,
            )
            hit, cached = self._read_cache(cache_file, verbose)
            if hit:
                return cached

        with torch.no_grad():
            inputs = self.tokenizer_(
                prompt,
                return_tensors="pt",
                return_attention_mask=True,
                padding=True,
                truncation=False,
            ).to(self.model_.device)

            if scoring_mode:
                outputs = self.model_.generate(
                    **inputs,
                    max_new_tokens=1,
                    pad_token_id=self.tokenizer_.pad_token_id,
                    output_logits=True,
                    return_dict_in_generate=True,
                )
                # Logits of the first (and only) generated step. Upcast first
                # because bfloat16 tensors cannot be converted to NumPy.
                next_token_probs = (
                    outputs['logits'][0]
                    .float()
                    .softmax(dim=-1)
                    .detach()
                    .cpu()
                    .numpy()
                )

                if target_token_strs is None:
                    return next_token_probs

                target_token_ids = self._check_target_token_strs(
                    target_token_strs)

                if return_top_target_token_str:
                    # Index with a flat id vector so the result is always
                    # (num_prompts, num_targets), even for one prompt or one
                    # target (squeezing used to collapse those axes).
                    first_ids = np.array([ids[0] for ids in target_token_ids])
                    best = next_token_probs[:, first_ids].argmax(axis=-1)
                    out_strs = [target_token_strs[int(j)] for j in best]
                    if len(out_strs) == 1:
                        out_strs = out_strs[0]
                    self._write_cache(cache_file, out_strs)
                    return out_strs

                # One dict per row of the batch; iterating over the rows (not
                # len(prompt)) also works when prompt is a single string.
                return [
                    {
                        token_str: next_token_probs[row, token_ids]
                        for token_str, token_ids in zip(
                            target_token_strs, target_token_ids)
                    }
                    for row in range(next_token_probs.shape[0])
                ]

            outputs = self.model_.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                do_sample=do_sample,
                pad_token_id=self.tokenizer_.pad_token_id,
            )

            if input_is_str:
                decoded = self.tokenizer_.decode(
                    outputs[0], skip_special_tokens=True)
                result = self._strip_prompt(decoded, prompt)
            else:
                result = [
                    self._strip_prompt(
                        self.tokenizer_.decode(
                            outputs[i], skip_special_tokens=True),
                        prompt,
                        i,
                    )
                    for i in range(outputs.shape[0])
                ]

        self._write_cache(cache_file, result)
        return result

    def _cache_path(self, prompt, max_new_tokens, target_token_strs=None):
        """Return the cache file for a call; target strings get their own key."""
        hash_str = hashlib.sha256(str(prompt).encode()).hexdigest()
        name = f"{hash_str}__num_tok={max_new_tokens}"
        if target_token_strs is not None:
            target_hash = hashlib.sha256(
                str(list(target_token_strs)).encode()).hexdigest()[:16]
            name += f"__top_target={target_hash}"
        return join(self.cache_dir, name + ".pkl")

    @staticmethod
    def _read_cache(cache_file, verbose=False):
        """Return ``(True, value)`` on a readable cache entry, else ``(False, None)``."""
        if os.path.exists(cache_file):
            if verbose:
                print("cached!")
            try:
                with open(cache_file, "rb") as f:
                    return True, pkl.load(f)
            except Exception:
                # Best effort: an unreadable entry just triggers a rerun.
                print('failed to load cache so rerunning...')
        if verbose:
            print("not cached...")
        return False, None

    @staticmethod
    def _write_cache(cache_file, value):
        """Pickle ``value`` to ``cache_file``; no-op when caching is off."""
        if cache_file is None:
            return
        with open(cache_file, "wb") as f:
            pkl.dump(value, f)

    def _strip_prompt(self, decoded, prompt, index=None):
        """Cut the echoed prompt off the front of a decoded generation.

        ``index`` is None for a single string prompt and the row number when
        ``prompt`` is a list. The offsets are character counts applied to the
        decoded text (special tokens already skipped).
        """
        is_instruct = 'Instruct' in self.checkpoint
        if 'mistral' in self.checkpoint and is_instruct:
            this_prompt = prompt if index is None else prompt[index]
            return decoded[len(this_prompt) - 2:]
        if 'Meta-Llama-3' in self.checkpoint and is_instruct:
            if index is None:
                return decoded[len(prompt) - 145:]
            # NOTE(review): this uses the number of prompts in the batch, not
            # len(prompt[index]), and +187 where the single-prompt path uses
            # -145. Kept as-is because the intended offset cannot be derived
            # from this code -- confirm against the prompt template in use.
            return decoded[len(prompt) + 187:]
        this_prompt = prompt if index is None else prompt[index]
        return decoded[len(this_prompt):]

    def _check_target_token_strs(self, target_token_strs, override_token_with_first_token_id=False):
        """Tokenize the target strings and validate they are single tokens.

        Returns a list with one entry per target string: the list of token
        ids by default, or just the first token id when
        ``override_token_with_first_token_id`` is true.

        Raises ValueError if a target has multiple tokens and the override
        flag is not set.
        """
        if isinstance(target_token_strs, str):
            target_token_strs = [target_token_strs]

        target_token_ids = [
            self.tokenizer_(target_token_str, add_special_tokens=False)["input_ids"]
            for target_token_str in target_token_strs
        ]

        if override_token_with_first_token_id:
            # Keep only the first token id of every target string.
            return [token_ids[0] for token_ids in target_token_ids]

        for target_token_str, token_ids in zip(target_token_strs, target_token_ids):
            if len(token_ids) > 1:
                raise ValueError(
                    f"target_token_str {target_token_str} has multiple tokens: " +
                    str([self.tokenizer_.decode(token_id)
                         for token_id in token_ids]))
        return target_token_ids
class LLM_HF_Pipeline (checkpoint, CACHE_DIR)
Expand source code
class LLM_HF_Pipeline:
    """Callable wrapper around a ``transformers`` text-generation pipeline.

    The pipeline is created in float16 with ``device_map="auto"``; its
    tokenizer pads on the left using the EOS token. Generated continuations
    are cached as pickle files directly inside ``CACHE_DIR``.
    """

    def __init__(self, checkpoint, CACHE_DIR):
        self.pipeline_ = transformers.pipeline(
            "text-generation",
            model=checkpoint,
            model_kwargs={'torch_dtype': torch.float16},
            device_map="auto"
        )
        self.pipeline_.tokenizer.pad_token_id = self.pipeline_.tokenizer.eos_token_id
        self.pipeline_.tokenizer.padding_side = 'left'
        # NOTE(review): neither this directory nor the cache key below
        # contains the checkpoint name, so two checkpoints given the same
        # CACHE_DIR share cache entries -- confirm callers pass distinct dirs.
        self.cache_dir = join(CACHE_DIR)

    def __call__(
        self,
        prompt: Union[str, List[str]],
        max_new_tokens=20,
        use_cache=True,
        verbose=False,
        batch_size=64,
    ):
        """Greedily generate a continuation for one prompt or a list of prompts.

        Params
        ------
        prompt: str or List[str]
            Prompt(s) passed to the pipeline.
        max_new_tokens: int
            Forwarded to the pipeline; also part of the cache file name.
        use_cache: bool
            Read/write the pickled result keyed by the sha256 of ``str(prompt)``.
        verbose: bool
            Print whether the cache was hit.
        batch_size: int
            Forwarded to the pipeline.

        Returns
        -------
        The generated text with the first ``len(prompt)`` characters removed:
        a ``str`` for a ``str`` prompt, otherwise a list with one string per
        prompt.
        """
        cache_file = None
        if use_cache:
            os.makedirs(self.cache_dir, exist_ok=True)
            hash_str = hashlib.sha256(str(prompt).encode()).hexdigest()
            cache_file = join(
                self.cache_dir, f"{hash_str}__num_tok={max_new_tokens}.pkl"
            )

            if os.path.exists(cache_file):
                if verbose:
                    print("cached!")
                try:
                    with open(cache_file, "rb") as f:
                        return pkl.load(f)
                except Exception:
                    # Best effort: an unreadable entry just triggers a rerun.
                    print('failed to load cache so rerunning...')
            if verbose:
                print("not cached...")

        outputs = self.pipeline_(
            prompt,
            max_new_tokens=max_new_tokens,
            batch_size=batch_size,
            do_sample=False,
        )
        if isinstance(prompt, str):
            texts = outputs[0]["generated_text"][len(prompt):]
        else:
            # For a list input, each output entry is itself a list of candidates.
            texts = [
                candidates[0]['generated_text'][len(single_prompt):]
                for single_prompt, candidates in zip(prompt, outputs)
            ]

        if cache_file is not None:
            with open(cache_file, "wb") as f:
                pkl.dump(texts, f)
        return texts
class LLM_OpenAI (checkpoint, seed, CACHE_DIR)
Expand source code
class LLM_OpenAI:
    """Callable wrapper around an Azure OpenAI deployment with a pickle cache.

    Cached responses are stored under
    ``<CACHE_DIR>/cache_openai/<checkpoint with "/" replaced by "_">___<seed>``.
    """

    def __init__(self, checkpoint, seed, CACHE_DIR):
        self.cache_dir = join(
            CACHE_DIR, "cache_openai", f'{checkpoint.replace("/", "_")}___{seed}'
        )
        self.checkpoint = checkpoint

    @repeatedly_call_with_delay
    def __call__(
        self,
        prompt: str,
        max_new_tokens=250,
        frequency_penalty=0.25,  # maximum is 2
        temperature=0.1,
        do_sample=True,
        stop=None,
        return_str=True,
    ):
        """Send ``prompt`` as a single user message and return the reply.

        Params
        ------
        prompt: str
            Text of the user message.
        max_new_tokens: int
            Sent as ``max_tokens``; also part of the cache file name.
        frequency_penalty: float
            Forwarded to the API.
        temperature: float
            Forwarded to the API.
        do_sample: bool
            Accepted for interface compatibility; not used by this method.
        stop:
            Stop sequence(s) forwarded to the API.
        return_str: bool
            If true, return the content of the first choice; otherwise return
            the full response object.

        Returns
        -------
        The reply text (or response object). The cache key only covers the
        prompt and ``max_new_tokens``, so a cached entry is returned even if
        the other arguments differ from the call that created it.
        """
        # cache
        os.makedirs(self.cache_dir, exist_ok=True)
        hash_str = hashlib.sha256(prompt.encode()).hexdigest()
        cache_file = join(
            self.cache_dir, f"{hash_str}__num_tok={max_new_tokens}.pkl")
        if os.path.exists(cache_file):
            with open(cache_file, "rb") as f:
                return pkl.load(f)

        from openai import AzureOpenAI
        api_key = os.getenv("OPENAI_API_KEY")  # need to fill this in
        client = AzureOpenAI(
            azure_endpoint="https://healthcare-ai.openai.azure.com/",
            api_version="2024-02-01",
            api_key=api_key,
        )

        response = client.chat.completions.create(
            # model is the deployment name chosen when the model was deployed
            model=self.checkpoint,
            # the chat endpoint takes a list of role/content messages,
            # not a bare string
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature,
            max_tokens=max_new_tokens,
            top_p=1,
            frequency_penalty=frequency_penalty,
            presence_penalty=0,
            stop=stop,
        )

        if return_str:
            response = response.choices[0].message.content

        with open(cache_file, "wb") as f:
            pkl.dump(response, f)
        return response