Source code for vdat.command_interpreter.core

"""This module implements the core of the command interpreter and every part
essential for running it.

The public entry point is :class:`CommandInterpreter`. Under Python 2 the
module also registers a :mod:`copyreg` reducer so that instance methods can
be pickled.
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import copy
from distutils.spawn import find_executable
import logging
import shlex
from string import Template
import subprocess as sp
import time
import textwrap
import types

import pyhetdex.tools.processes as proc
import six
from six.moves import copyreg

from . import exceptions
from . import types as citypes
from .relay import get_relay

# ``CommandInterpreter`` is the only name exported by ``from ... import *``
__all__ = ["CommandInterpreter"]


class CommandInterpreter(object):
    """Interpret and execute the command.

    See :ref:`interpreter` section in the documentation for more details

    All the custom errors are defined in
    :mod:`vdat.command_interpreter.exceptions`. The ones raised in the
    constructor are derived from :exc:`~.exceptions.CIValidationError`.

    Parameters
    ----------
    command : string
        command to parse
    command_config : dict
        dictionary containing the instructions to execute the ``command``.
        A deep copy is executed
    selected : list-like, optional
        None or a list of the selected items; if ``None`` no filtering of
        the primary files is done; otherwise must be an object supporting
        the membership test operator ``in``.

    Raises
    ------
    CIParseError
        if the command is empty or if there is some error when extracting
        the keywords
    CINoExeError
        if the executable does not exist
    CIKeywordValidationError
        if the ``primary`` keyword, the keyword it points to or any of the
        ``mandatory`` keywords is missing, or if the command uses keywords
        not present in the configuration
    CIKeywordError
        if the value of a keyword is of the wrong type
    CIKeywordTypeError
        if the type of the keywords is not among the known ones
    CIValidationError
        if ``selected`` does not support the ``in`` operator
    """
    primary_types = citypes.PrimaryTypes()
    keyword_types = citypes.KeywordTypes()
    execute_types = citypes.ExecuteTypes()

    # the public interface
    def __init__(self, command, command_config, selected=None):
        self.command = command
        try:
            self.exe = self.original_exe = command.split()[0]
        except IndexError as e:
            # a blank command has no executable: report it as a parse error
            # instead of leaking a bare IndexError
            msg = "The command '{}' is empty: no executable found"
            six.raise_from(exceptions.CIParseError(msg.format(command)), e)
        self.config = copy.deepcopy(command_config)
        self.selected = selected

        # check the executable
        self._replace_alias()
        self._check_exe()

        # extract the keys
        self._keys = self._get_keys(self.command)

        # check that all the required keywords are in the configuration and
        # that all the mandatory ones exists
        self._validate_mandatory()
        self._validate_keywords()

        # get the functions to deal with the keywords
        self._primary_func, self._keyword_map = self._key_types(self._keys)

        # check if the selection needs to happen and that the ``selected``
        # type is acceptable
        self.filter_func = self._filter_selected()

        # get the function to use to decide whether to run on step or not
        self.execute = self._execute()

        # create the template
        self.template = Template(self.command)

    def run(self):
        """Collect the files, expand and run the required command

        All the custom errors raised here derive from
        :exc:`~.exceptions.CIRunError`.

        Raises
        ------
        CICommandFmtError
            if the substitution of the command doesn't work
        """
        target_dir = self._get_value_as_dict('target_dir')['value']
        primaries = self._primary_func(target_dir,
                                       self.config[self.config['primary']])
        # filter the primaries according to the ``selected`` list
        primaries = list(filter(self.filter_func, primaries))
        n_tot = len(primaries)
        if n_tot == 0:
            msg = "No primary file collected"
            get_relay('logger').emit(logging.WARNING, msg)

        worker = proc.get_worker()
        n_exists = 0  # primaries skipped because ``self.execute`` said so
        for i, fn in enumerate(primaries):
            try:
                do_execute = self.execute(fn)
            except Exception as e:
                # deliberate best effort: if the decision function fails,
                # warn and run the step anyway
                msg = ('Failed to decide to run function to decide whether'
                       ' to execute the next step of the reduction because of'
                       ' {}'.format(e))
                get_relay('logger').emit(logging.WARNING, msg)
                do_execute = True
            if do_execute:
                worker(self._run, i, fn)
            else:
                n_exists += 1
            ndone, nerror, _ = worker.jobs_stat()
            get_relay('progress').emit(n_tot, ndone + n_exists, nerror)

        # keep the progress updated until every primary is accounted for.
        # NOTE(review): this assumes that ``ndone`` returned by
        # ``jobs_stat`` also counts failed jobs, otherwise a failure would
        # keep the loop spinning -- confirm against pyhetdex
        ndone, nerror, _ = worker.jobs_stat()
        while (ndone + n_exists) < n_tot:
            ndone, nerror, _ = worker.jobs_stat()
            get_relay('progress').emit(n_tot, ndone + n_exists, nerror)
            time.sleep(0.25)

        worker.wait()
        worker.clear_jobs()

        msg = textwrap.dedent("""
                              <b> {} </b> run {} times with {} failures.
                              Skipped {} times, because the output file(s)
                              already existed. Logs are under
                              {}.""").format(self.original_exe, ndone,
                                             nerror, n_exists, self.exe)
        get_relay('logger').emit(logging.INFO, msg)

    # init helpers
    def _replace_alias(self):
        """If the command configuration has the ``is_alias_of`` key, replace
        the executable name, both in the command and in ``exe``, with its
        value"""
        try:
            alias_name = self.config['is_alias_of']
        except KeyError:
            return  # no alias
        self.command = self.command.replace(self.exe, alias_name, 1)
        self.exe = alias_name

    def _check_exe(self):
        """Check that the executable can be found and replace it, in the
        command string, with the full path returned by
        :func:`distutils.spawn.find_executable`.

        ``exe`` itself is left untouched, as it is used as the logger name
        in :meth:`_run`.

        Raises
        ------
        CINoExeError
            if the executable cannot be found
        """
        # NOTE(review): ``distutils`` is removed in Python 3.12;
        # ``shutil.which`` is the Python 3 replacement
        exe = find_executable(self.exe)
        if not exe:
            raise exceptions.CINoExeError(self.exe)

        try:
            from shlex import quote
        except ImportError:  # python 2
            from pipes import quote
        # quote the path so that ``shlex.split`` in ``_run`` keeps paths with
        # spaces or backslashes as a single token, and double any ``$`` so
        # that the template doesn't mistake it for a keyword
        full_path = quote(exe).replace('$', '$$')
        # ``str.replace`` returns a new string: store it back
        self.command = self.command.replace(self.exe, full_path, 1)

    def _get_keys(self, command):
        """Extract the keywords from the command

        Parameters
        ----------
        command : string
            command to scan for ``$key``/``${key}`` placeholders

        Returns
        -------
        keys : list of strings
            keywords found, in order of appearance (possibly repeated);
            escaped ``$$`` are skipped

        Raises
        ------
        CIParseError
            if the command contains an isolated ``$``
        """
        key_groups = Template.pattern.findall(command)
        keys = []
        for groups in key_groups:
            # each match is (escaped, named, braced, invalid): only one of
            # them is non empty, except for ``invalid`` that is always empty
            kjoin = ''.join(groups).strip()
            if not kjoin:
                msg = ("In the command there is an isolated '$'"
                       " that is not"
                       " allowed by the keyword"
                       " expansion. Use '$$' as an"
                       " escape")
                raise exceptions.CIParseError(msg)
            elif kjoin != '$':
                keys.append(kjoin)
        return keys

    def _validate_mandatory(self):
        """Check that the ``primary`` keyword exists and points to an
        existing keyword, and that all the ``mandatory`` keywords are used in
        the command. If ``mandatory`` is not found, the last check is skipped.

        Raises
        ------
        CIKeywordValidationError
            if any of the checks fails
        """
        if 'primary' not in self.config:
            msg = ("The mandatory keyword 'primary' is not in the"
                   " configuration. Please provide it."
                   " If you think that there are good reasons not to have the"
                   " 'primary' keyword as mandatory, please contact the"
                   " developers with your case.")
            raise exceptions.CIKeywordValidationError(msg)
        if self.config['primary'] not in self.config:
            msg = ("The keyword '{}' pointed to by 'primary' is not in the"
                   " configuration. Please provide it.")
            msg = msg.format(self.config['primary'])
            raise exceptions.CIKeywordValidationError(msg)

        if 'mandatory' not in self.config:
            return

        # if the mandatory keys are not a subset of the given keys, complain
        missing_mandatory = set(self.config['mandatory']) - set(self._keys)
        if missing_mandatory:
            # sort to get a deterministic message
            msg = ("The mandatory key(s) '{}' is(are) missing from the"
                   " provided command".format(
                       ', '.join(sorted(missing_mandatory))))
            raise exceptions.CIKeywordValidationError(msg)

    def _validate_keywords(self):
        """Check that all the requested keywords are in the configuration

        Raises
        ------
        CIKeywordValidationError
            if some keyword in the command is not in the configuration
        """
        missing_keys = set(self._keys) - set(self.config.keys())
        if missing_keys:
            # sort to get a deterministic message
            msg = ("One or more of the keywords ({}) is unknown."
                   " Edit the command or the configuration to"
                   " sync the keys.".format(", ".join(sorted(missing_keys))))
            raise exceptions.CIKeywordValidationError(msg)

    def _key_types(self, keys):
        """Scan the keys and check that the interpreter knows how to deal
        with them.

        Parameters
        ----------
        keys : list of strings
            keys extracted from the command

        Returns
        -------
        primary_func : callable
            function to call to get the list of primary files
        keyword_map : dictionary
            map non-primary keywords to the function used to handle them

        Raises
        ------
        CIKeywordTypeError
            if the type of the keyword is not known
        """
        # get the function to use to get the primary key
        primary_key = self.config['primary']
        primary_value = self._get_value_as_dict(primary_key)
        try:
            primary_func = self.primary_types[primary_value['type']]
        except KeyError as e:
            msg = ("Unknown primary type '{}'; either edit the configuration"
                   " file or create the function to handle it. Consult the"
                   " documentation for more info.")
            msg = msg.format(primary_value['type'])
            six.raise_from(exceptions.CIKeywordTypeError(msg), e)

        # loop through all the keywords, excluding the primary, and map the
        # type with the function to handle it
        keyword_map = {}
        for k in keys:
            if k == primary_key:  # skip the primary key
                continue
            value = self._get_value_as_dict(k)
            try:
                keyword_map[k] = self.keyword_types[value['type']]
            except KeyError as e:
                msg = ("Unknown type '{}' for keyword '{}'; either edit the"
                       " configuration file or create the function to handle"
                       " it. Consult the documentation for more info.")
                msg = msg.format(value['type'], k)
                six.raise_from(exceptions.CIKeywordTypeError(msg), e)

        return primary_func, keyword_map

    def _get_value_as_dict(self, key):
        """Get the value of ``key`` from the configuration. If it's a string,
        convert it to a dictionary with two entries:

        * type: ``plain``
        * value: ``value``

        and re-add it in the configuration dictionary

        Parameters
        ----------
        key : string
            key to get

        Returns
        -------
        value : dictionary
            dictionary defining the type

        Raises
        ------
        CIKeywordError
            if ``value`` is not a dictionary or a string, or if it lacks the
            ``type`` key
        """
        value = self.config[key]
        if isinstance(value, six.string_types):
            value = {'type': 'plain', 'value': value}
            self.config[key] = value

        try:
            has_type = 'type' in value
        except TypeError:
            # e.g. numbers or ``None`` don't support ``in``: they are invalid
            # values, so report them with the documented exception
            has_type = False
        if not has_type:
            msg = ("The value associated with the keyword '{}' is either of"
                   " the wrong type or doesn't have the required ``type`` key")
            raise exceptions.CIKeywordError(msg.format(key))
        return value

    def _filter_selected(self):
        """Look for the existence of the ``filter_selected`` option and check
        that it is of the correct type and that ``selected`` is of the correct
        type

        Returns
        -------
        filter_func : function
            function that accepts one string (one element of the primary
            list) and returns ``True`` or ``False`` if that element is valid
            or not

        Raises
        ------
        CIValidationError
            if ``selected`` doesn't support the ``in`` operator
        CIKeywordTypeError
            if the type of ``filter_selected`` is not known
        """
        if self.selected is None or 'filter_selected' not in self.config:
            return self._true

        # otherwise check that ``selected`` type is corrected
        try:
            '' in self.selected
        except TypeError as e:
            msg = ("The type of ``selected`` is not correct. It must support"
                   " the ``in`` statement")
            six.raise_from(exceptions.CIValidationError(msg), e)

        # get the function to deal with ``filter_selected``
        value = self._get_value_as_dict('filter_selected')
        try:
            filter_ = self.keyword_types[value['type']]
        except KeyError as e:
            msg = ("Unknown type '{}' for keyword '{}'; either edit the"
                   " configuration file or create the function to handle"
                   " it. Consult the documentation for more info.")
            msg = msg.format(value['type'], 'filter_selected')
            six.raise_from(exceptions.CIKeywordTypeError(msg), e)

        def filter_func(primary):
            return filter_(primary, value) in self.selected

        return filter_func

    def _execute(self):
        """Look for the existence of the ``execute`` option in the
        configuration and check that it is of the correct type.

        Returns
        -------
        execute_func : function
            function that accepts one string (one element of the primary
            list) and returns ``True`` or ``False`` if that element must be
            run or not

        Raises
        ------
        CIKeywordTypeError
            if the type of ``execute`` is not known
        """
        if 'execute' not in self.config:
            return self._true

        # get the function to deal with ``execute``
        value = self._get_value_as_dict('execute')
        try:
            execute_ = self.execute_types[value['type']]
        except KeyError as e:
            msg = ("Unknown type '{}' for keyword '{}'; either edit the"
                   " configuration file or create the function to handle"
                   " it. Consult the documentation for more info.")
            msg = msg.format(value['type'], 'execute')
            six.raise_from(exceptions.CIKeywordTypeError(msg), e)

        def execute_func(primary):
            return execute_(primary, self.config)

        return execute_func

    # run helpers
    def _run(self, index, primary):
        """Job that create the keywords, create and run the command string
        and log it

        Parameters
        ----------
        index : int
            index of the current job
        primary : string
            primary file(s) to pass to the functions handling the keywords

        Raises
        ------
        CIKeywordError
            if the function handling one of the keywords fails
        CICommandFmtError
            if the command cannot be built or split
        """
        log = logging.getLogger(self.exe)

        template_dict = {}
        template_dict[self.config['primary']] = primary

        for key, func in six.iteritems(self._keyword_map):
            try:
                template_dict[key] = func(primary, self.config[key])
            except Exception as e:
                msg = 'Failed to handle key {} because of {}'
                msg = msg.format(key, str(e))
                log.critical(msg)
                six.raise_from(exceptions.CIKeywordError(msg), e)

        try:
            command = self.template.substitute(**template_dict)
        except KeyError as e:
            msg = ("Failed to replace key '{}' in the string '{}' when"
                   " creating the command to execute").format(e.args[0],
                                                               self.command)
            log.critical(msg)
            six.raise_from(exceptions.CICommandFmtError(msg), e)

        get_relay('command_string').emit(index, command)
        log.info("running: %s", command)
        try:
            split_command = shlex.split(command)
        except ValueError as e:
            msg = "Unable to split the command '{}' because of '{}'"
            msg = msg.format(command, e.args[0])
            log.critical(msg)
            six.raise_from(exceptions.CICommandFmtError(msg), e)

        try:
            p = sp.Popen(split_command, stdout=sp.PIPE, stderr=sp.PIPE,
                         universal_newlines=True)
            stdout, stderr = p.communicate()
        except Exception:
            log.critical("Command '%s' failed with the following error",
                         command, exc_info=True)
            raise

        if stdout:
            log.info(stdout)
        if stderr:
            log.error(stderr)
        # any non-zero code is a failure: negative values mean that the
        # process has been killed by a signal
        if p.returncode != 0:
            log.critical("Code crashed with return code %d", p.returncode)

    # generic helpers
    def _true(self, *_, **__):
        """returns always true"""
        return True
if six.PY2:  # pragma: no cover
    def _pickle_method(m):  # pragma: no cover
        """Reduce a method so that Python 2 can pickle it.

        Python 2 cannot pickle instance methods natively. This reducer,
        registered with :mod:`copyreg` only under Python 2, makes pickle
        rebuild ``m`` as ``getattr(owner, name)``, where ``owner`` is the
        bound instance or, for unbound methods, the class.

        Copied from `Stack Overflow
        <http://stackoverflow.com/questions/25156768/cant-pickle-type-instancemethod-using-pythons-multiprocessing-pool-apply-a>`_
        """
        owner = m.im_class if m.im_self is None else m.im_self
        method_name = m.im_func.func_name
        return getattr, (owner, method_name)

    copyreg.pickle(types.MethodType, _pickle_method)