"""This module implements the core of the command interpreter and any part
essential for running it
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import copy
from distutils.spawn import find_executable
import logging
import shlex
from string import Template
import subprocess as sp
import time
import textwrap
import types
import pyhetdex.tools.processes as proc
import six
from six.moves import copyreg
from . import exceptions
from . import types as citypes
from .relay import get_relay
__all__ = ["CommandInterpreter"]
class CommandInterpreter(object):
    """Interpret and execute the command.

    See :ref:`interpreter` section in the documentation for more details.

    All the custom errors are defined in
    :mod:`vdat.command_interpreter.exceptions`. The ones raised in the
    constructor are derived from :exc:`~.exceptions.CIValidationError`.

    Parameters
    ----------
    command : string
        command to parse
    command_config : dict
        dictionary containing the instructions to execute the ``command``. A
        deep copy is executed
    selected : list-like, optional
        None or a list of the selected items; if ``None`` no filtering of the
        primary files is done; otherwise must be an object supporting the
        membership test operator ``in``.

    Raises
    ------
    CINoExeError
        if the executable does not exist
    CIParseError
        if there is some error when extracting the keywords
    CIKeywordValidationError
        if ``primary``, the keyword it points to, a mandatory keyword or a
        keyword used in the command is missing
    CIKeywordError
        if the value associated with a keyword has no ``type`` entry
    CIKeywordTypeError
        if the type of the keywords is not among the known ones
    """

    # Registries indexed by type name (``registry[type_name]``); an unknown
    # name is expected to raise ``KeyError``, which is converted into
    # ``CIKeywordTypeError`` by the methods below.
    primary_types = citypes.PrimaryTypes()
    keyword_types = citypes.KeywordTypes()
    execute_types = citypes.ExecuteTypes()

    # the public interface

    def __init__(self, command, command_config, selected=None):
        self.command = command
        # ``exe`` may be replaced by an alias; ``original_exe`` keeps the
        # name typed by the user for reporting purposes
        self.exe = self.original_exe = command.split()[0]
        self.config = copy.deepcopy(command_config)
        self.selected = selected

        # check the executable
        self._replace_alias()
        self._check_exe()

        # extract the keys
        self._keys = self._get_keys(self.command)

        # check that all the required keywords are in the configuration and
        # that all the mandatory ones exist
        self._validate_mandatory()
        self._validate_keywords()

        # get the functions to deal with the keywords
        self._primary_func, self._keyword_map = self._key_types(self._keys)

        # check if the selection needs to happen and that the ``selected``
        # type is acceptable
        self.filter_func = self._filter_selected()

        # get the function used to decide whether to run a step or not
        self.execute = self._execute()

        # create the template
        self.template = Template(self.command)

    def run(self):
        """Collect the files, expand and run the required command.

        The primary files are collected, filtered according to ``selected``
        and, for each of them, a job is submitted to the worker unless the
        ``execute`` function says that the step must be skipped. The progress
        is reported through the ``progress`` relay and a summary is sent to
        the ``logger`` relay at the end.

        All the custom errors raised here derive from
        :exc:`~.exceptions.CIRunError`.

        Raises
        ------
        CICommandFmtError
            raised by :meth:`_run` if the substitution of the command
            doesn't work; whether it reaches the caller depends on how the
            worker handles exceptions raised by the jobs
        """
        target_dir = self._get_value_as_dict('target_dir')['value']
        primaries = self._primary_func(target_dir,
                                       self.config[self.config['primary']])
        # filter the primaries according to the ``selected`` list
        primaries = list(filter(self.filter_func, primaries))

        n_tot = len(primaries)
        if n_tot == 0:
            msg = "No primary file collected"
            get_relay('logger').emit(logging.WARNING, msg)

        worker = proc.get_worker()
        n_exists = 0  # number of skipped primaries
        for i, fn in enumerate(primaries):
            try:
                do_execute = self.execute(fn)
            except Exception as e:
                # if it's not possible to decide, run the step anyway
                msg = ('Failed to decide to run function to decide whether'
                       ' to execute the next step of the reduction because of'
                       ' {}'.format(e))
                get_relay('logger').emit(logging.WARNING, msg)
                do_execute = True

            if do_execute:
                worker(self._run, i, fn)
            else:
                n_exists += 1

            ndone, nerror, _ = worker.jobs_stat()
            get_relay('progress').emit(n_tot, ndone + n_exists, nerror)

        # Poll the worker until every primary is accounted for. The progress
        # is emitted before testing the exit condition, so the final state is
        # always reported and no sleep happens once all the jobs are done.
        while True:
            ndone, nerror, _ = worker.jobs_stat()
            get_relay('progress').emit(n_tot, ndone + n_exists, nerror)
            if ndone + n_exists >= n_tot:
                break
            time.sleep(0.25)

        worker.wait()
        worker.clear_jobs()

        msg = textwrap.dedent("""
            <b> {} </b> run {} times with {} failures.
            Skipped {} times, because the output file(s) already existed.
            Logs are under {}.""")
        msg = msg.format(self.original_exe, ndone, nerror, n_exists, self.exe)
        get_relay('logger').emit(logging.INFO, msg)

    # init helpers

    def _replace_alias(self):
        """If the command configuration has the ``is_alias_of`` entry,
        replace the executable name in the command and in ``self.exe``.
        """
        try:
            alias_name = self.config['is_alias_of']
        except KeyError:
            # no alias: nothing to do
            return
        self.command = self.command.replace(self.exe, alias_name, 1)
        self.exe = alias_name

    def _check_exe(self):
        """Check that the executable can be found and replace it, in the
        command string, with the full path returned by
        :func:`distutils.spawn.find_executable`.

        ``self.exe`` is left untouched, as it is used as logger name.

        Raises
        ------
        CINoExeError
            if the executable is not found
        """
        exe = find_executable(self.exe)
        if not exe:
            raise exceptions.CINoExeError(self.exe)

        try:
            quote = shlex.quote
        except AttributeError:  # python 2
            from pipes import quote

        # The command goes first through ``string.Template`` and then through
        # ``shlex.split``: quote the path to survive the split if it contains
        # spaces or other special characters, and escape any ``$`` to avoid
        # that it is interpreted as a keyword.
        safe_exe = quote(exe).replace('$', '$$')
        # strings are immutable: the result of ``replace`` must be stored
        self.command = self.command.replace(self.exe, safe_exe, 1)

    def _get_keys(self, command):
        """Extract the keywords from the command.

        Parameters
        ----------
        command : string
            command to scan for ``$keyword`` and ``${keyword}`` placeholders

        Returns
        -------
        keys : list of strings
            names of the keywords, in order of appearance; escaped dollars
            (``$$``) are ignored

        Raises
        ------
        CIParseError
            if the command contains an isolated ``$``
        """
        keys = []
        # each match is a tuple with one entry per group of the template
        # pattern; at most one of them is not empty
        for groups in Template.pattern.findall(command):
            name = ''.join(groups).strip()
            if not name:
                msg = ("In the command there is an isolated '$' that is not"
                       " allowed by the keyword expansion. Use '$$' as an"
                       " escape")
                raise exceptions.CIParseError(msg)
            if name != '$':  # skip the escaped dollars
                keys.append(name)
        return keys

    def _validate_mandatory(self):
        """Check that all the mandatory keywords are provided.

        The ``primary`` entry and the keyword it points to must always be in
        the configuration. If ``mandatory`` is not found, no further check is
        done.

        Raises
        ------
        CIKeywordValidationError
            if ``primary``, the keyword it points to or one of the mandatory
            keywords is missing
        """
        if 'primary' not in self.config:
            msg = ("The mandatory keyword 'primary' is not in the"
                   " configuration. Please provide it."
                   " If you think that there are good reasons not to have the"
                   " 'primary' keyword as mandatory, please contact the"
                   " developers with your case.")
            raise exceptions.CIKeywordValidationError(msg)

        if self.config['primary'] not in self.config:
            msg = ("The keyword '{}' pointed to by 'primary' is not in the"
                   " configuration. Please provide it.")
            msg = msg.format(self.config['primary'])
            raise exceptions.CIKeywordValidationError(msg)

        if 'mandatory' not in self.config:
            return

        # if the mandatory keys are not a subset of the given keys, complain
        missing_mandatory = set(self.config['mandatory']) - set(self._keys)
        if missing_mandatory:
            # sort the keys to get a reproducible message
            missing = ', '.join(sorted(missing_mandatory))
            msg = ("The mandatory key(s) '{}' is(are) missing from the"
                   " provided command".format(missing))
            raise exceptions.CIKeywordValidationError(msg)

    def _validate_keywords(self):
        """Check that all the requested keywords are in the configuration.

        Raises
        ------
        CIKeywordValidationError
            if a keyword used in the command is not in the configuration
        """
        missing_keys = set(self._keys) - set(self.config)
        if missing_keys:
            # sort the keys to get a reproducible message
            missing = ", ".join(sorted(missing_keys))
            msg = ("One or more of the keywords ({}) is unknown."
                   " Edit the command or the configuration to"
                   " sync the keys.".format(missing))
            raise exceptions.CIKeywordValidationError(msg)

    def _lookup_handler(self, registry, value, key):
        """Get from ``registry`` the function handling the type of ``key``.

        Parameters
        ----------
        registry : mapping-like
            one of ``keyword_types`` or ``execute_types``
        value : dictionary
            configuration of the keyword, as returned by
            :meth:`_get_value_as_dict`
        key : string
            name of the keyword, used in the error message

        Returns
        -------
        callable
            entry of ``registry`` associated with ``value['type']``

        Raises
        ------
        CIKeywordTypeError
            if the type of the keyword is not known
        """
        try:
            return registry[value['type']]
        except KeyError as e:
            msg = ("Unknown type '{}' for keyword '{}'; either edit the"
                   " configuration file or create the function to handle"
                   " it. Consult the documentation for more info.")
            msg = msg.format(value['type'], key)
            six.raise_from(exceptions.CIKeywordTypeError(msg), e)

    def _key_types(self, keys):
        """Scan the keys and check that the interpreter knows how to deal with
        them.

        Parameters
        ----------
        keys : list of strings
            keys extracted from the command

        Returns
        -------
        primary_func : callable
            function to call to get the list of primary files
        keyword_map : dictionary
            map non-primary keywords to the function used to handle them

        Raises
        ------
        CIKeywordTypeError
            if the type of the keyword is not known
        """
        # get the function to use to get the primary key
        primary_key = self.config['primary']
        primary_value = self._get_value_as_dict(primary_key)
        try:
            primary_func = self.primary_types[primary_value['type']]
        except KeyError as e:
            msg = ("Unknown primary type '{}'; either edit the configuration"
                   " file or create the function to handle it. Consult the"
                   " documentation for more info.")
            msg = msg.format(primary_value['type'])
            six.raise_from(exceptions.CIKeywordTypeError(msg), e)

        # loop through all the keywords, excluding the primary, and map the
        # type with the function to handle it
        keyword_map = {}
        for k in keys:
            if k == primary_key:  # skip the primary key
                continue
            value = self._get_value_as_dict(k)
            keyword_map[k] = self._lookup_handler(self.keyword_types, value,
                                                  k)

        return primary_func, keyword_map

    def _get_value_as_dict(self, key):
        """Get the value of ``key`` from the configuration.

        If it's a string, convert it to a dictionary with two entries:

        * type: ``plain``
        * value: ``value``

        and re-add it in the configuration dictionary

        Parameters
        ----------
        key : string
            key to get

        Returns
        -------
        value : dictionary
            dictionary defining the type

        Raises
        ------
        CIKeywordError
            if ``value`` is not a string and doesn't have a ``type`` entry
        """
        value = self.config[key]
        if isinstance(value, six.string_types):
            value = {'type': 'plain', 'value': value}
            self.config[key] = value

        try:
            has_type = 'type' in value
        except TypeError:
            # ``value`` doesn't support the membership test (e.g. it is a
            # number or ``None``), so it's of the wrong type
            has_type = False

        if not has_type:
            msg = ("The value associated with the keyword '{}' is either of"
                   " the wrong type or doesn't have the required ``type`` key")
            raise exceptions.CIKeywordError(msg.format(key))

        return value

    def _filter_selected(self):
        """Look for the existence of the ``filter_selected`` option and check
        that it is of the correct type and that ``selected`` is of the correct
        type.

        Returns
        -------
        filter_func : function
            function that accepts one string (one element of the primary list)
            and returns ``True`` or ``False`` if that element is valid or not

        Raises
        ------
        CIValidationError
            if ``selected`` doesn't support the ``in`` operator
        CIKeywordTypeError
            if the type of ``filter_selected`` is not known
        """
        if self.selected is None or 'filter_selected' not in self.config:
            return self._true

        # otherwise check that the ``selected`` type is correct
        try:
            '' in self.selected
        except TypeError as e:
            msg = ("The type of ``selected`` is not correct. It must support"
                   " the ``in`` statement")
            six.raise_from(exceptions.CIValidationError(msg), e)

        # get the function to deal with ``filter_selected``
        value = self._get_value_as_dict('filter_selected')
        filter_ = self._lookup_handler(self.keyword_types, value,
                                       'filter_selected')

        def filter_func(primary):
            return filter_(primary, value) in self.selected

        return filter_func

    def _execute(self):
        """Look for the existence of the ``execute`` option in the
        configuration and check that it is of the correct type.

        Returns
        -------
        execute_func : function
            function that accepts one string (one element of the primary list)
            and returns ``True`` or ``False`` if that element must be run or
            not

        Raises
        ------
        CIKeywordTypeError
            if the type of ``execute`` is not known
        """
        if 'execute' not in self.config:
            return self._true

        # get the function to deal with ``execute``
        value = self._get_value_as_dict('execute')
        execute_ = self._lookup_handler(self.execute_types, value, 'execute')

        def execute_func(primary):
            return execute_(primary, self.config)

        return execute_func

    # run helpers

    def _run(self, index, primary):
        """Job that creates the keywords, creates and runs the command string
        and logs it.

        The standard output of the command is logged as info and the standard
        error as error, using the logger called ``self.exe``.

        Parameters
        ----------
        index : int
            index of the current job
        primary : string
            primary file(s) to pass to the functions handling the keywords

        Raises
        ------
        CIKeywordError
            if one of the functions handling the keywords fails
        CICommandFmtError
            if the keyword substitution or the splitting of the command fails
        """
        log = logging.getLogger(self.exe)

        template_dict = {}
        template_dict[self.config['primary']] = primary
        for key, func in six.iteritems(self._keyword_map):
            try:
                template_dict[key] = func(primary, self.config[key])
            except Exception as e:
                msg = 'Failed to handle key {} because of {}'
                msg = msg.format(key, str(e))
                log.critical(msg)
                six.raise_from(exceptions.CIKeywordError(msg), e)

        try:
            command = self.template.substitute(**template_dict)
        except KeyError as e:
            msg = ("Failed to replace key '{}' in the string '{}' when"
                   " creating the command to execute").format(e.args[0],
                                                              self.command)
            log.critical(msg)
            six.raise_from(exceptions.CICommandFmtError(msg), e)

        get_relay('command_string').emit(index, command)
        log.info("running: %s", command)

        try:
            split_command = shlex.split(command)
        except ValueError as e:
            msg = "Unable to split the command '{}' because of '{}'"
            msg = msg.format(command, e.args[0])
            log.critical(msg)
            six.raise_from(exceptions.CICommandFmtError(msg), e)

        try:
            p = sp.Popen(split_command, stdout=sp.PIPE, stderr=sp.PIPE,
                         universal_newlines=True)
            stdout, stderr = p.communicate()
        except Exception:
            log.critical("Command '%s' failed with the following error",
                         command, exc_info=True)
            raise

        if stdout:
            log.info(stdout)
        if stderr:
            log.error(stderr)
        # a negative return code means that the process has been terminated
        # by a signal (e.g. a segmentation fault): report it as well
        if p.returncode != 0:
            log.critical("Code crashed with return code %d", p.returncode)

    # generic helpers

    def _true(self, *_, **__):
        """returns always true"""
        return True
if six.PY2:  # pragma: no cover
    def _pickle_method(m):  # pragma: no cover
        """In python 2 it's not possible to pickle instance methods directly.
        This function does some magic to do it.

        This function is registered, only under python 2, to allow pickling
        methods.

        Copied from `Stack Overflow
        <http://stackoverflow.com/questions/25156768/cant-pickle-type-instancemethod-using-pythons-multiprocessing-pool-apply-a>`_

        Parameters
        ----------
        m : method
            bound or unbound method to pickle

        Returns
        -------
        tuple
            ``getattr`` and its arguments: the owner of the method (the
            class if ``m`` is unbound, the instance otherwise) and the name
            of the method
        """
        if m.im_self is None:
            # unbound method: retrieve it from the class
            return getattr, (m.im_class, m.im_func.func_name)
        # bound method: retrieve it from the instance
        return getattr, (m.im_self, m.im_func.func_name)

    # tell pickle to use the above function to reduce methods
    copyreg.pickle(types.MethodType, _pickle_method)