Module imodelsx.submit_utils
from typing import Any, Dict, List, Optional, Tuple, Union
import itertools
import subprocess
import random
from multiprocessing.pool import ThreadPool
from multiprocessing import Pool, current_process, Queue
from functools import reduce
from itertools import repeat
import time
import traceback
import numpy as np
import os
from os.path import dirname, join
from dict_hash import sha256
submit_utils_dir = dirname(__file__)
"""Handles utilities for job sweeps,
focused on embarrassingly parallel sweeps on a single machine.
"""
def run_args_list(
args_list: List[Dict[str, Any]],
cmd_python: str = 'python',
script_name: str = '02_train_suffix.py',
actually_run: bool = True,
debug_mode: bool = False,
shuffle: bool = False,
reverse: bool = False,
    unique_seeds: Optional[str] = None,
n_cpus: int = 1,
gpu_ids: Union[List[int], List[List[int]]] = [],
repeat_failed_jobs: bool = False,
slurm: bool = False,
slurm_kwargs: Optional[Dict] = None,
amlt_kwargs: Optional[Dict] = None,
):
"""
Params
------
    args_list: List[Dict[str, Any]]
        List of dicts, each mapping argument names to values for one run
cmd_python: str
Command to run python
script_name: str
Name of script to run
actually_run: bool
Whether to actually run the script (otherwise just print the command)
debug_mode: bool
        Whether to open debugger after failure (stops all parallelization)
shuffle: bool
Whether to shuffle the order of the script calls
reverse: bool
Whether to reverse the order of the script calls
    unique_seeds: str
        If set, the name of an argument that is assigned a random, unique value for each run
n_cpus: int
Number of cpus to use (if >1, parallelizes over local machine)
gpu_ids: List[int], List[List[int]]
Ids of GPUs to run on (e.g. [0, 1] for 2 gpus)
If List[List[int]], then each inner list is a group of GPUs to run on, e.g. [[0, 1], [2, 3]] for 2 groups of 2 GPUs
repeat_failed_jobs: bool
        Whether to re-run failed jobs until they succeed
    slurm: bool
        Whether to run on SLURM (defaults to False)
slurm_kwargs: Optional[Dict]
kwargs for slurm
amlt_kwargs: Optional[Dict]
kwargs for amlt (will override everything else)
"""
    n_gpus = len(gpu_ids)
    if amlt_kwargs is not None:
        print('Running on AMLT with', amlt_kwargs)
    else:
        _validate_run_arguments(n_cpus, gpu_ids)
# adjust order
if shuffle:
random.shuffle(args_list)
if reverse:
args_list = args_list[::-1]
# debug mode
if debug_mode:
cmd_python = 'python -m pdb -c continue'
if n_cpus > 1 or n_gpus > 1:
print('\n###\n### Debug mode, setting n_cpus=1 and n_gpus=0 ###\n###\n')
n_cpus = 1
n_gpus = 0
# assign unique seeds
if unique_seeds:
        for i in range(len(args_list)):
            args_list[i][unique_seeds] = random.randint(1, int(1e6))
# construct commands
param_str_list = [_param_str_from_args(
args, cmd_python, script_name) for args in args_list]
# just print and exit
if not actually_run:
print('Not actually running the commands, just printing them.')
for i, param_str in enumerate(param_str_list):
print(
f'\n\n-------------------{i + 1}/{len(param_str_list)}--------------------\n' + param_str)
return
failed_jobs = []
if slurm:
for i, param_str in enumerate(param_str_list):
print(
f'\n\n-------------------{i + 1}/{len(param_str_list)}--------------------\n' + param_str)
run_slurm(param_str, slurm_kwargs=slurm_kwargs)
return
elif amlt_kwargs is not None:
assert 'amlt_file' in amlt_kwargs
sku = amlt_kwargs.get('sku', 'G1')
process_count_per_node = amlt_kwargs.get('process_count_per_node', 1)
amlt_dir = dirname(amlt_kwargs['amlt_file'])
repo_dir = dirname(amlt_dir)
        with open(amlt_kwargs['amlt_file'], 'r') as f:
            amlt_text = f.read()
assert amlt_text.endswith('jobs:'), 'amlt file must end with jobs:'
script_name = script_name.replace(repo_dir, '').strip('/')
param_str_list = [_param_str_from_args(
args, cmd_python, script_name) for args in args_list]
if 'mnt_rename' in amlt_kwargs:
param_str_list = [
param_str.replace(
amlt_kwargs['mnt_rename'][0], amlt_kwargs['mnt_rename'][1])
for param_str in param_str_list
]
# save yaml file with multiple jobs in logs dir and run with amlt
logs_dir = join(amlt_dir, 'logs')
os.makedirs(logs_dir, exist_ok=True)
job_template = '''
- name: {name}
process_count_per_node: {process_count_per_node}
sku: {sku}
command:
- echo "{param_str}"
- {param_str}'''
out_file = join(logs_dir, sha256({'s': str(param_str_list)}) + '.yaml')
s = amlt_text
for i, param_str in enumerate(param_str_list):
job_text = job_template.format(
name=f'{sku}_job_{i}',
process_count_per_node=process_count_per_node,
sku=sku,
param_str=param_str
)
s = s + job_text
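        # the generated yaml lives in logs/, one level below the original amlt
        # file, so shift $CONFIG_DIR references up a directory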
s = s.replace('$CONFIG_DIR', '$CONFIG_DIR/..')
with open(out_file, 'w') as f:
f.write(s)
subprocess.run(
f'amlt run {out_file}', shell=True, check=True,
)
return
# run serial
elif n_cpus == 1 and n_gpus == 0:
for i, param_str in enumerate(param_str_list):
print(
f'\n\n-------------------{i + 1}/{len(param_str_list)}--------------------\n' + param_str)
try:
output = subprocess.run(
param_str, shell=True, check=True,
)
except KeyboardInterrupt:
print('Keyboard interrupt, exiting...')
exit(0)
except subprocess.CalledProcessError as e:
print('CalledProcessError', e)
failed_jobs.append((i, param_str))
except Exception as e:
print(e)
# run parallel on CPUs
elif n_cpus > 1 and n_gpus == 0:
def run_single_job(i, param_str):
print(
f'\n\n-------------------{i + 1}/{len(param_str_list)}--------------------\n' + param_str)
try:
output = subprocess.run(
param_str, shell=True, check=True,
)
except subprocess.CalledProcessError as e:
print('CalledProcessError', e)
failed_jobs.append((i, param_str))
except KeyboardInterrupt:
print('Keyboard interrupt, exiting...')
exit(0)
except Exception as e:
print(e)
pool = ThreadPool(n_cpus)
for i, param_str in enumerate(param_str_list):
pool.apply_async(run_single_job, (i, param_str, ))
pool.close()
pool.join()
# run parallel on GPUs
elif n_gpus > 0:
# initialize the queue with the GPU ids
global job_queue_multiprocessing
job_queue_multiprocessing = Queue()
for gpu_id in gpu_ids:
job_queue_multiprocessing.put(gpu_id)
# call the jobs
pool = Pool(processes=n_gpus)
n = len(param_str_list)
indexes = [i for i in range(n)]
args = zip(param_str_list, indexes, repeat(n))
# time.sleep(0.1)
for failed_job in pool.starmap(run_on_gpu, args, chunksize=1):
failed_jobs.append(failed_job)
failed_jobs = [x for x in failed_jobs if x is not None]
pool.close()
pool.join()
print('failed_jobs', failed_jobs)
# final printing
print('\n\n\n*********************Done*********************')
if len(failed_jobs) == 0:
print('All jobs succeeded!')
else:
print(len(failed_jobs), 'Failed jobs\n\n')
for (i, param_str) in failed_jobs:
print('\t', param_str)
# print('\t', repr(e))
failed_args_list = [args_list[i] for (i, _) in failed_jobs]
if repeat_failed_jobs:
print('Repeating failed jobs...')
run_args_list(
failed_args_list,
cmd_python=cmd_python,
script_name=script_name,
actually_run=actually_run,
shuffle=shuffle,
reverse=reverse,
n_cpus=n_cpus,
gpu_ids=gpu_ids,
repeat_failed_jobs=repeat_failed_jobs,
)
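# Submit a single command to SLURM via slurmpy; keys of slurm_kwargs become sbatch options.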
def run_slurm(param_str, slurm_kwargs):
from slurmpy import Slurm
slurm = Slurm(
f"imodelsx_job_{time.time()}",
slurm_kwargs=slurm_kwargs,
slurm_flags=["requeue"],
)
slurm.run(
f"""
{param_str}
"""
)
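# Build the shell command "{cmd_python} {script_name} --key value ...";
# list values are space-joined and None values are omitted entirely.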
def _param_str_from_args(args, cmd_python, script_name):
param_str = cmd_python + ' ' + script_name + ' '
for k, v in args.items():
if isinstance(v, list):
param_str += '--' + k + ' ' + ' '.join(v) + ' '
elif v is None:
# skip: None means don't include this argument
pass
else:
param_str += '--' + k + ' ' + str(v) + ' '
return param_str
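# Worker for the GPU pool: pops a GPU id (or group of ids) off the shared queue,
# prefixes the command with CUDA_VISIBLE_DEVICES, runs it, and puts the id back.
# Returns (i, param_str) if the job failed, else None.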
def run_on_gpu(param_str, i, n):
gpu_id = job_queue_multiprocessing.get()
failed_job = None
try:
# run on GPU <gpu_id>
ident = current_process().ident
print(f'{ident}: Starting process on GPU(s) {gpu_id}')
if isinstance(gpu_id, list):
gpu_str = ','.join([str(x) for x in gpu_id])
else:
gpu_str = str(gpu_id)
prefix = f'CUDA_VISIBLE_DEVICES={gpu_str} '
param_str = prefix + param_str
print(
f'\n\n-------------------{i + 1}/{n}--------------------\n' + param_str)
subprocess.run(
param_str, check=True, shell=True
)
except KeyboardInterrupt:
print('Keyboard interrupt, exiting...')
exit(0)
except subprocess.CalledProcessError as e:
print('CalledProcessError', e)
print(f'{ident}: Finished on GPU(s) {gpu_id}')
failed_job = (i, param_str)
finally:
job_queue_multiprocessing.put(gpu_id)
return failed_job
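# Expand sweep dicts into a flat list of argument dicts: the Cartesian product of
# params_shared_dict values, crossed with each coupled tuple in params_coupled_dict.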
def get_args_list(
params_shared_dict: Dict[str, List],
params_coupled_dict: Dict[Tuple[str], List[Tuple]] = {},
) -> List[Dict[str, Any]]:
_validate_arguments(params_shared_dict, params_coupled_dict)
def combos_collapse(l: List[List[Dict]]) -> List[Dict]:
# get param combos as List[Tuple[Dict]] then convert to List[Dict]
return [
# convert List[Dict[Tuple]] -> List[Dict]
reduce(lambda a, b: {**a, **b}, dict_tup)
# get param combos as List[Tuple[Dict]]
for dict_tup in list(itertools.product(*l))
]
# Shared params as List[List[Dict]]
shared_combos_dict_list = combos_collapse(
[[{k: v} for v in params_shared_dict[k]]
for k in params_shared_dict.keys()]
)
# Coupled params as List[List[Dict]]]
coupled_combos_dict_list = [[
{k_tup[x]: v[i][x] for x in range(len(k_tup))}
for i in range(len(v))]
for k_tup, v in params_coupled_dict.items()
]
if coupled_combos_dict_list == []:
return shared_combos_dict_list
# Combine each coupled List[Dict] with the shared List[Dict]
combined_combos_dict_list = [
combos_collapse(
[coupled_combos_dict_list[i], shared_combos_dict_list])
for i in range(len(coupled_combos_dict_list))
]
args_list = sum(combined_combos_dict_list, [])
return args_list
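# Check that sweep dicts are well-formed: string keys with list values for shared
# params, tuple keys with matching-length value tuples for coupled params, and no
# key appearing in both dicts.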
def _validate_arguments(
params_shared_dict: Dict[str, List],
params_coupled_dict: Dict[Tuple[str], List[Tuple]],
):
for k, v in params_shared_dict.items():
if isinstance(v, range):
v = list(v)
elif isinstance(v, np.ndarray):
v = v.tolist()
assert isinstance(
k, str), f"params_shared_dict key {k} must be type list, got type {type(k)}"
assert isinstance(
v, list), f"params_shared_dict val {v} must be type list, got type {type(v)}"
for k_tup, v_tup_list in params_coupled_dict.items():
assert isinstance(
k_tup, tuple), f"params_coupled_dict key {k_tup} must be type tuple, got type {type(k_tup)}"
assert isinstance(
v_tup_list, list), f"params_coupled_dict val {v_tup_list} must be type list, got type {type(v_tup_list)}"
assert all([isinstance(x, str) for x in k_tup]
), f"params_coupled_dict k {k_tup} must only contain strings"
        assert all([len(k_tup) == len(v_tup) for v_tup in v_tup_list]
                   ), f"params_coupled_dict key and value tuples must have the same length, got {k_tup} and {v_tup_list}"
for k in k_tup:
assert not k in params_shared_dict, f"params_coupled_dict key {k} should not be in params_shared_dict"
for v_tup in v_tup_list:
assert len(k_tup) == len(
v_tup), f"params_coupled_dict k and v must have same length but got {len(k_tup)} and {len(v_tup)} for {k_tup} and {v_tup} respectively"
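# Sanity-check parallelism settings: CPU and GPU parallelism are mutually exclusive,
# and every requested GPU id must refer to an available device.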
def _validate_run_arguments(
n_cpus: int,
gpu_ids: List[int],
):
assert n_cpus > 0, f"n_cpus must be greater than 0, got {n_cpus}"
assert not (n_cpus > 1 and len(gpu_ids) >
0), 'Cannot parallelize over cpus and gpus'
if len(gpu_ids) > 0:
import torch.cuda
num_gpus = torch.cuda.device_count()
assert all([isinstance(x, int) for x in gpu_ids]) or all([isinstance(x, list) for x in gpu_ids]
), f'gpu_ids {gpu_ids} must be type int or type list'
if all([isinstance(x, int) for x in gpu_ids]):
assert all([x >= 0 and x < num_gpus for x in gpu_ids]
), f'gpu_ids {gpu_ids} must be less than available gpus count {num_gpus}'
elif all([isinstance(x, list) for x in gpu_ids]):
gpu_ids_flattened = sum(gpu_ids, [])
assert all([x >= 0 and x < num_gpus for x in gpu_ids_flattened]
), f'gpu_ids {gpu_ids} must be less than available gpus count {num_gpus}'
if __name__ == '__main__':
params_shared_dict = {
'name': ['chandan', 'saloni', 'alice', 'albert', 'jessica', 'felicia', ],
}
# Single-tree sweep
params_coupled_dict = {
('dataset_name',): [
('llm_tree', )
],
}
# Args list is a list of dictionaries
args_list = get_args_list(
params_shared_dict=params_shared_dict,
params_coupled_dict=params_coupled_dict,
)
run_args_list(
args_list,
script_name=join(submit_utils_dir, '../tests/dummy_script.py'),
actually_run=True,
# n_cpus=3,
gpu_ids=[[0, 3]],
repeat_failed_jobs=True,
)
Global variables
var submit_utils_dir
-
Handles utilities for job sweeps, focused on embarrassingly parallel sweeps on a single machine.
Functions
def get_args_list(params_shared_dict: Dict[str, List], params_coupled_dict: Dict[Tuple[str], List[Tuple]] = {}) ‑> List[Dict[str, Any]]
-
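Expands sweep specifications into a flat list of argument dicts: every combination of the value lists in params_shared_dict, crossed with each tuple of jointly-varying values in params_coupled_dict. A minimal sketch of input and output (the parameter names here are illustrative, not part of the API):

args_list = get_args_list(
    params_shared_dict={'seed': [1, 2], 'lr': [0.1]},
    params_coupled_dict={('model', 'n_layers'): [('bert', 12), ('gpt2', 24)]},
)
# 4 dicts: each coupled pair combined with each shared combo, e.g.
# {'model': 'bert', 'n_layers': 12, 'seed': 1, 'lr': 0.1}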
def run_args_list(args_list: List[Dict[str, Any]], cmd_python: str = 'python', script_name: str = '02_train_suffix.py', actually_run: bool = True, debug_mode: bool = False, shuffle: bool = False, reverse: bool = False, unique_seeds: Optional[str] = None, n_cpus: int = 1, gpu_ids: Union[List[int], List[List[int]]] = [], repeat_failed_jobs: bool = False, slurm: bool = False, slurm_kwargs: Optional[Dict] = None, amlt_kwargs: Optional[Dict] = None)
-
Params
------
args_list: List[Dict[str, Any]]
    List of dicts, each mapping argument names to values for one run
cmd_python: str
    Command to run python
script_name: str
    Name of script to run
actually_run: bool
    Whether to actually run the script (otherwise just print the command)
debug_mode: bool
    Whether to open debugger after failure (stops all parallelization)
shuffle: bool
    Whether to shuffle the order of the script calls
reverse: bool
    Whether to reverse the order of the script calls
unique_seeds: str
    If set, the name of an argument that is assigned a random, unique value for each run
n_cpus: int
    Number of cpus to use (if >1, parallelizes over local machine)
gpu_ids: List[int], List[List[int]]
    Ids of GPUs to run on (e.g. [0, 1] for 2 gpus)
    If List[List[int]], each inner list is a group of GPUs to run on, e.g. [[0, 1], [2, 3]] for 2 groups of 2 GPUs
repeat_failed_jobs: bool
    Whether to re-run failed jobs until they succeed
slurm: bool
    Whether to run on SLURM (defaults to False)
slurm_kwargs: Optional[Dict]
    kwargs for slurm
amlt_kwargs: Optional[Dict]
    kwargs for amlt (will override everything else)
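A minimal usage sketch combining get_args_list and run_args_list, in the spirit of the __main__ block at the bottom of the source (the script name and GPU ids are illustrative):

args_list = get_args_list(params_shared_dict={'seed': [1, 2, 3]})
run_args_list(
    args_list,
    script_name='train.py',
    gpu_ids=[[0, 1], [2, 3]],  # run 2 jobs at a time, each seeing 2 GPUs
    repeat_failed_jobs=True,
)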
def run_on_gpu(param_str, i, n)
-
def run_slurm(param_str, slurm_kwargs)
-
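Submits a single command to SLURM via slurmpy; the keys of slurm_kwargs become sbatch options for the generated batch script. An illustrative call (the partition, time, and memory values are assumptions; use your cluster's settings):

run_slurm(
    'python train.py --seed 1',
    slurm_kwargs={'partition': 'gpu', 'time': '1-00:00', 'mem': '32G'},
)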