"""Symlink raw files into a redux directory
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import datetime as dt
import errno
import logging
import multiprocessing
import os

from astropy.io import fits
import peewee
import six

from pyhetdex.doc import docstring
import pyhetdex.tools.files.file_tools as ft
import pyhetdex.tools.processes as proc
from pyhetdex.tools import six_ext

import vdat.database as db
import vdat.utilities as vutil
import vdat.config as confp
# lock for the _symlink_cal function
_manager = multiprocessing.Manager()
_lock = _manager.Lock()
# date formats
DATE_HEAD_KEY = '{} {}'
"""Format string for the ``DATE-OBS`` and ``UT`` keys from the fits files"""
FMT_HEAD_KEY = "%Y-%m-%d %H:%M:%S.%f"
"""Format for converting ``DATE_HEAD_KEY`` into a :class:`~datetime.datetime`
instance"""
FMT_DATE_DIR = "%Y%m%d_%H%M%S"
"""Format for converting a :class:`~datetime.datetime` instance into as string
used as directory name"""
def symlink(log_message, redux_name=None):
    """Symlink shots in the raw directory to an output redux directory.

    Format of redux directory follows
    `issue 820 <https://luna.mpe.mpg.de/redmine/issues/820>`

    ``redux_name`` is a unique name for this reduction. It is set to the
    current date and time if not specified. Each reduction puts its own folder
    called ``redux_name`` under.

    Adds flt, sci, cmp, bias directories to config object after creation. In
    this way you can run the code without symlinking and just fill in the
    needed directories in the config file instead. After creating the required
    directories loops over all the fits files in that night and puts them in a
    sensible place.

    Parameters
    ----------
    log_message : string
        Message to log as info before starting
    redux_name : string
        a name for this reduction, if not given generate it from the local
        time. This String has to be a valid name of a folder (XXX not
        currently tested)

    Raises
    ------
    :class:`~vdat.utilities.VDATDirError`
        if neither the raw nor the redux directory exists
    """
    # NOTE(review): ``redux_name`` is accepted but never used in this
    # function -- confirm whether it should feed into the redux path
    log = logging.getLogger('logger')
    conf = confp.get_config('main')
    log.info(log_message)
    # get the relevant information
    raw_dir = os.path.abspath(conf.get("general", "rawdir"))
    redux_dir = os.path.abspath(conf.get("general", 'redux_dir'))
    if not os.path.exists(redux_dir) and not os.path.exists(raw_dir):
        # without either directory there is nothing to scan nor to symlink
        # (fixed typo in the message: "directoy" -> "directory")
        log.critical("Neither the raw directory '%s' nor the redux directory"
                     " '%s' exist. This is a problem." " Aborting.", raw_dir,
                     redux_dir)
        raise vutil.VDATDirError("No redux directory found: VDAT cannot be"
                                 " run.")
    # create the redux directory if needed
    _mkdir(redux_dir, log)
    # start the database
    db.init(redux_dir)
    # scan the redux directory to build/update the database
    _scan_dirs(redux_dir)
    if os.path.exists(raw_dir):
        _do_symlink(raw_dir, redux_dir)
    else:
        log.warning("The raw directory '%s' does not exist, but I have"
                    " found the redux '%s'. Skip the symlinking.", raw_dir,
                    redux_dir)
    # create the zro and calibration references in the database
    _db_create_references()
@docstring.format_docstring(tmp=vutil.SHOT_FILE)
def _scan_dirs(redux_dir):
    """Scan the redux directories updating the '{tmp}' file if the redux
    directory has changed and filling the database

    Every directory under ``redux_dir`` that contains a '{tmp}' file becomes
    one row in the ``VDATDir`` table; directories without the file are
    ignored.

    Parameters
    ----------
    redux_dir : string
        name of the redux directory
    """
    def _yield_rows():
        # one iteration per directory found under the redux directory
        for dir_ in ft.scan_dirs(redux_dir, followlinks=False):
            # If the shot file exists create the database entry
            try:
                shot_file = vutil.read_shot_file(dir_)
            except six_ext.FileOpenError:
                # the shot file doesn't exist, continue the loop
                continue
            # if the redux directory has changed update it in the shot file
            # NOTE(review): assumes ``read_shot_file`` returns a non-empty
            # list of dictionaries -- confirm an empty file cannot happen
            if shot_file[0]['redux_dir'] != redux_dir:
                # the first entry overwrites the file, the following ones
                # are appended
                append = False
                for l in shot_file:
                    # rebase the stored path onto the new redux directory
                    l['path'] = l['path'].replace(l['redux_dir'], redux_dir)
                    l['redux_dir'] = redux_dir
                    vutil.write_to_shot_file(dir_, append=append, **l)
                    append = True
            # collapse the entries into a single row for the database
            if len(shot_file) == 1:
                shot_file = shot_file[0]
            elif len(shot_file) > 1:
                shot_file = vutil.merge_dicts(shot_file,
                                              exclude=['id', 'zero_dir',
                                                       'cal_dir', 'ifus'])
            yield shot_file
    # create the database entries
    rows = list(_yield_rows())
    if not rows:
        # reduction directory empty
        return
    with db.connect():
        insert_query = db.VDATDir.insert_many(rows)
        insert_query.execute()
def _do_symlink(raw_dir, redux_dir):
    """Run the symlinking from the raw to the redux directory.

    For every night directory matching the ``night`` wildcard, mirror the
    night under the redux directory and submit one :func:`_symlink_shot` job
    per shot to the multiprocessing worker. Known per-shot failures
    (:class:`~vdat.utilities.VDATFitsTypeError`,
    :class:`~vdat.utilities.VDATDateError`) are logged and skipped; any other
    exception aborts the whole symlinking.

    Parameters
    ----------
    raw_dir, redux_dir : string
        name of the raw and the redux directory
    """
    log = logging.getLogger('logger')
    conf = confp.get_config('main')
    # get other relevant information
    wildcard_night = conf.get('wildcards', 'night')
    wildcard_shot = conf.get('wildcards', 'virus_shot')
    log.info("Symlinking fits files from %s to %s", raw_dir, redux_dir)
    # get the multiprocessors (already initialised in the main)
    worker = proc.get_worker()
    for night in ft.scan_dirs(raw_dir, matches=wildcard_night,
                              recursive=False):
        log.info("Symlinking night %s...", night)
        redux_night = night.replace(raw_dir, redux_dir)
        # create the redux night directory if needed
        _mkdir(redux_night, log)
        # loop through the directories in the night
        for shot in ft.scan_dirs(night, matches=wildcard_shot,
                                 recursive=False):
            worker(_symlink_shot, shot, redux_night)
        for job in worker.jobs:
            try:
                job.get()
            except (vutil.VDATFitsTypeError, vutil.VDATDateError) as e:
                # expected failure modes: skip the shot and keep going
                log.error("The shot symlinking has been skipped due to"
                          " error: %s", str(e))
            except Exception:
                # unexpected failure: abort the whole symlinking
                log.critical("First exception during the symlinking",
                             exc_info=True)
                raise
        worker.wait()
        n_complete, n_error, _ = worker.jobs_stat()
        log.info("Symlinking of %d shots is complete", n_complete)
        if n_error > 0:
            # fixed typo in the message: "There where" -> "There were"
            log.warning("There were %d errors.", n_error)
        # clean up the job list
        worker.clear_jobs()
    # TODO: remove it eventually
    # Tell the filebrowser to update (in case we just created a new redux dir)
    # signals.update_filebrowser.emit()
def _symlink_shot(shot_dir, redux_dir):
    """Create the redux directory for the shot and symlink all the files from
    the shot directory.

    The image type, object and observation date are collected from the fits
    headers of all the files in the shot; the files must all agree on type
    and object. The raw image type is mapped, via the ``types`` section of
    the configuration, to one of ``sci``, ``zro`` or ``cal`` and the
    corresponding symlinking helper is invoked.

    Parameters
    ----------
    shot_dir : string
        name of the shot directory
    redux_dir : string
        name of the directory where the new directory and symlink must go

    Raises
    ------
    :class:`~vdat.utilities.VDATFitsTypeError`
        if the image type or object are not consistent or unknown
    """
    log = logging.getLogger('logger')
    conf = confp.get_config('main')
    shot = os.path.split(shot_dir)[1]
    log.debug("Symlinking shot %s", shot)
    # get all the virus fits files
    # TODO: for now it uses only the virus files, add also the guide probe and
    # wavefront sensor data
    fits_files = list(ft.scan_files(shot_dir, matches="*/virus/*.fits"))
    if len(fits_files) <= 0:
        # fixed typo in the message: "whot" -> "shot"
        log.warning("No files found in shot '%s'. Skipping", shot)
        return
    # get all the relevant info from the fits files
    image_type, object_, date = set(), set(), set()
    for f in fits_files:
        header = fits.getheader(f)
        image_type.add(header.get("IMAGETYP", 'unknown').strip())
        object_.add(header.get("OBJECT", 'unknown').strip())
        date.add(DATE_HEAD_KEY.format(header.get('DATE-OBS',
                                                 "unknown").strip(),
                                      header.get("UT", "unknown").strip()))
    # check that all the types are the same
    if len(image_type) != 1:
        msg = ("Not all the fits files in the shot '{}' are of the image"
               " type '{}'. Aborting shot symlinking")
        msg = msg.format(shot_dir, list(image_type)[0])
        log.error(msg)
        raise vutil.VDATFitsTypeError(msg)
    image_type = list(image_type)[0]
    # check that all the objects are the same
    if len(object_) != 1:
        msg = ("Not all the fits files in the shot '{}' represent the same"
               " object type '{}'. Aborting shot symlinking")
        msg = msg.format(shot_dir, list(object_)[0])
        log.error(msg)
        raise vutil.VDATFitsTypeError(msg)
    object_ = list(object_)[0]
    # map the raw image type to the internal one. BUGFIX: an unknown image
    # type previously left ``type_`` unbound, causing a NameError further
    # down instead of the intended VDATFitsTypeError; raise here instead
    if image_type == conf.get('types', 'sci'):
        type_ = 'sci'
    elif image_type == conf.get('types', 'zro'):
        type_ = 'zro'
    elif image_type in [conf.get("types", "flt"), conf.get("types", "cmp")]:
        type_ = 'cal'
    else:
        msg = ("Object type '{}' for files in shot '{}' is unknown."
               " Aborting the symlinking".format(image_type, shot_dir))
        log.error(msg)
        raise vutil.VDATFitsTypeError(msg)
    avg_date, avg_date_string = _average_timestamps(date)
    # Instantiate the entry. The following fields are filled with values that
    # must or might be changed when doing the symlinking
    # * path: left empty for now. Must be filled before the symlinking. The
    #   path can be created using the method VDATDir.make_path
    # * name: average timestamp as a string; is the target name of
    #   the directory for the zro and cal frames; must be changed for the
    #   science frames and can be changed in the other cases
    # * shot: original shot name; for cal if necessary merge two names
    # * timestamp: average time stamp; for cal if necessary average
    # * original_type_: original type of the data name; for cal if necessary
    #   merge two names
    # * object_: original object name; for cal if necessary merge two names
    _redux_dir, night = os.path.split(redux_dir)
    vdat_dir = db.VDATDir(redux_dir=_redux_dir, night=night, type_=type_,
                          name=avg_date_string,
                          is_clone=False, shot=shot,
                          timestamp=avg_date, original_type_=image_type,
                          object_=object_)
    if type_ == 'sci':
        vdat_dir = _symlink_sci(fits_files, vdat_dir)
    elif type_ == 'zro':
        vdat_dir = _symlink_zro(fits_files, vdat_dir)
    else:
        # 'cal' is the only remaining possibility after the mapping above
        vdat_dir = _symlink_cal(fits_files, vdat_dir)
    with db.connect():
        vdat_dir.save()
    log.debug("Symlinking shot %s done", shot)
def _symlink_sci(fits_files, vdat_dir):
    """Symlink the science frames of a shot.

    The target directory is named after the (space-sanitised) object name;
    if the same shot was already symlinked in a previous run the existing,
    non-clone database entry and its directory are reused.

    Parameters
    ----------
    fits_files : list of strings
        names of the fits files
    vdat_dir : :class:`vdat.database.VDATDir` instance
        contains all the relevant information for the symlinking

    Returns
    -------
    out_vdat_dir : :class:`vdat.database.VDATDir` instance
        database entry to save
    """
    log = logging.getLogger('logger')
    log.debug("Symlinking 'science' frames")
    # spaces are not welcome in directory names
    dir_name = vdat_dir.object_.replace(" ", "-")
    # look for a previous, non-clone symlinking of this very shot
    query = db.VDATDir.select().where(
        (db.VDATDir.night == vdat_dir.night) &
        (db.VDATDir.type_ == vdat_dir.type_) &
        (db.VDATDir.name == dir_name) &
        (db.VDATDir.shot == vdat_dir.shot) &
        (db.VDATDir.is_clone == False)
    )
    if query.exists():
        # reuse the entry created by the earlier run
        out_vdat_dir = query.get()
    else:
        # new entry: name it after the object and build its path
        vdat_dir.name = dir_name
        vdat_dir.make_path()
        out_vdat_dir = vdat_dir
    destination = out_vdat_dir.path
    _mkdir(destination, log)
    _symlink_file(fits_files, destination, log)
    if not query.exists():
        # record the freshly created entry in the shot file
        vutil.write_to_shot_file(destination, append=False,
                                 **out_vdat_dir.data)
    return out_vdat_dir
def _symlink_zro(fits_files, vdat_dir):
    """Symlink the bias (zero) frames of a shot.

    If the same shot was already symlinked in a previous run the existing,
    non-clone database entry and its directory are reused; otherwise a new
    directory path is built for the incoming entry.

    Parameters
    ----------
    fits_files : list of strings
        names of the fits files
    vdat_dir : :class:`vdat.database.VDATDir` instance
        contains all the relevant information for the symlinking

    Returns
    -------
    out_vdat_dir : :class:`vdat.database.VDATDir` instance
        database entry to save
    """
    log = logging.getLogger('logger')
    log.debug("Symlinking 'bias' frames")
    # look for a previous, non-clone symlinking of this very shot
    query = db.VDATDir.select().where(
        (db.VDATDir.night == vdat_dir.night) &
        (db.VDATDir.type_ == vdat_dir.type_) &
        (db.VDATDir.shot == vdat_dir.shot) &
        (db.VDATDir.is_clone == False)
    )
    if query.exists():
        # reuse the entry created by the earlier run
        out_vdat_dir = query.get()
    else:
        # new entry: build its directory path
        vdat_dir.make_path()
        out_vdat_dir = vdat_dir
    destination = out_vdat_dir.path
    _mkdir(destination, log)
    _symlink_file(fits_files, destination, log)
    if not query.exists():
        # record the freshly created entry in the shot file
        vutil.write_to_shot_file(destination, append=False,
                                 **out_vdat_dir.data)
    return out_vdat_dir
def _symlink_cal(fits_files, vdat_dir):
    """Symlink the calibration, flat and arc, shots.

    Flats and arcs taken together go into the same directory.

    1. If the shot is already symlinked, reuse the directory
    2. If not look for directories with different original type or object,
       order them at increasing time distance and take the nearest one: if
       it's within a maximum time distance, symlink into that directory
    3. Otherwise create a new directory and symlink into it

    Parameters
    ----------
    fits_files : list of strings
        names of the fits files
    vdat_dir : :class:`vdat.database.VDATDir` instance
        contains all the relevant information for the symlinking

    Returns
    -------
    out_vdat_dir : :class:`vdat.database.VDATDir` instance
        database entry to save
    """
    conf = confp.get_config('main')
    log = logging.getLogger('logger')
    # NOTE(review): assumes the raw IMAGETYP value for flats is exactly
    # "flat"; anything else is labelled "arc" -- verify against the 'types'
    # section of the configuration
    log.debug("Symlinking '%s' frames",
              "flat" if vdat_dir.original_type_ == "flat" else "arc")
    # lock the remaining of the function to be able to group arc and
    # calibration shots also when running in multiprocessing mode
    with _lock:
        # get all the calibration entries for the night
        qcal = db.VDATDir.select().where((db.VDATDir.night == vdat_dir.night) &
                                         (db.VDATDir.type_ == vdat_dir.type_) &
                                         (db.VDATDir.is_clone == False)
                                         )
        # search for the current shot; ``%`` is the peewee glob/LIKE match
        qshot = qcal.where(db.VDATDir.shot % ("*" + vdat_dir.shot + "*"))
        if qshot.exists():
            # shot already symlinked: reuse its entry and directory
            out_vdat_dir = qshot.get()
        else:
            # restrict to entries with a *different* object (e.g. pair a
            # flat with an arc taken close in time)
            q_noobj = qcal.where(~(db.VDATDir.object_ %
                                   ("*" + vdat_dir.object_ + "*")))
            # find the nearest entry within the configured maximum distance
            max_timedelta = dt.timedelta(conf.getint("redux_dirs",
                                                     "max_delta_cal"))
            q_noobj = _find_nearest(q_noobj, vdat_dir.timestamp, n_nearest=1,
                                    nearest_then=max_timedelta)
            if q_noobj:
                # group with the nearest entry: share its directory name and
                # merge the incoming entry into it
                out_vdat_dir = q_noobj[0]
                vdat_dir.name = out_vdat_dir.name
                out_vdat_dir.merge_entries(vdat_dir)
            else:
                # nothing to group with: create a brand new directory
                vdat_dir.make_path()
                out_vdat_dir = vdat_dir
        # set the default target directory
        target_dir = out_vdat_dir.path
        _mkdir(target_dir, log)
        _symlink_file(fits_files, target_dir, log)
        if not qshot.exists():
            # save the entry in the file; note that the *incoming* entry
            # (``vdat_dir``) is written, not the possibly merged one
            vutil.write_to_shot_file(target_dir, **vdat_dir.data)
        # persist the (possibly merged) entry while still holding the lock
        with db.connect():
            out_vdat_dir.save()
    return out_vdat_dir
[docs]def _mkdir(dirname, log, failsafe=True):
"""Create the directory.
If it exists, log it as error and, if ``failsafe`` is False, re-raise the
exception
Parameters
----------
dirname : string
name of the directory to create
log : :class:`logging.Logging` instance
log messages to this logger
safe : bool, optional
if true silently ignores :class:`OSError`` due to existing directories
Raises
------
:class:`~vdat.utilities.VDATDirError` if the creation fails with a
:class:`OSError` and ``failsafe`` is False
"""
try:
os.makedirs(dirname)
log.debug("Directory '%s' created", dirname)
except OSError as e:
if failsafe and e.errno == 17 and 'File exists' in e.strerror:
log.debug("Cannot create output directory '%s'. Error: %s",
dirname, str(e))
else:
six.raise_from(vutil.VDATDirError(e), e)
[docs]def _symlink_file(file_list, target_dir, log, failsafe=True):
"""Symlink the files into the target directory.
If it exists, log it as error and, if ``failsafe`` is False, re-raise the
exception
Parameters
----------
file_list : list of strings
names of the fits files to symlink
target_dir : string
name of the directory where to do the symlink
log : :class:`logging.Logging` instance
log messages to this logger
safe : bool, optional
if true ignores :class:`OSError`` due to existing files
Raises
------
:class:`~vdat.utilities.VDATSymlinkError` if the symlink creation fails
with a :class:`OSError` and ``failsafe`` is False
"""
for fn in file_list:
name = os.path.split(fn)[1]
try:
os.symlink(fn, os.path.join(target_dir, name))
log.debug("Symlink to '%s' created", fn)
except OSError as e:
if failsafe and e.errno == 17 and 'File exists' in e.strerror:
log.debug("Cannot create symlink to '%s' in '%s'. Error: %s",
fn, target_dir, str(e))
else:
six.raise_from(vutil.VDATSymlinkError(e), e)
def _average_timestamps(dates, infmt=FMT_HEAD_KEY, outfmt=FMT_DATE_DIR):
    """Average the given collection of timestamp strings.

    The average is computed as the first parsed timestamp plus the mean of
    the offsets of all timestamps from it.

    Parameters
    ----------
    dates : list of strings
        strings containing timestamps
    infmt : string, optional
        format of the entries of ``dates``
    outfmt : string, optional
        format of the output time stamp

    Returns
    -------
    avg_timestamp : :class:`datetime.datetime` instance
        average time
    string
        ``avg_timestamp`` formatted according to ``outfmt``

    Raises
    ------
    :class:`~vdat.utilities.VDATDateError`
        if it fails to parse dates from the fits headers
    """
    try:
        parsed = [dt.datetime.strptime(stamp, infmt) for stamp in dates]
    except ValueError as e:
        six.raise_from(vutil.VDATDateError(e), e)
    # mean offset from the first timestamp, added back to it
    reference = parsed[0]
    total_offset = dt.timedelta()
    for timestamp in parsed:
        total_offset += timestamp - reference
    avg_timestamp = reference + total_offset // len(parsed)
    return avg_timestamp, avg_timestamp.strftime(outfmt)
[docs]def _find_nearest(q, timestamp, n_nearest=1, nearest_then=None):
"""Go through the list of query results, order them according to the absolute
distance from ``timestamp`` and return the ``n_nearest``.
Parameters
----------
q : :class:`peewee.SelectQuery`
query to use
timestamp : :class:`~datetime.datetime` instance
timestamp to use as reference
n_nearest : int, optional
maximum number of directories returned; set it to negative to return
all
nearest_then : :class:`~datetime.timedelta` instance
if not None, don't consider any directory whose delta time is larger
than ``nearest_then``; applied after n_nearest
Returns
-------
sorted_q : list of query results
ordered with respect to the timestamp
"""
def _key(element):
"""Create the key for ordering as timedeltas from ``timestamp``"""
return abs(element.timestamp - timestamp)
def _filter(element):
"""Test whether the ``timedelta`` is less than ``nearest_then``"""
return _key(element) < nearest_then
sorted_q = sorted(q, key=_key)
if n_nearest > 0:
sorted_q = sorted_q[:n_nearest]
if nearest_then:
sorted_q = list(filter(_filter, sorted_q))
return sorted_q
def _db_create_references():
    """Attach reference zero and calibration directories to the database
    entries.

    For every non-clone ``cal`` and ``sci`` entry, set ``zero_dir`` to the
    nearest-in-time non-clone ``zro`` entry; for ``sci`` entries also set
    ``cal_dir`` to the nearest non-clone ``cal`` entry. Each updated entry
    is saved back to the database.
    """
    # search reference zero and calibration directories
    with db.connect():
        qzro = db.VDATDir.select().where((db.VDATDir.type_ == 'zro') &
                                         (db.VDATDir.is_clone == False))
        qcal = db.VDATDir.select().where((db.VDATDir.type_ == 'cal') &
                                         (db.VDATDir.is_clone == False))
        # ``<<`` is the peewee "IN" operator
        for vdir in db.VDATDir.select().where(db.VDATDir.type_ << ['cal',
                                                                   'sci']):
            # for both find the reference zero directory
            ref_zro = _find_nearest(qzro, vdir.timestamp)
            if ref_zro:
                vdir.zero_dir = ref_zro[0]
            if vdir.type_ == 'sci':
                # science frames also need a reference calibration directory
                ref_cal = _find_nearest(qcal, vdir.timestamp)
                if ref_cal:
                    vdir.cal_dir = ref_cal[0]
            vdir.save()