Source code for pyhetdex.het.reconstruct_ifu

"""Reconstruct the IFU

"""

from __future__ import print_function, absolute_import

import itertools as it
import os

from astropy.io import fits
import numpy as np

from pyhetdex.tools.files.fits_tools import wavelength_to_index
from pyhetdex import het


[docs]class RecontructIndexError(IndexError): """Error for miss-matching the number of fibers in the ifu center files and the fiber extracted ones"""
[docs]class ReconstructIOError(IOError): """Error when the name and/or number fiber extracted file names is not correct""" pass
[docs]class ReconstructValueError(ValueError): """Errors for wrong combinations of input parameters in :class:`~ReconstructedIFU` or :meth:`~ReconstructedIFU.from_files`""" pass # TODO: account for illumination weights and fiber throughput
[docs]class ReconstructedIFU(object): r"""Reconstructed IFU head image from the fiber extracted frames given the ``ifu_center`` and the ``dither``. Parameters ---------- ifu_center : instance of :class:`~pyhetdex.het.ifu_centers.IFUCenter` fiber number to fiber position mapping dither : instance child of :class:`~pyhetdex.het.dither._BaseDither` dither relative position, illumination, image quality fextract : None or list of fits files, optional * if None the list of files is inferred from :attr:`dither.basename`; * if not None must have ``ndither`` x ``nchannels`` elements. The channel name and dither number are extracted from the ``CCDPOS`` and the ``DITHER`` header keywords fe_prefix : string, optional when getting the names from the dither file, prepend ``fe_prefix`` to the ``basename`` Attributes ---------- ifu_center dither x, y : 1-dimensional arrays x and y position of the fibers flux : list of 2-dimensional arrays each element is the content of one fiber extracted file header : list of dictionaries each element contains the ``CRVAL1`` and ``CDELT1`` keywords :value pairs from the headers of the fiber extracted files; used to determine the wavelength range in :meth:`~reconstruct` Raises ------ ReconstructValueError if an empty dither is passed and ``fextract`` is ``None`` ReconstructIOError if the number and/or number of fiber extracted frames is not correct; raised by :meth:`~_fedict` RecontructIndexError if the number of fibers from the fiber extracted files and from the ifu center files do not match; raised by :meth:`~_reconstruct` """ def __init__(self, ifu_center, dither, fextract=None, fe_prefix=""): if isinstance(dither, het.dither.EmptyDither) and fextract is None: msg = "With an empty dither file a fiber extract file name must be" msg += " provided" raise ReconstructValueError(msg) self.ifu_center = ifu_center self.dither = dither # public attributes, filled in *_reconstruct* self.x, self.y = np.array([[], []], dtype=float) self.flux, self.header = [], [] self._fe_prefix = fe_prefix # the fiber extracted file names are saved into a dictionary with key # like: D1_L self._key = "{d}_{ch}" self._reconstruct(self._fedict(fextract)) @classmethod
[docs] def from_files(cls, ifu_center_file, dither_file=None, fextract=None, fe_prefix=""): """ Read and parse the file Parameters ---------- ifu_center_file : string file containing the fiber number to fiber position mapping dither_file : string, optional file containing the dither relative position. If not given, a singe dither is assumed fextract : None or list of fits files, optional if None the list of files is inferred from the second column of the ``dither_file``; if not None must have ``ndither`` x ``nchannels`` elements. The channel name and dither number are extracted from the ``CCDPOS`` and the ``DITHER`` header keywords fe_prefix : string, optional when getting the names from the dither file, prepend ``fe_prefix`` to the ``basename`` Raises ------ ReconstructValueError if both ``dither_file`` and ``fextract`` are ``None`` ReconstructValueError, ReconstructIOError as in :class:`~ReconstructedIFU` Notes ----- if ``dither_file`` is None, ``fextract`` must contain a number of files equal to the number of channels (2) """ if dither_file is None and fextract is None: msg = "dither_file and/or fextract must be provided" raise ReconstructValueError(msg) _ifu_center = het.ifu_centers.IFUCenter(ifu_center_file) if dither_file is None: _dither = het.dither.EmptyDither() else: _dither = het.dither.ParseDither(dither_file) return cls(_ifu_center, _dither, fextract=fextract, fe_prefix=fe_prefix)
[docs] def _fedict(self, fextract): """ Organize the fiber extracted file names into a dictionary Parameters ---------- fextract : None or list of string If None get the file names from the dither, otherwise from the *fextract* list Returns ------- dfextract : dict dictionary of fiber extracted frames """ channels = self.ifu_center.channels dithers = self.dither.dithers n_shots = len(channels) * len(dithers) dfextract = {} if fextract is None: # get the file names from the dither file fetemplate = os.path.join(self.dither.abspath, self._fe_prefix + "{bn}_{ch}.fits") for ch, (d, bn) in it.product(channels, self.dither.basename.items()): k = self._key.format(ch=ch, d=d) dfextract[k] = fetemplate.format(ch=ch, bn=bn) else: # the file names are provided if len(fextract) != n_shots: msg = "The number of fiber extracted files is different from" msg += " {} ({} channels and {} dithers)" raise ReconstructIOError(msg.format(n_shots, len(channels), len(dithers))) # as the fiber extracted files are ordered, pop elements. This way # makes sure that all the dithers and channels are represented all_keys = [self._key.format(d=d, ch=ch) for d in dithers for ch in channels] for fe in fextract: header = fits.getheader(fe) ch = header['CCDPOS'].strip() d = "D{0:d}".format(header['DITHER']) k = self._key.format(ch=ch, d=d) # check that the files are correct if k in dfextract: msg = "The file '{}' is for the same dither".format(fe) msg += " and channel as file '{}'".format(dfextract[k]) raise ReconstructIOError(msg) else: try: all_keys.remove(k) except ValueError: msg = "The file '{}' is for an unknown combination of" msg += " dither ({}) and channel ({}). One of {}" msg += " expected" raise ReconstructIOError(msg.format(fe, d, ch, ", ".join(all_keys))) dfextract[k] = fe return dfextract
[docs] def _reconstruct(self, dfextract): """ Read the fiber extracted files and creates a set of three lists for x, y and flux. Parameters ---------- dfextract : dictionary name of the fiber extracted file """ for ch, d in it.product(self.ifu_center.channels, self.dither.dithers): # get the correct values of x and y self.x = np.concatenate([self.x, self.xpos(ch, d)]) self.y = np.concatenate([self.y, self.ypos(ch, d)]) # read the fiber extracted file, order the fibers and save the # necessary keys into self.h # python starts from 0 fib_numbs = [fn - 1 for fn in self.ifu_center.fib_number[ch]] k = self._key.format(ch=ch, d=d) with fits.open(dfextract[k]) as hdu: h = hdu[0].header data = hdu[0].data # if the number of fiber from the fiber extracted file is # different from the number of fibers, something wrong if data.shape[0] != len(fib_numbs): msg = "The number of rows in file '{0}' ({1:d}) does not" msg += " agree with the number of active fibers from file" msg += " '{2}' ({3:d})" msg = msg.format(hdu.filename(), data.shape[0], self.ifu_center.filename, len(fib_numbs)) raise RecontructIndexError(msg) self.flux.append(data[fib_numbs, :]) # order the fibers # get the header keywords needed to get the index at a given # wavelength self.header.append({k: h.get(k) for k in ["CRVAL1", "CDELT1"]})
[docs] def xpos(self, channel, dither): """get the position for the x *dither* in *channel* Parameters ---------- channel : string name of the channel [L, R] dither : string name of the dither [D1, D2, ..] Returns ------- ndarray x position of the fibers for the given channel and dither """ return np.array(self.ifu_center.xifu[channel]) + self.dither.dx[dither]
[docs] def ypos(self, channel, dither): """get the position for the y *dither* in *channel* Parameters ---------- channel : string name of the channel [L, R] dither : string name of the dither [D1, D2, ..] Returns ------- ndarray y position of the fibers for the given channel and dither """ return np.array(self.ifu_center.yifu[channel]) + self.dither.dy[dither]
[docs] def reconstruct(self, wmin=None, wmax=None): """ Returns the reconstructed IFU with the flux computed between [``wmin``, ``wmax``] Parameters ---------- wmin, wmax : float, optional min and max wavelength to use. If ``None``: use the min and/or max from the file Returns ------- x, y : 1d arrays x and y position of the fibers flux : 1d array flux of the fibers within ``wmin`` and ``wmax`` """ _flux = [] for f, h in zip(self.flux, self.header): _imin = wavelength_to_index(h, wmin) _imax = wavelength_to_index(h, wmax) _flux.append(f[:, _imin:_imax].sum(axis=1)) return self.x, self.y, np.concatenate(_flux)
[docs]class QuickReconstructedIFU(object): r"""Reconstructed IFU head image from the fiber extracted frames given the ``ifu_center`` and the ``dither``. Parameters ---------- ifu_center : instance of :class:`~pyhetdex.het.ifu_centers.IFUCenter` fiber number to fiber position mapping files : Basefilename of the image to be reconstructed, or a tuple of three images to reconstruct a complete dither. dist : Distortion file for the ifu Attributes ---------- ifu_center Raises ------ ReconstructValueError if an empty dither is passed and ``fextract`` is ``None`` ReconstructIOError if the number and/or number of fiber extracted frames is not correct; raised by :meth:`~_fedict` RecontructIndexError if the number of fibers from the fiber extracted files and from the ifu center files do not match; raised by :meth:`~_reconstruct` """ def __init__(self, ifu_center, files, dist): if not ifu_center: raise ReconstructValueError('An IFU center file is needed to quickreconstruct an image') if not dist: raise ReconstructValueError('A distortion is needed to quickreconstruct an image') self.ifu_center = ifu_center self.dist = dist self.files = files # public attributes, filled in *_reconstruct* self.x, self.y = np.array([[], []], dtype=float) self.flux, self.header = [], [] self._reconstruct()
[docs] def _reconstruct(self): """ Read the fiber extracted files and creates a set of three lists for x, y and flux. Parameters ---------- dfextract : dictionary name of the fiber extracted file """ for ch, d in it.product(self.ifu_center.channels, self.dither.dithers): # get the correct values of x and y self.x = np.concatenate([self.x, self.xpos(ch, d)]) self.y = np.concatenate([self.y, self.ypos(ch, d)]) # read the fiber extracted file, order the fibers and save the # necessary keys into self.h # python starts from 0 fib_numbs = [fn - 1 for fn in self.ifu_center.fib_number[ch]] k = self._key.format(ch=ch, d=d) with fits.open(dfextract[k]) as hdu: h = hdu[0].header data = hdu[0].data # if the number of fiber from the fiber extracted file is # different from the number of fibers, something wrong if data.shape[0] != len(fib_numbs): msg = "The number of rows in file '{0}' ({1:d}) does not" msg += " agree with the number of active fibers from file" msg += " '{2}' ({3:d})" msg = msg.format(hdu.filename(), data.shape[0], self.ifu_center.filename, len(fib_numbs)) raise RecontructIndexError(msg) self.flux.append(data[fib_numbs, :]) # order the fibers # get the header keywords needed to get the index at a given # wavelength self.header.append({k: h.get(k) for k in ["CRVAL1", "CDELT1"]})
[docs] def xpos(self, channel, dither): """get the position for the x *dither* in *channel* Parameters ---------- channel : string name of the channel [L, R] dither : string name of the dither [D1, D2, ..] Returns ------- ndarray x position of the fibers for the given channel and dither """ return np.array(self.ifu_center.xifu[channel]) + self.dither.dx[dither]
[docs] def ypos(self, channel, dither): """get the position for the y *dither* in *channel* Parameters ---------- channel : string name of the channel [L, R] dither : string name of the dither [D1, D2, ..] Returns ------- ndarray y position of the fibers for the given channel and dither """ return np.array(self.ifu_center.yifu[channel]) + self.dither.dy[dither]
[docs] def reconstruct(self, wmin=None, wmax=None): """ Returns the reconstructed IFU with the flux computed between [``wmin``, ``wmax``] Parameters ---------- wmin, wmax : float, optional min and max wavelength to use. If ``None``: use the min and/or max from the file Returns ------- x, y : 1d arrays x and y position of the fibers flux : 1d array flux of the fibers within ``wmin`` and ``wmax`` """ _flux = [] for f, h in zip(self.flux, self.header): _imin = wavelength_to_index(h, wmin) _imax = wavelength_to_index(h, wmax) _flux.append(f[:, _imin:_imax].sum(axis=1)) return self.x, self.y, np.concatenate(_flux)