# -*- coding: utf-8 -*-
"""
This module's scope are the operations required to reduce data from a high resolution spectrograph.
Example::
>>> import tuna
>>> file_object = tuna.io.read ( "tuna/test/unit/unit_io/adhoc.ad3" )
>>> reducer = tuna.pipelines.calibration_lamp_high_resolution.reducer ( \
calibration_wavelength = 6598.953125, \
finesse = 12, \
free_spectral_range = 8.36522123894, \
interference_order = 791, \
interference_reference_wavelength = 6562.7797852, \
pixel_size = 9, \
scanning_wavelength = 6616.89, \
tuna_can = file_object, \
channel_subset = [ 0, 1, 2, 5 ], \
continuum_to_FSR_ratio = 0.125, \
min_rings = 2, \
noise_mask_radius = 8, \
dont_fit = False, \
unwrapped_only = False, \
verify_center = None ); reducer.join ( )
>>> reducer.wavelength_calibrated.array [ 10 ] [ 10 ]
31.187012437244345
"""
import IPython
import logging
import math
import numpy
import random
import threading
import time
import tuna
[docs]class reducer ( threading.Thread ):
"""
Creates and stores an unwrapped phase map, taking as input a raw data cube.
It inherits from the :ref:`threading_label`.Thread class, and it auto-starts its thread execution. Clients are expected to use its .join ( ) method before using its results.
Intermediary products are:
- continuum
- discontinuum
- wrapped_phase_map
- noise
- borders_to_center_distances
- order_map
- unwrapped_phase_map
- parabolic_fit
- airy_fit
- airy_fit_residue
- substituted_channels
- wavelength_calibrated
Its constructor parameters are:
* best_ring_plane : integer : 0
This optional parameter, when specified, will limit the search for the ring structure to the specified plane. Note that the first plane on a cube has index 0.
* calibration_wavelength : float : 0
Encodes the magnitude of the calibration wavelength, in Angstroms.
* channel_subset : list of integers : [ ]
A list of the channels to be substituted by their fitted model's data.
* continuum_to_FSR_ratio : float : 0.5
The ratio between the number of channels expected to have a continuum-dominated signal, and the channels expected to have a line-dominated signal. This is used to produce a continuum map; larger ratios means more data belongs to the continuum.
* dont_fit : bool : False
Specifies whether to fit models to the data.
* finesse : float : 1
Containing the value of the Finesse for this spectrographer.
* free_spectral_range : float : 1
Containing the value in Angstroms for the bandwidth that corresponds to one interference order.
* interference_order : integer : 1
The order of the interference pattern from the spectrograph for this wavelength, and étalon separation.
* interference_reference_wavelength : float : 0
The order of the interference pattern for the reference wavelength.
* min_rings : integer : 1
The number of rings present in the data cube.
* noise_mask_radius : integer : 1
The distance from a noise pixel that will be marked as noise also (size of a circle around each noise pixel);
* noise_threshold : float : None
The minimal value for a pixel content to be marked as signal, instead of noise. If None, this value will be automatically computed.
* overscan_removal : dict : { }
A dictionary of elements that must be removed from the datacube.
* parameter_file : str : ""
The path for a text file containing the values for the other parameters needed for this pipeline. The format must be:
parameter = value
* pixel_size : float : 1
The size in micrometers of the separation between each pixel center in the CCD.
* plot_log : boolean : False
Specifies whether to matplotlib plot the partial results (which will be always available as ndarrays).
* ring_minimal_percentile : integer : None
The value for the minimal percentile that contains some data on the dataset. If None, will be determined automatically.
* scanning_wavelength : float : 0
The value in Angstroms for the wavelength used for scanning.
* tuna_can : can : None
The raw interferograph data. Must be a :ref:`tuna_io_can_label` object.
* unwrapped_only : boolean : False
If True, will avoid computing the model fits and the wavelength calibration.
* verify_center : tuple of 2 integers : None
If not None, the center calculated by the pipeline will be validated against the input value.
"""
def __init__ ( self,
best_ring_plane : int = None,
calibration_wavelength : float = 0,
channel_subset = [ ],
continuum_to_FSR_ratio = 0.125,
dont_fit = False,
finesse : float = 1,
free_spectral_range : float = 1,
interference_order : int = 1,
interference_reference_wavelength : float = 0,
min_rings = 1,
noise_mask_radius = 1,
noise_threshold = None,
overscan_removal = { },
parameter_file : str = "",
pixel_size : float = 1,
plot_log = False,
ring_minimal_percentile = None,
scanning_wavelength : float = 0,
tuna_can : tuna.io.can = None,
unwrapped_only = False,
verify_center = None ):
super ( self.__class__, self ).__init__ ( )
self.log = logging.getLogger ( __name__ )
self.log.setLevel ( logging.INFO )
if not isinstance ( tuna_can, tuna.io.can ):
self.log.info ( "array must be a numpy.ndarray or derivative object." )
return
try:
if tuna_can.ndim != 3:
self.log.warning ( "Image does not have 3 dimensions, aborting." )
return
except AttributeError as e:
self.log.warning ( "%s, aborting." % str ( e ) )
return
self.ipython = IPython.get_ipython ( )
if self.ipython != None:
self.ipython.magic ( "matplotlib qt" )
self.log.info ( "Starting {} pipeline.".format ( self.__module__ ) )
"""inputs:"""
self.best_ring_plane = best_ring_plane
self.calibration_wavelength = calibration_wavelength
self.channel_subset = channel_subset
self.continuum_to_FSR_ratio = continuum_to_FSR_ratio
self.dont_fit = dont_fit
self.finesse = finesse
self.free_spectral_range = free_spectral_range
self.interference_order = interference_order
self.interference_reference_wavelength = interference_reference_wavelength
self.min_rings = min_rings
self.noise_mask_radius = noise_mask_radius
self.noise_threshold = noise_threshold
self.overscan_removal = overscan_removal
self.pixel_size = pixel_size
self.plot_log = plot_log
self.ring_minimal_percentile = ring_minimal_percentile
self.scanning_wavelength = scanning_wavelength
self.tuna_can = tuna_can
self.unwrapped_only = unwrapped_only
self.verify_center = verify_center
if parameter_file != "":
self.log.debug ( "Parameter file specified: {}".format ( parameter_file ) )
"""outputs:"""
self.airy_fit = None
self.airy_fit_residue = None
self.borders_to_center_distances = None
self.continuum = None
self.discontinuum = None
self.noise = None
self.order_map = None
self.parabolic_fit = None
self.parabolic_model = None
self.rings_center = None
self.substituted_channels = None
self.unwrapped_phase_map = None
self.wavelength_calibrated = None
self.wrapped_phase_map = None
self.start ( )
[docs] def run ( self ):
"""
:ref:`threading_label` method for starting execution.
"""
self.overscanned = tuna.plugins.run ( "Overscan" ) ( data = self.tuna_can,
elements_to_remove = self.overscan_removal )
self.continuum = tuna.plugins.run ( "Continuum detector" ) ( self.overscanned,
self.continuum_to_FSR_ratio )
self.discontinuum = tuna.io.can ( array = numpy.ndarray ( shape = self.overscanned.shape ) )
for plane in range ( self.overscanned.planes ):
self.discontinuum.array [ plane, : , : ] = numpy.abs ( self.overscanned.array [ plane, : , : ] - self.continuum.array )
self.wrapped_phase_map = tuna.plugins.run ( "Barycenter algorithm" ) ( data_can = self.discontinuum )
self.noise = tuna.plugins.run ( "Noise detector" ) ( data = self.overscanned,
wrapped = self.wrapped_phase_map,
noise_mask_radius = self.noise_mask_radius,
noise_threshold = self.noise_threshold )
self.find_rings = tuna.plugins.run ( "Ring center finder" ) ( plane = self.best_ring_plane,
data = self.overscanned.array,
ipython = self.ipython,
min_rings = self.min_rings,
plot_log = self.plot_log )
center = self.find_rings [ 'concentric_rings' ] [ 0 ]
self.rings_center = center
if self.verify_center != None:
true_center = self.verify_center [ 0 ]
threshold = self.verify_center [ 1 ]
difference = math.sqrt ( ( center [ 0 ] - true_center [ 0 ] ) ** 2 + \
( center [ 1 ] - true_center [ 1 ] ) ** 2 )
if difference >= threshold:
return
if len ( self.find_rings [ 'concentric_rings' ] [ 1 ] ) < 2 and self.dont_fit == False:
self.log.error ( "Airy fitting requires at least 2 concentric rings. Blocking request to fit data." )
self.dont_fit = True
if self.dont_fit == False:
sorted_radii = sorted ( self.find_rings [ 'concentric_rings' ] [ 1 ] )
self.log.info ( "sorted_radii = {}".format (
[ "{:.2f}".format ( radius ) for radius in sorted_radii ] ) )
initial_b_ratio = tuna.tools.estimate_b_ratio ( sorted_radii [ : 2 ],
[ self.interference_order,
self.interference_order - 1 ] )
self.log.debug ( "initial_b_ratio = {}".format ( initial_b_ratio ) )
initial_gap = self.calibration_wavelength * self.interference_order * \
math.sqrt ( 1 + initial_b_ratio**2 * sorted_radii [ 0 ] ** 2 ) / 2
self.log.info ( "inital_gap = {:.2e} microns".format ( initial_gap ) )
parinfo_initial = [ ]
parbase = { 'fixed' : False, 'limits' : ( initial_b_ratio * 0.9, initial_b_ratio * 1.1 ) }
parinfo_initial.append ( parbase )
parbase = { 'fixed' : False, 'limits' : ( center [ 0 ] - 50,
center [ 0 ] + 50 ) }
parinfo_initial.append ( parbase )
parbase = { 'fixed' : False, 'limits' : ( center [ 1 ] - 50,
center [ 1 ] + 50 ) }
parinfo_initial.append ( parbase )
parbase = { 'fixed' : False }
parinfo_initial.append ( parbase )
parbase = { 'fixed' : False, 'limits' : ( self.finesse * 0.9, self.finesse * 1.1 ) }
parinfo_initial.append ( parbase )
parbase = { 'fixed' : False, 'limits' : ( initial_gap - self.calibration_wavelength / 4,
initial_gap + self.calibration_wavelength / 4 ) }
parinfo_initial.append ( parbase )
parbase = { 'fixed' : False }
parinfo_initial.append ( parbase )
airy_fitter_0 = tuna.plugins.run ( "Airy fit" ) ( b_ratio = initial_b_ratio,
center_col = center [ 0 ],
center_row = center [ 1 ],
data = self.overscanned.array [ 0 ],
finesse = self.finesse,
gap = initial_gap,
wavelength = self.calibration_wavelength,
mpyfit_parinfo = parinfo_initial )
airy_fit = numpy.ndarray ( shape = self.overscanned.shape )
airy_fit [ 0 ] = airy_fitter_0 [ 1 ].array
b_ratio = airy_fitter_0 [ 0 ] [ 0 ]
center_col = airy_fitter_0 [ 0 ] [ 1 ]
center_row = airy_fitter_0 [ 0 ] [ 2 ]
finesse = airy_fitter_0 [ 0 ] [ 4 ]
initial_gap = airy_fitter_0 [ 0 ] [ 5 ]
parinfo = [ ]
parbase = { 'fixed' : True }
parinfo.append ( parbase )
parbase = { 'fixed' : True }
parinfo.append ( parbase )
parbase = { 'fixed' : True }
parinfo.append ( parbase )
parbase = { 'fixed' : False }
parinfo.append ( parbase )
parbase = { 'fixed' : True }
parinfo.append ( parbase )
parbase = { 'fixed' : False, 'limits' : ( initial_gap - self.calibration_wavelength / 4,
initial_gap + self.calibration_wavelength / 4 ) }
parinfo.append ( parbase )
parbase = { 'fixed' : False }
parinfo.append ( parbase )
mid_plane = round ( self.overscanned.shape [ 0 ] / 2 )
airy_fitter_1 = tuna.plugins.run ( "Airy fit" ) ( b_ratio = b_ratio,
center_col = center_col,
center_row = center_row,
data = self.overscanned.array [ mid_plane ],
finesse = finesse,
gap = initial_gap,
wavelength = self.calibration_wavelength,
mpyfit_parinfo = parinfo )
airy_fit [ 1 ] = airy_fitter_1 [ 1 ].array
channel_gap = ( airy_fitter_1 [ 0 ] [ 5 ] - airy_fitter_0 [ 0 ] [ 5 ] ) / mid_plane
self.log.info ( "channel_gap = {} microns.".format ( channel_gap ) )
latest_gap = airy_fitter_0 [ 0 ] [ 5 ] + channel_gap
for plane in range ( 1, self.overscanned.planes ):
parinfo = [ ]
parbase = { 'fixed' : True }
parinfo.append ( parbase )
parbase = { 'fixed' : True }
parinfo.append ( parbase )
parbase = { 'fixed' : True }
parinfo.append ( parbase )
parbase = { 'fixed' : False }
parinfo.append ( parbase )
parbase = { 'fixed' : True }
parinfo.append ( parbase )
if channel_gap >= 0:
parbase = { 'fixed' : False,
'limits' : ( latest_gap,
latest_gap + 10 * abs ( channel_gap ) ) }
else:
parbase = { 'fixed' : False,
'limits' : ( latest_gap - 10 * abs ( channel_gap ),
latest_gap ) }
parinfo.append ( parbase )
parbase = { 'fixed' : False }
parinfo.append ( parbase )
self.log.debug ( "Fitting Airy plane {}: gap at {:.5f}.".format (
plane, latest_gap ) )
airy_fitter = tuna.plugins.run ( "Airy fit" ) ( b_ratio = b_ratio,
center_col = center_col,
center_row = center_row,
data = self.overscanned.array [ plane ],
finesse = finesse,
gap = latest_gap + channel_gap,
wavelength = self.calibration_wavelength,
mpyfit_parinfo = parinfo )
latest_gap = airy_fitter [ 0 ] [ 5 ]
airy_fit [ plane ] = airy_fitter [ 1 ].array
self.log.debug ( "Plane {} fitted.".format ( plane ) )
self.airy_fit = tuna.io.can ( airy_fit )
airy_fit_residue = self.overscanned.array - self.airy_fit.array
self.airy_fit_residue = tuna.io.can ( array = airy_fit_residue )
airy_pixels = airy_fit_residue.shape [ 0 ] * airy_fit_residue.shape [ 1 ] * airy_fit_residue.shape [ 2 ]
self.log.info ( "Airy <|residue|> = {:.1f} photons / pixel".format (
numpy.sum ( numpy.abs ( airy_fit_residue ) ) / airy_pixels ) )
ring_border_detector = tuna.tools.phase_map.ring_border_detector (
self.wrapped_phase_map,
center,
self.noise,
self.find_rings )
ring_border_detector.join ( )
self.borders_to_center_distances = ring_border_detector.distances
self.order_map = tuna.plugins.run ( "FSR mapper" ) ( distances = self.borders_to_center_distances,
wrapped = self.wrapped_phase_map,
center = center,
concentric_rings = self.find_rings [ 'concentric_rings' ] )
self.create_unwrapped_phase_map ( )
if self.unwrapped_only == True:
return
if self.dont_fit == False:
# It seems Astropy's fitters ain't thread safe, so the airy fit must be already joined.
parabolic_fitter = tuna.models.parabolic_fitter ( self.noise,
self.unwrapped_phase_map,
center )
wavelength_calibrator = tuna.tools.wavelength.wavelength_calibrator ( self.unwrapped_phase_map,
self.calibration_wavelength,
self.free_spectral_range,
self.interference_order,
self.interference_reference_wavelength,
self.overscanned.shape [ 0 ],
center,
self.scanning_wavelength )
if self.dont_fit == False:
substituted_channels = numpy.copy ( self.overscanned.array )
for channel in range ( self.overscanned.planes ):
if channel in self.channel_subset:
substituted_channels [ channel ] = numpy.copy ( self.airy_fit.array [ channel ] )
self.substituted_channels = tuna.io.can ( array = substituted_channels )
parabolic_fitter.join ( )
self.parabolic_model = parabolic_fitter.coefficients
self.parabolic_fit = parabolic_fitter.fit
self.verify_parabolic_model ( )
wavelength_calibrator.join ( )
self.wavelength_calibrated = wavelength_calibrator.calibrated
self.log.debug ( "wavelength_calibrated.array [ 0 ] [ 0 ] == {}".format (
self.wavelength_calibrated.array [ 0 ] [ 0 ] ) )
[docs] def create_unwrapped_phase_map ( self ):
"""
Unwraps the phase map according using the order array constructed.
"""
start = time.time ( )
max_x = self.wrapped_phase_map.array.shape[0]
max_y = self.wrapped_phase_map.array.shape[1]
max_channel = numpy.amax ( self.wrapped_phase_map.array )
min_channel = numpy.amin ( self.wrapped_phase_map.array )
unwrapped_phase_map = numpy.zeros ( shape = self.wrapped_phase_map.shape )
self.log.debug ( "unwrapped_phase_map.ndim == %d" % unwrapped_phase_map.ndim )
self.log.debug ( "Phase map 0% unwrapped." )
last_percentage_logged = 0
for x in range ( max_x ):
percentage = 10 * int ( x / max_x * 10 )
if percentage > last_percentage_logged:
last_percentage_logged = percentage
self.log.debug ( "Phase map %d%% unwrapped." % percentage )
for y in range ( max_y ):
unwrapped_phase_map [ x ] [ y ] = self.wrapped_phase_map.array [ x ] [ y ] + \
max_channel * float ( self.order_map.array [ x ] [ y ] )
self.log.info ( "Phase map unwrapped." )
self.unwrapped_phase_map = tuna.io.can ( array = unwrapped_phase_map )
self.log.debug ( "create_unwrapped_phase_map() took %ds." % ( time.time ( ) - start ) )
[docs] def verify_parabolic_model ( self ):
"""
Since the parabolic model fits a second-degree equation in two variables to the data, which we expect to have a circular symmetry, the coefficients for each second-degree element in the fitted equation should be "close". This can be quantified as the ratio between these coefficients, which this method prints on the log.
"""
self.log.debug ( "Ratio between 2nd degree coefficients is: %f" % ( self.parabolic_model [ 'x2y0' ] /
self.parabolic_model [ 'x0y2' ] ) )
[docs]def pixel_profiler ( reducer, pixel ):
"""
This function's goal is to conveniently return a structure containing the data for a given "position" throughout the pipeline. Since objects can have 2 or 3 dimensions, the data structure returns either a value or a 1 dimensional array for each product.
Parameters:
* reducer : reference to a calibration_lamp_high_resolution object
Should be set to the (already run) pipeline.
* pixel : tuple of 2 integers
Containing the values for column and row of the point to be investigated.
Returns a dictionary with 8 fields (each field corresponds to the result of a method of the calibration_lamp_high_resolution class):
* 'Original data' : numpy.ndarray
Contains the spectrum for the input pixel.
* 'Discontinuum' : numpy.ndarray
The value of the continuum for the input pixel.
* 'wrapped phase map' : float
The value of the wrapped phase map at the input pixel.
* 'Order map' : float
The order to which the pixel belongs to (relative to the order at the center of the ring structure).
* 'Unwrapped phase map' : float
The value of the unwrapped phase map at the input pixel.
* 'Parabolic fit' : float
The value of the fitted parabolic model at the input pixel.
* 'Airy fit' : numpy.ndarray
The value of the fitted Airy model at the input pixel.
* 'Wavelength' : float
The value of the wavelength-calibrated map at the input pixel.
"""
profile = { }
profile [ 0 ] = ( 'Original data', reducer.tuna_can.array [ :, pixel [ 0 ], pixel [ 1 ] ] )
profile [ 1 ] = ( 'Discontinuum', reducer.discontinuum.array [ :, pixel [ 0 ], pixel [ 1 ] ] )
profile [ 2 ] = ( 'Wrapped phase map', reducer.wrapped_phase_map.array [ pixel [ 0 ] ] [ pixel [ 1 ] ] )
profile [ 3 ] = ( 'Order map', reducer.order_map.array [ pixel [ 0 ] ] [ pixel [ 1 ] ] )
profile [ 4 ] = ( 'Unwrapped phase map', reducer.unwrapped_phase_map.array [ pixel [ 0 ] ] [ pixel [ 1 ] ] )
profile [ 5 ] = ( 'Parabolic fit', reducer.parabolic_fit.array [ pixel [ 0 ] ] [ pixel [ 1 ] ] )
profile [ 6 ] = ( 'Airy fit', reducer.airy_fit.array [ :, pixel [ 0 ], pixel [ 1 ] ] )
profile [ 7 ] = ( 'Wavelength', reducer.wavelength_calibrated.array [ pixel [ 0 ] ] [ pixel [ 1 ] ] )
return profile