Source code for tuna.tools.barycenter

"""
This module's scope is the creation of barycenter "maps" from Fabry-Pérot interferogram data.

The barycenter map is one of the so-called "wrapped" phase map types.

Example:

    >>> import tuna
    >>> import numpy
    >>> raw = tuna.io.read ( "tuna/test/unit/unit_io/adhoc_3_planes.fits" )
    >>> barycenter = tuna.plugins.run ( "Barycenter algorithm" ) ( data_can = raw )
    >>> barycenter.shape
    (512, 512)
"""

__version__ = "0.1.0"
changelog = {
    "0.1.0" : ( "0.15.0", "Updated example to use plugins." )
    }

from astropy.modeling import models, fitting
import logging
import math
import numpy
import threading
import time
import tuna
import warnings

[docs]class barycenter_detector ( threading.Thread ): """ This class' responsibility is to generate barycenter maps from spectral cubes. Its algorithm utilizes a fitter to fit a parabola to each pixel, and find the barycenter from the fit parameters. It inherits from the :ref:`threading_label`.Thread class, and it auto-starts its thread execution. Clients are expected to use its .join ( ) method before using its results. Its constructor expects the following parameters: * data : can Containing the interferograph data. """ def __init__ ( self, data ): super ( self.__class__, self ).__init__ ( ) self.__version__ = "0.1.0" self.changelog = { "0.1.0" : "Tuna 0.14.0 : updated docstrings to new style." } self.log = logging.getLogger ( __name__ ) self.log.setLevel ( logging.DEBUG ) self.data = data self.__array = self.data.array self.__number_of_spectral_regions_array = None self.__photon_counts_array = None self.__number_spectral_peaks_array = None self.mask = numpy.mgrid [ 1 : self.__array.shape [ 0 ] + 1 ] self.barycenter = None self.result = None self.start ( )
[docs] def run ( self ): """ This method's goal is to wrap execution to be called from the thread's .start ( ) method. """ start = time.time ( ) result = self.create_barycenter ( ) self.log.debug ( "result.shape = %s" % str ( result.shape ) ) try: self.result = tuna.io.can ( array = result ) except TypeError: self.result = tuna.io.can.can ( array = result ) self.log.debug ( "self.result.array.shape = %s" % str ( self.result.array.shape ) ) self.log.debug ( "Barycenter detection took %ds." % ( time.time ( ) - start ) )
[docs] def create_barycenter ( self ): """ This method's goal is to find the barycenter for each pixel by fitting a 1D gaussian model to each pixel's spectral data, and then calculating the barycenter of the gaussian. Returns: * barycenter : numpy.ndarray Containing the generated barycenter map. """ barycenter = numpy.zeros ( shape = ( self.__array.shape [ 1 ], self.__array.shape [ 2 ] ) ) planes_indexes = numpy.mgrid [ 0 : self.__array.shape [ 0 ] ] # Supposing a single peak, a single Gaussian distribution should be ok. gaussian = models.Gaussian1D ( amplitude = 1.0, mean = self.__array.shape [ 0 ] / 2, stddev = 1 ) fitter = fitting.LevMarLSQFitter ( ) for row in range ( self.__array.shape [ 1 ] ): self.log.debug ( "barycenter at row %d" % row ) for col in range ( self.__array.shape [ 2 ] ): profile = self.__array [ :, row, col ] # Single peak: centering the largest signal should center the profile. profile_max = numpy.argmax ( profile ) roll = round ( self.__array.shape [ 0 ] / 2 ) - profile_max shifted_profile = numpy.roll ( profile, roll ) with warnings.catch_warnings ( ): warnings.simplefilter ( "ignore" ) fit = fitter ( gaussian, planes_indexes, shifted_profile ) mean = fit.parameters [ 1 ] # From https://en.wikipedia.org/wiki/Gaussian_function: FWHH = 2.35482 * abs ( fit.parameters [ 2 ] ) left_shoulder = max ( 0, math.floor ( mean - FWHH * 4 ) ) right_shoulder = min ( self.__array.shape [ 0 ], math.ceil ( mean + FWHH * 4 ) ) peak = shifted_profile [ left_shoulder : right_shoulder ] peak_center_of_mass = self.get_center_of_mass ( peak ) rolled_center_of_mass = left_shoulder + peak_center_of_mass unrolled_center_of_mass = rolled_center_of_mass - roll moduled_center_of_mass = unrolled_center_of_mass % self.__array.shape [ 0 ] barycenter [ row ] [ col ] = moduled_center_of_mass if col == 0: self.log.debug ( "(%d, %3d) prof = %s" % ( row, col, str ( profile ) ) ) self.log.debug ( "(%d, %3d) roll = %d" % ( row, col, roll ) ) self.log.debug ( "(%d, %3d) spro = %s" % ( row, col, str ( shifted_profile ) ) ) self.log.debug ( "(%d, %3d) ampl = %f, mean = %f, stddev = %f." % ( row, col, fit.parameters [ 0 ], mean, fit.parameters [ 2 ] ) ) self.log.debug ( "(%d, %3d) fwhh = %f." % ( row, col, FWHH ) ) self.log.debug ( "(%d, %3d) shou = [ %d : %d ]" % ( row, col, left_shoulder, right_shoulder ) ) self.log.debug ( "(%d, %3d) peak = %s" % ( row, col, str ( peak ) ) ) self.log.debug ( "(%d, %3d) pcen = %f" % ( row, col, peak_center_of_mass ) ) self.log.debug ( "(%d, %3d) rcen = %f" % ( row, col, rolled_center_of_mass ) ) self.log.debug ( "(%d, %3d) ucen = %f" % ( row, col, unrolled_center_of_mass ) ) self.log.debug ( "(%d, %3d) mcen = %f" % ( row, col, moduled_center_of_mass ) ) self.log.debug ( "(%d, %d) barycenter detected at %.3f using %.1f%% of the spectrum." % ( row, col, moduled_center_of_mass, peak.shape [ 0 ] / self.__array.shape [ 0 ] * 100 ) ) self.log.debug ( "barycenter.shape = %s" % str ( barycenter.shape ) ) return barycenter
[docs] def get_center_of_mass ( self, peak ): """ This method's goal is to calculate the center-of-mass of an array, assuming as the distance one "unit" of distance per column. Parameters: * peak: numpy.ndarray One dimensional data (spectra) corresponding to 1 pixel's "depth" in the cube. Returns: * center_of_mass : float The value of the "pseudo channel" where the center of mass resider, for this spectra. """ total_mass = numpy.sum ( peak ) if ( total_mass == 0 or peak == [ ] ): return 0 #mask = numpy.mgrid [ 1 : peak.shape [ 0 ] + 1 ] weighted_mass = self.mask [ : peak.shape [ 0 ] ] * peak center_of_mass = numpy.sum ( weighted_mass ) / total_mass return center_of_mass
[docs]class barycenter_fast ( threading.Thread ): """ This class' responsibility is to generate and store barycenter maps from spectral cubes. Its algorithm uses geometric considerations to determine where is the center of mass for each spectrum, and is generally much faster than the algorithm using fitted data, with very similar numerical results. However, some pixelation is observed, when using this algorithm instead of the "slow" one. It inherits from the :ref:`threading_label`.Thread class, and it auto-starts its thread execution. Clients are expected to use its .join ( ) method before using its results. Its constructor expects the following parameters: * data : can Containing the interferograph data. """ def __init__ ( self, data ): self.log = logging.getLogger ( __name__ ) self.log.setLevel ( logging.INFO ) super ( self.__class__, self ).__init__ ( ) self.data = data self.__array = self.data.array self.__number_of_spectral_regions_array = None self.__photon_counts_array = None self.__number_spectral_peaks_array = None self.result = None self.start ( )
[docs] def run ( self ): """ This method's goal is to execute the main algorithm, when called by this object's threading.Thread.start ( ) method. """ start = time.time ( ) result = self.create_barycenter_using_peak ( ) self.log.debug ( "result.shape = %s" % str ( result.shape ) ) self.result = tuna.io.can ( array = result ) self.log.debug ( "self.result.array.shape = %s" % str ( self.result.array.shape ) ) self.log.debug ( "Barycenter detection took %ds." % ( time.time ( ) - start ) )
[docs] def create_barycenter_using_peak ( self ): """ This method's goal is to generate the barycenter map using the peak method. Will find the center of mass of each spectrum, using the shoulder-to-shoulder peak channels as the relevant signals. To understand what is meant by this, consider the profile of a pixel has the following appearance:: _ _ _/ \_/ \ __ / \ _ \ / \__/ \ / \__/ \__/ 01234567890123456789012345678 1111111111222222222 The FWHH channels would encompass the channels 8 to 17. But we want more signal, so we extend the channel interval until the first channel that would belong to another "peak", so the channels considered for the barycenter are actually the channels 4 to 20, inclusive. Once this "peak" is found, the center of mass for it is calculated, and the result is mapped onto the complete spectra, which yields the barycenter for that spectrum. Returns: * barycenter_array : numpy.ndarray Containing the barycenter map as calculated above, for each pixel in the input data. """ if self.__array.ndim != 3: return barycenter_array = numpy.ndarray ( shape = ( self.__array.shape [ 1 ], self.__array.shape [ 2 ] ) ) # Linear algebra to produce the barycenter: # All calculations are made using arrays that are created with index 0 containing values in reference to the "leftmost" channel of the spectral range being considered. # In the end, this shifted result is shifted to the data position by adding the index for this "leftmos" channel. # The barycenter being the ratio of weighted-by-distance mass and mass, two auxiliary arrays are created. Both start as zero-valued arrays with the same size as the profile. # The "multipliers" gets the integer sequence 1, 2, 3, ... starting from the leftmost channel and ending in the rightmost channel of the spectral ROI. # The "shoulder_mask" gets the integer 1 at every index between the leftmost and rightmost channels in the spectral ROI. # The weighted-by-distance mass value is the profile times the multipliers array; # The total mass value is the profile times the shoulder_mask. self.log.debug ( "Barycenter 0% done." ) last_percentage_logged = 0 for row in range ( self.__array.shape [ 1 ] ): percentage = 10 * int ( row / self.__array.shape [ 1 ] * 10 ) if percentage > last_percentage_logged: self.log.debug ( "Barycenter %d%% done." % ( percentage ) ) last_percentage_logged = percentage for col in range ( self.__array.shape [ 2 ] ): profile = self.__array[:,row,col] shoulder = self.get_fwhh ( profile = profile ) shoulder_mask = numpy.zeros ( shape = ( self.__array.shape [ 0 ] ) ) multipliers = numpy.zeros ( shape = ( self.__array.shape [ 0 ] ) ) cursor = shoulder['i_left_shoulder'] multiplier = 1 multipliers[cursor] = multiplier shoulder_mask[cursor] = 1 cursor = self.get_right_channel ( channel = cursor, profile = profile ) while ( cursor != self.get_right_channel ( channel = shoulder [ 'i_right_shoulder' ], profile = profile ) ): multiplier += 1 multipliers[cursor] = multiplier shoulder_mask[cursor] = 1 cursor = self.get_right_channel ( channel = cursor, profile = profile ) weighted_photon_count = multipliers * profile weighted_sum = numpy.sum ( weighted_photon_count ) shoulder_photon_count = shoulder_mask * profile shoulder_photon_count_sum = numpy.sum ( shoulder_photon_count ) if shoulder_photon_count_sum == 0: self.log.debug ( "At ( %d, %d ), shoulder_photon_count_sum == 0" % ( row, col ) ) self.print_fwhh ( shoulder ) center_of_mass = 0 else: center_of_mass = weighted_sum / shoulder_photon_count_sum shifted_center_of_mass = shoulder['i_left_shoulder'] - 1 + center_of_mass if shifted_center_of_mass != self.__array.shape [ 0 ]: ordered_shifted_center_of_mass = shifted_center_of_mass % ( self.__array.shape [ 0 ] ) barycenter_array[row][col] = ordered_shifted_center_of_mass self.log.info ( "Barycenter done." ) return barycenter_array
[docs] def get_fwhh ( self, profile = numpy.ndarray ): """ This method's goal is to map "geometrical" features of the input spectra. The features are the indices of the FWHH channels in the profile, and the relevant data for FWHH calculation. This is done using auxiliary methods to get the next channel to the right or left, wrapping around the end of the array as appropriate. First the FWHH values are found, by searching the first channel that has the maximal value in the profile, and adding channels left and right while their values are greater or equal to the half height of the maximal channel. Then this region is increased in both directions as long as the channels adjacent to the current spectral ROI have values smaller or equal to the "border" channels of the ROI. This extends the FWHH region to the "base" of the peak set that contains the FWHH. Parameters: * profile : numpy.ndarray Containing the spectrum to be mapped. Returns: * fwhh_dict : dictionary Containing the following entries:: max_height : The value contained in the channel with maximum signal. half_height : Half of max_height. max_height_index : The index of the channel with maximum signal. leftmost_hh : The "leftmost" channel that has signal >= half_height. rightmost_hh : The "rightmost" channel that has signal >= half_height. profile_indices : All indices of the spectra that are contained in the "half height peak". i_left_shoulder : The index of the leftmost channel of the peak. i_right_shoulder : The index of the rightmost channel of the peak. il_shoulder_indices : All indices of the spectra that are contained in the peak. """ max_height = numpy.amax ( profile ) max_height_index = numpy.argmax ( profile ) half_height = max_height / 2 leftmost_hh = self.get_left_channel ( max_height_index ) while ( profile[leftmost_hh] >= half_height and leftmost_hh != max_height_index ): leftmost_hh = self.get_left_channel ( channel = leftmost_hh, profile = profile ) leftmost_hh = self.get_right_channel ( channel = leftmost_hh, profile = profile ) rightmost_hh = self.get_right_channel ( max_height_index ) while ( profile[rightmost_hh] >= half_height and rightmost_hh != max_height_index ): rightmost_hh = self.get_right_channel ( channel = rightmost_hh, profile = profile ) rightmost_hh = self.get_left_channel ( channel = rightmost_hh, profile = profile ) fwhh_profile_indices = [ leftmost_hh, max_height_index, rightmost_hh ] cursor = leftmost_hh while ( cursor != rightmost_hh ): if cursor not in fwhh_profile_indices: fwhh_profile_indices.append ( cursor ) cursor = self.get_right_channel ( cursor ) # Using only the FWHH channels causes pixelation of the phase map, # therefore the whole peak is used, starting with the FWHH. left_shoulder = leftmost_hh while ( ( profile [ self.get_left_channel ( left_shoulder ) ] - \ profile [ left_shoulder ] <= 0 ) and ( self.get_left_channel ( left_shoulder ) != rightmost_hh ) ): left_shoulder = self.get_left_channel ( left_shoulder ) right_shoulder = rightmost_hh while ( ( profile [ self.get_right_channel ( right_shoulder ) ] - \ profile [ right_shoulder ] <= 0 ) and ( self.get_right_channel ( right_shoulder ) != left_shoulder ) ): right_shoulder = self.get_right_channel ( right_shoulder ) shoulder_indices = [ left_shoulder ] cursor = left_shoulder while ( cursor != right_shoulder ): if cursor not in shoulder_indices: shoulder_indices.append ( cursor ) cursor = self.get_right_channel ( cursor ) fwhh_dict = { } fwhh_dict['max_height'] = max_height fwhh_dict['half_height'] = half_height fwhh_dict['max_height_index'] = max_height_index fwhh_dict['leftmost_hh'] = leftmost_hh fwhh_dict['rightmost_hh'] = rightmost_hh fwhh_dict['profile_indices'] = fwhh_profile_indices fwhh_dict['i_left_shoulder'] = left_shoulder fwhh_dict['i_right_shoulder'] = right_shoulder fwhh_dict['il_shoulder_indices'] = shoulder_indices return fwhh_dict
[docs] def get_left_channel ( self, channel = int, profile = numpy.ndarray ): """ This method's goal is to find the next channel 'to the left' in the profile. If the channel is the leftmost channel in the profile, consider the last channel as its left neighbour. Parameters: * channel : int The channel for which we want to find the left "neighbour". * profile : numpy.ndarray The unidimensional profile. Returns: * unnamed variable : integer The index of the channel to the "left" of the input channel. """ if channel == 0: return self.__array.shape [ 0 ] - 1 else: return channel - 1
[docs] def get_right_channel ( self, channel = int, profile = numpy.ndarray ): """ This method's goal is to find the next channel 'to the right' in the profile. If the channel is the last channel in the profile, consider the first channel as its right neighbour. Parameters: * channel : int The channel for which we want to find the right "neighbour". * profile : numpy.ndarray The unidimensional profile. Returns: * unnamed variable : integer The index of the channel to the "right" of the input channel. """ if ( channel == self.__array.shape [ 0 ] - 1 ): return 0 else: return channel + 1
[docs] def print_fwhh ( self, fwhh_dict ): """ This method's goal is to output to the current logger, with debug priority, the values contained in the fwhh_dict parameter. Parameters: * fwhh_dict : dictionary Structure with the same fields as specified in the self.get_fwhh ( ) method. """ self.log.debug ( "max_height = %d" % fwhh_dict['max_height'] ) self.log.debug ( "half_height = %d" % fwhh_dict['half_height'] ) self.log.debug ( "max_height_index = %d" % fwhh_dict['max_height_index'] ) self.log.debug ( "leftmost_hh = %d" % fwhh_dict['leftmost_hh'] ) self.log.debug ( "rightmost_hh = %d" % fwhh_dict['rightmost_hh'] ) self.log.debug ( "profile_indices = %s" % str ( fwhh_dict['profile_indices'] ) ) self.log.debug ( "i_left_shoulder = %d" % fwhh_dict['i_left_shoulder'] ) self.log.debug ( "i_right_shoulder = %d" % fwhh_dict['i_right_shoulder'] ) self.log.debug ( "il_shoulder_indices = %s" % str ( fwhh_dict['il_shoulder_indices'] ) )
[docs]def barycenter_geometry ( data_can : tuna.io.can ) -> tuna.io.can: """ This function's goal is to conveniently return an array of the barycenter position for each pixel of the input. Arguments: * data_can : tuna.io.can Should contain a 3D cube of raw Fabry-Pérot data. Returns: * tuna.io.can Containing a 2D array of floats, where each point is the barycenter of the respective spectrum on the input data. """ detector = barycenter_fast ( data = data_can ) detector.join ( ) return detector.result
[docs]def barycenter_polynomial_fit ( data_can : tuna.io.can ) -> tuna.io.can: """ This function's goal is to conveniently return an array of the barycenter position for each pixel of the input. Arguments: * data_can : tuna.io.can Should contain a 3D cube of raw Fabry-Pérot data. Returns: * tuna.io.can Containing a 2D array of floats, where each point is the barycenter of the respective spectrum on the input data. """ detector = barycenter_detector ( data = data_can ) detector.join ( ) return detector.result