# Copyright 2020 Alex Harvill
# SPDX-License-Identifier: Apache-2.0
'''
batch and commandline utilities
'''
from __future__ import print_function
import gc
import os
import ssl
import sys
import site
import shlex
import logging
import warnings
import argparse
import platform
import resource
import subprocess
import time
if sys.version_info.major < 3 or sys.version_info.minor < 5:
warnings.warn('old python')
#pylint: disable=wrong-import-position
from pathlib import Path
try:
import numpy
except ImportError:
numpy = None
try:
from pynvml.smi import nvidia_smi
except ImportError:
nvidia_smi = None
CODE_RESET = '\033[0m'
CODE_BLACK = '\033[1;30m'
CODE_RED = '\033[1;31m'
CODE_GREEN = '\033[1;32m'
CODE_YELLOW = '\033[1;33m'
CODE_BLUE = '\033[1;34m'
CODE_MAGENTA = '\033[1;35m'
CODE_CYAN = '\033[1;36m'
CODE_WHITE = '\033[1;37m'
RUN_CMD_ALWAYS = 'RUN_CMD_ALWAYS'
RUN_CMD_CONFIRM = 'RUN_CMD_USER_CONFIRMATION'
RUN_CMD_NEVER = 'RUN_CMD_NEVER'
USER_CONFIRM_ALWAYS = False
[docs]def confirm(run_mode, cmd_str):
'optionally ask user for confirmation with info about a cmd about to be run'
#pylint: disable=global-statement
global USER_CONFIRM_ALWAYS
if run_mode == RUN_CMD_NEVER:
return False
if not USER_CONFIRM_ALWAYS and run_mode == RUN_CMD_CONFIRM:
c = input('run command [%s] ? (N)o / (Y)es / (A)lways:' % (cmd_str))
if not isinstance(c, str) or c == '':
return False
c = c.lower()
if c == 'n':
return False
if c == 'a':
USER_CONFIRM_ALWAYS = True
return True
[docs]def color_text(text, color, fmt=None):
'if color control string is not None, wrap like so: color|text|color_rest'
if text is None:
return None
if fmt is not None:
text = fmt % (text)
if color is None:
return str(text)
return color + str(text) + CODE_RESET
[docs]def black_text(text, **kwargs):
'wrap text in terminal encoding characters'
return color_text(text, CODE_BLACK, **kwargs)
[docs]def red_text(text, **kwargs):
'wrap text in terminal encoding characters'
return color_text(text, CODE_RED, **kwargs)
[docs]def green_text(text, **kwargs):
'wrap text in terminal encoding characters'
return color_text(text, CODE_GREEN, **kwargs)
[docs]def yellow_text(text, **kwargs):
'wrap text in terminal encoding characters'
return color_text(text, CODE_YELLOW, **kwargs)
[docs]def blue_text(text, **kwargs):
'wrap text in terminal encoding characters'
return color_text(text, CODE_BLUE, **kwargs)
[docs]def magenta_text(text, **kwargs):
'wrap text in terminal encoding characters'
return color_text(text, CODE_MAGENTA, **kwargs)
[docs]def cyan_text(text, **kwargs):
'wrap text in terminal encoding characters'
return color_text(text, CODE_CYAN, **kwargs)
[docs]def white_text(text, **kwargs):
'wrap text in terminal encoding characters'
return color_text(text, CODE_WHITE, **kwargs)
[docs]def color_code_stdout(color_code):
'write color code to stdout and flush'
if color_code is not None:
sys.stdout.write(color_code)
sys.stdout.flush()
[docs]def reset_color_code_stdout(color):
'reset stdout to non normal color code mode flush'
if color:
sys.stdout.write(CODE_RESET)
sys.stdout.flush()
[docs]def execute(
cmd,
run_mode=RUN_CMD_ALWAYS,
cwd=None,
output=False,
color=True,
log_level=logging.DEBUG,
env=None,
):
'''
execute a subprocess with
logging of commandline before output
optional color coded output
optional current working directory override
a run mode that can disable execution, ask for user confirmation, or execute
'''
nottext = color_text('not', CODE_RED) if color else 'not'
cmd = [str(x) for x in cmd]
cmd_str = color_text(subprocess.list2cmdline(cmd), CODE_GREEN)
go = confirm(run_mode, cmd_str)
verb = 'running' if go else nottext + ' running'
highlight_color = (CODE_YELLOW if go else CODE_GREEN) if color else None
result_color = CODE_CYAN if color else None
cwd_str = color_text(cwd, highlight_color)
cmd_str = color_text(subprocess.list2cmdline(cmd), highlight_color)
if cwd is None:
logging.log(log_level, '%s [%s]', verb, cmd_str)
else:
logging.log(log_level, 'from [%s] %s [%s]', cwd_str, verb, cmd_str)
if not go:
return None
if not output:
color_code_stdout(result_color)
try:
subprocess.check_call(cmd, cwd=cwd, env=env)
finally:
reset_color_code_stdout(color)
else:
return subprocess.check_output(cmd, cwd=cwd, env=env)
return None
[docs]def execute_multiline_str(**kwargs):
'wraps execute by converting multiline "cmd" kwarg to strings'
cmd = kwargs.pop('cmd')
if cmd is None:
raise ValueError('expected multiline string keyword arg "cmd"')
lines = cmd.split('\n')
lines = [x.strip() for x in lines]
lines = [x for x in lines if not x.startswith('#')]
cmd = ' '.join(lines)
cmd = cmd.split(' ')
cmd = [x for x in cmd if x] # remove empty argv
execute(cmd, **kwargs)
[docs]def execute_callback(
message,
callback,
args,
kwargs,
run_mode=RUN_CMD_ALWAYS,
color=True,
log_arguments=True,
log_time=False,
log_level=logging.DEBUG,
):
'''
execute a python function with
a run mode that can disable execution, ask for user confirmation, or execute
'''
nottext = color_text('not', CODE_RED) if color else 'not'
go = confirm(run_mode, message)
verb = 'calling' if go else nottext + ' calling'
if log_arguments:
logging.log(
log_level,
'%s [%s.%s] with args %s and kwargs %s',
verb,
callback.__module__,
callback.__name__,
args,
kwargs,
)
else:
logging.log(
log_level,
'%s [%s.%s] to %s',
verb,
callback.__module__,
callback.__name__,
message,
)
if not go:
return None
if log_time:
with T(message + ' total'):
result = callback(*args, **kwargs)
else:
result = callback(*args, **kwargs)
return result
[docs]def set_log_level(level):
'set the global logging level'
logging.getLogger('').setLevel(level)
[docs]def setup_logging(
level=logging.DEBUG,
setup_matplotlib=True,
setup_lambda=False,
numpy_precision=3,
numpy_suppress=True,
numpy_linewidth=75,
stream=None,
color=True,
force_warning_modules=(
'boto3',
'botocore',
's3transfer',
'urllib3',
'websockets',
),
):
'setup reasonable logging defaults'
if setup_lambda:
color = False
logger = logging.getLogger()
logger.setLevel(level)
logger.propagate = False
for modname in force_warning_modules:
modlogger = logging.getLogger(modname)
modlogger.setLevel(logging.WARNING)
elif level == logging.INFO:
logging.basicConfig(level=logging.INFO, format='%(message)s', stream=stream)
else:
logging.basicConfig(
level=level,
format='%(levelname)s %(message)s',
stream=stream,
)
logger = logging.getLogger()
logger.propagate = False
for modname in force_warning_modules:
modlogger = logging.getLogger(modname)
modlogger.setLevel(logging.WARNING)
if setup_matplotlib:
# force matplotlib to never show debug info!
mpl_logger = logging.getLogger('matplotlib')
mpl_logger.setLevel(logging.WARNING)
for num, name, color_code in [
(logging.CRITICAL, 'BAD ', CODE_RED),
(logging.ERROR, 'err ', CODE_RED),
(logging.WARNING, 'warn', CODE_WHITE),
(logging.INFO, 'info', CODE_BLACK),
(logging.DEBUG, 'dbg ', CODE_BLACK),
]:
#name = logging.getLevelName(num).lower().ljust(8)
resolved_name = name
if color:
resolved_name = color_text(name, color_code)
logging.addLevelName(num, resolved_name)
if numpy is not None:
numpy.set_printoptions(
precision=numpy_precision,
suppress=numpy_suppress,
linewidth=numpy_linewidth,
)
[docs]def setup_patching(setup_ssl=True):
'''
follow this guide to make sure models can be downloaded without error:
https://github.com/fchollet/deep-learning-models/issues/33#issuecomment-397257502
'''
if setup_ssl:
#pylint: disable=W0212
ssl._create_default_https_context = ssl._create_unverified_context
[docs]def setup_tensorflow():
'make tensorflow silent unless TF_CPP_MIN_LOG_LEVEL envvar found'
tf_log_key = 'TF_CPP_MIN_LOG_LEVEL'
tf_logger = logging.getLogger('tensorflow')
if tf_log_key not in os.environ:
os.environ[tf_log_key] = '3'
tf_logger.setLevel(logging.INFO)
else:
tf_logger.setLevel(logging.DEBUG)
# redirect stdout/stderr, import keras, then restore stdout/stderr
# avoids keras cluttering up the console during version or other query cmds
save_stdout, save_stderr = sys.stdout, sys.stderr
try:
sys.stdout = open(os.devnull, 'w')
sys.stderr = sys.stdout
#pylint: disable=unused-import,import-outside-toplevel
import tensorflow.keras
finally:
sys.stdout, sys.stderr = save_stdout, save_stderr
[docs]class HELP_FMT(
argparse.ArgumentDefaultsHelpFormatter,
argparse.RawTextHelpFormatter,
):
'''
composite class to provide both default args in help and raw help strings
goes to crazy lengths to split up lists of choices...
'''
def format_help(self):
tmp = argparse.HelpFormatter.format_help(self)
result = []
for line in tmp.split('\n'):
if '[' in line and '{' in line and line.count(',') > 5:
test = line
total_whitespace = line.count(' ')
test = test.strip()
leading = total_whitespace - test.count(' ')
if test[0] == '[' and test[-1] == ']':
test = test[1:-1]
# use shlex to hanle list tokenizing
# by turning lists into strings
test = test.replace('[', '"')
test = test.replace(']', '"')
test = test.replace('{', "'")
test = test.replace('}', "'")
test = test.replace(' ...', '')
parts = shlex.split(test, comments=False)
# remove crazy duplication of the same list
A = parts[-1]
B = parts[-2]
C = "'%s'" % (B)
if A == C:
parts.pop()
norm_line = ' '.join(parts)
indent = ' ' * leading
line = indent + ('\n ' + indent).join(norm_line.split(','))
result.append(line)
return '\n'.join(result)
VERBOSE_MAP = {0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG}
[docs]def add_verbose_parse_arg(parser):
'add verbosity levels to a parser'
if not getattr(parser, 'vm_build_utils_has_verbose', False):
parser.add_argument(
'-v',
'--verbose',
action='count',
help='verbose level... repeat up to 2 times',
)
parser.vm_build_utils_has_verbose = True
[docs]def set_log_level_from_args(args):
'args is a command line parser result - use it to configure logging'
if args.verbose is None:
args.verbose = 0
set_log_level(VERBOSE_MAP[args.verbose])
[docs]def add_run_mode_parse_arg(parser):
'add controls to run sub commands / persistent system operations'
if not getattr(parser, 'vm_build_utils_has_run_mode', False):
RUN_MODE_GROUP = parser.add_mutually_exclusive_group()
RUN_MODE_GROUP.add_argument(
'--run-never',
action='store_true',
help='no actions will be taken, only logging will be performed')
RUN_MODE_GROUP.add_argument(
'--run-confirm',
action='store_true',
help='actions will be performed with user confirmation')
RUN_MODE_GROUP.add_argument(
'--run-always',
action='store_true',
help='actions will be performed always [ default ]',
)
parser.vm_build_utils_has_run_mode = True
[docs]def setup_run_mode(args):
'args is a command line parser result - use it to configure the run mode'
if not args.run_confirm and not args.run_never:
args.run_always = True
result = None
if args.run_never:
result = RUN_CMD_NEVER
elif args.run_confirm:
result = RUN_CMD_CONFIRM
elif args.run_always:
result = RUN_CMD_ALWAYS
else:
raise ValueError('one of [run-never,run-confirm,run-always] must be True')
return result
[docs]def add_file_logging_parse_arg(parser):
'add file logging output + verbosity to a parser'
if not getattr(parser, 'vm_build_utils_has_file_log', False):
parser.add_argument(
'-fv',
'--file-verbose',
action='count',
help='verbose level for --file-log ... repeat up to 2 times',
)
parser.add_argument(
'--file-log',
default=None,
type=Path,
help='direct logging stream to this file in addition to stderr',
)
parser.vm_build_utils_has_file_log = True
[docs]def set_file_logging_from_args(args):
'args is a command line parser result - use it to configure file logging'
if args.file_log is None:
return
if args.file_verbose is None:
args.file_verbose = 0
level = VERBOSE_MAP[args.file_verbose]
file_log = logging.FileHandler(args.file_log, mode='w')
file_log.setLevel(level)
file_log.setFormatter(
logging.Formatter('%(levelname)s %(message)s', None, '%'))
logging.getLogger('').addHandler(file_log)
[docs]def finish_args(parser):
'add common arguments to a parser if not already added: verbose, run_mode'
add_verbose_parse_arg(parser)
add_file_logging_parse_arg(parser)
add_run_mode_parse_arg(parser)
return parser
[docs]def log_parsed_args(args_namespace, level=logging.DEBUG):
'log each elemenet in an argparser namespace'
items = dict(vars(args_namespace)).items()
key_whitespace_len = -1
for key, _ in items:
key_whitespace_len = max(key_whitespace_len, len(key))
key_whitespace_len += 2
newline_whitespace_len = key_whitespace_len + 6
newline_whitespace = ''.join(['\n'] + [' '] * newline_whitespace_len)
for key, value in items:
tmp = str(value)
if isinstance(value, list):
tmp = newline_whitespace.join([str(c) for c in value])
logging.log(level, '%s[%s]', key.rjust(key_whitespace_len), tmp)
[docs]def parse_args(parser, args=None, parse_known_args=False, return_unknown=False):
'parse, handle logging and run mode arguments'
finish_args(parser)
if parse_known_args:
args, unknown = parser.parse_known_args(args=args)
else:
args = parser.parse_args(args=args)
set_log_level_from_args(args)
set_file_logging_from_args(args)
args.run_mode = setup_run_mode(args)
if return_unknown:
return args, unknown
return args
KB = float(10**3)
GB = float(10**9) # 1000000000
MiB = float(2**20) # 1048576
GiB = float(2**30) # 1073741824
[docs]def get_rss_and_total():
'resident and total physical memory in GB'
try:
total = (os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')) / GB
except ValueError:
total = -1
return (get_rss(), total)
[docs]def get_gpu_used_and_total():
'total physical memory in GB'
if nvidia_smi is None:
return 0, 0
nvsmi = nvidia_smi.getInstance()
qresult = nvsmi.DeviceQuery('memory.used, memory.total')
mem = qresult['gpu'][0]['fb_memory_usage']
assert mem['unit'] == 'MiB'
used = (mem['used'] * MiB) / GiB
total = (mem['total'] * MiB) / GiB
return used, total
[docs]class T(object):
'simple timer'
def __init__(self, name, level=logging.INFO):
self.name = name
self.start = self.end = self.interval = 0
self.level = level
def __enter__(self):
self.start = time.perf_counter()
return self
def __exit__(self, *args):
self.end = time.perf_counter()
self.interval = self.end - self.start
gc.collect()
rss, total = get_rss_and_total()
gpu_used, gpu_total = get_gpu_used_and_total()
logging.log(
self.level,
'%s [%s sec] [%s/%s GB] [%s/%s GB gpu]',
self.name.rjust(40),
yellow_text('% 7.2f' % (self.interval)),
yellow_text('% 6.2f' % (rss)),
yellow_text('%02.2f' % (total)),
yellow_text('% 6.2f' % (gpu_used)),
yellow_text('%02.2f' % (gpu_total)),
)
[docs]def remove_prefix(value, prefix):
'remove string prefix'
if value.startswith(prefix):
return value[len(prefix):]
return value
[docs]def get_sitepackages_path():
'get path to python site-packages directory'
try:
return site.getsitepackages()[0]
except AttributeError:
for path in sys.path:
if 'local' in path:
continue
if 'site-packages' in path:
return path
raise ValueError('no site packages found')
[docs]def executable_path():
'get a path to the python interpreter than can be tweaked via env var'
result = sys.executable
override = os.environ.get('VM_EXECUTABLE')
if override is not None:
result = override
result = str(result)
result = remove_prefix(result, '/System/Volumes/Data')
return Path(result)
[docs]def project_path_components():
'validate and return paths related to /comet/PROJECT/env/DEVREL/bin/python'
template_path = '"/comet/PROJECT/env/DEVREL/bin/python"'
err_msg = 'python path must be of the form %s' % (template_path)
python_exec = executable_path()
assert len(python_exec.parts) >= 6, err_msg
user_parts = python_exec.parts[:-4]
_env, dev_rel, _bin, _python = python_exec.parts[-4:]
assert (_env, _bin, _python) == ('env', 'bin', 'python'), err_msg
return user_parts, dev_rel
[docs]def project_path():
'abs path relative to the directory containing env/container/bin/python'
user_parts, _ = project_path_components()
return Path().joinpath(*user_parts)
[docs]def env_root(rel_path=''):
'abs path relative to the directory containing bin/python'
python_exec = executable_path()
bin_path = python_exec.parent.resolve()
env = bin_path.parent
if rel_path:
result = env / rel_path
else:
result = env
return result