Source code for tuna.models.airy

"""
This module's scope is the modeling and fitting of Airy functions to data.
"""

__version__ = "0.1.0"
__changelog__ = { "Tuna" : "0.15.0", "Change" : "Added wrapper function fit_airy" }

import copy
import logging
import math
try:
    import mpyfit
except ImportError as e:
    print ( "Could not import mpyfit. Exception: {}".format ( e ) )
import numpy
import threading
import time
import tuna

[docs]def airy_plane ( b_ratio = 9.e-6, center_col = 256, center_row = 256, continuum = 1, finesse = 15, gap = 250, intensity = 100, shape_cols = 512, shape_rows = 512, wavelength = 0.6563 ): """ This function's goal is to model the Airy function as: .. math:: I = C + \dfrac{I_0}{1 + \dfrac{4F^2}{\pi^2} \sin^2 (\phi/2)} From the input parameters, this function will return a dataset with the value for I calculated on each point. The intention is to produce a tridimensional array where each plane is an image with rings equivalent to the ones in Fabry-Pérot interferograms. To calculate phi: .. math:: \phi = 2 \pi \dfrac{ 2 n e_i \cos (theta)} {\lambda_c } Where: .. math:: n e_i = n ( e + i \delta_e ) theta = tan^{-1} ( b r ), b = s / f Parameters: (In parentheses, the equation variable the paremeter corresponds to) * b_ratio : float : defaults to 9.e-6 The ratio between the pixel size and the camera focal length (b). * center_col : integer : defaults to 256 The column for the center of the rings, in pixels. * center_row : integer : defaults to 256 The row for the center of the rings, in pixels. * continuum : float : defaults to 1 The value of the background intensity. * finesse : float : defaults to 15 (F). * gap : float : defaults to 250 The distance between the étalons, in microns (n e_i). * intensity : float : defaults to 100 The beam maximum intensity (I_0). * shape_cols : integer : defaults to 512 The number of columns for the 2D shape (columns, rows) to be generated. * shape_rows : integer : defaults to 512 The number of rows for the 2D shape (columns, rows) to be generated. * wavelength : float : defaults to 0.6563 Wavelength in microns ( lambda_c ). Returns: * result : numpy.ndarray Contains the data for the Airy model, given the input parameters. """ log = logging.getLogger ( __name__ ) log.debug ( "airy_plane" ) log.debug ( "b_ratio = %f," " center = (%d, %d)," " continuum = %f," " finesse = %f," " gap = %f," " intensity = %f," " shape = (%d, %d)" " wavelength = %f." % ( b_ratio, center_col, center_row, continuum, finesse, gap, intensity, shape_cols, shape_rows, wavelength ) ) indices_rows, indices_cols = numpy.indices ( ( shape_cols, shape_rows ) ) distances = numpy.sqrt ( ( indices_rows - center_col ) ** 2 + ( indices_cols - center_row ) ** 2 ) log.debug ( "distances [ 0 ] [ 0 ] = %s" % str ( distances [ 0 ] [ 0 ] ) ) log.debug ( "Calculating airy_function_I" ) airy_function_I = 4.0 * finesse ** 2 / numpy.pi ** 2 phase = 2.0 * numpy.pi * gap / ( wavelength * numpy.sqrt ( 1 + b_ratio ** 2 * distances ** 2 ) ) log.debug ( "phase [ 0 ] [ 0 ] = %s" % str ( phase [ 0 ] [ 0 ] ) ) result = continuum + intensity / ( 1. + airy_function_I * numpy.sin ( phase ) ** 2 ) log.debug ( "result [ 0 ] [ 0 ] = %s" % str ( result [ 0] [ 0 ] ) ) log.debug ( "/airy_plane" ) return result
[docs]def least_mpyfit ( p, args ): """ This function's goal is to wrap the airy function proper, so that the API for using mpyfit is obeyed. Parameters: * p : tuple Containing the b_ratio, center_col, center_row, continuum, finesse, gap and intensity values. * args : tuple Containing the shape_cols, shape_rows, wavelength, data and flat values. """ log = logging.getLogger ( __name__ ) log.debug ( "least_mpyfit" ) b_ratio, center_col, center_row, continuum, finesse, gap, intensity = p shape_cols, shape_rows, wavelength, data, flat = args try: plane = airy_plane ( b_ratio, center_col, center_row, continuum, finesse, gap, intensity, shape_cols, shape_rows, wavelength ) #log.debug ( "plane = %s" % str ( plane ) ) log.debug ( "plane.shape = {}, data.shape = {}".format ( plane.shape, data.shape ) ) except Exception as e: print ( "Exception: %s" % str ( e ) ) try: residue = ( plane - data ) * flat except Exception as e: log.error ( tuna.console.output_exception ( e ) ) #log.debug ( "residue = %s" % str ( residue ) ) log.debug ( "numpy.sum ( residue ) = %f" % numpy.sum ( residue ) ) log.debug ( "/least_mpyfit" ) return ( residue.flatten ( ) )
[docs]class airy_fitter ( threading.Thread ): """ This class' responsibility is to fit the Airy function. It will use the given parameters as initial guesses. The fit will stop when the fitter converges. It inherits from the :ref:`threading_label`.Thread class, and it auto-starts its thread execution. Clients are expected to use its .join ( ) method before using its results. Its constructor signature is: Parameters: * b_ratio : float The ratio between the pixel size and the camera focal length (b). * center_col : integer The column for the center of the rings, in pixels. * center_row : integer The row for the center of the rings, in pixels. * data : numpy.ndarray Contains the data to be fitted. * finesse : float (F). * gap : float The distance between the étalons, in microns (n e_i). * wavelength : float Wavelength in microns ( lambda_c ). * mpyfit_parinfo : list : defaults to [ ] List of parameters' boundaries, and whether they are fixed or not. Must respect mpyfit's specification. """ def __init__ ( self, b_ratio, center_col, center_row, data, finesse, gap, wavelength, mpyfit_parinfo = [ ] ): self.log = logging.getLogger ( __name__ ) super ( self.__class__, self ).__init__ ( ) self.__version__ = "0.1.5" self.changelog = { "0.1.5" : "Tuna 0.14.0 : updated docstrings to new style.", "0.1.4" : "Added docstrings.", "0.1.3" : "Tweaked xtol to 1e-6, works on some tests.", "0.1.2" : "Changedx xtol from 1e-10 to 1e-5 to improve speed.", "0.1.1" : "Refactored algorithm for getting lowest percentile into another module.", "0.1.0" : "Initial changelog." } self.b_ratio = b_ratio self.center_col = center_col self.center_row = center_row self.data = data self.log.debug ( "self.data.shape = {}".format ( self.data.shape ) ) self.finesse = finesse self.gap = gap self.mpyfit_parinfo = mpyfit_parinfo self.shape_cols = self.data.shape [ 0 ] self.shape_rows = self.data.shape [ 1 ] self.wavelength = wavelength self.fit = None self.parameters = None self.start ( )
[docs] def run ( self ): """ This method's goal is to fit an Airy model to a given input. """ start = time.time ( ) lower_percentile = tuna.tools.find_lowest_nonnull_percentile ( self.data ) upper_percentile = 99 self.log.debug ( "%d-percentile = %f, %d-percentile = %f." % ( lower_percentile, numpy.percentile ( self.data, lower_percentile ), upper_percentile, numpy.percentile ( self.data, upper_percentile ) ) ) intensity = numpy.percentile ( self.data, upper_percentile ) - numpy.percentile ( self.data, lower_percentile ) self.log.debug ( "percentile difference = %f" % intensity ) finesse_factor = 4.0 * self.finesse ** 2 / numpy.pi ** 2 finesse_intensity_factor = ( 1 + finesse_factor ) / finesse_factor self.log.debug ( "finesse_intensity_factor = ( 1 + F ) / F = %f" % finesse_intensity_factor ) intensity *= finesse_intensity_factor self.log.debug ( "intensity = %f" % intensity ) continuum = abs ( numpy.percentile ( self.data, upper_percentile ) - intensity ) self.log.debug ( "guesses:\n" "\tb_ratio = {:e},\n" "\tcenter = ( {}, {} )\n" "\tcontinuum = {:e}\n" "\tfinesse = {:e}\n" "\tinitial gap = {:e}\n" "\tintensity = {:e}".format ( self.b_ratio, self.center_col, self.center_row, continuum, self.finesse, self.gap, intensity ) ) parameters = ( self.b_ratio, self.center_col, self.center_row, continuum, self.finesse, self.gap, intensity ) # Constraints on parameters if self.mpyfit_parinfo == [ ]: parinfo = [ ] parbase = { 'fixed' : False, 'limits' : ( self.b_ratio * 0.9, self.b_ratio * 1.1 ) } parinfo.append ( parbase ) parbase = { 'fixed' : False, 'limits' : ( self.center_col - 5, self.center_col + 5 ) } parinfo.append ( parbase ) parbase = { 'fixed' : False, 'limits' : ( self.center_row - 5, self.center_row + 5 ) } parinfo.append ( parbase ) parbase = { 'fixed' : False, 'limits' : ( continuum * 0.9, continuum * 1.1 ) } parinfo.append ( parbase ) parbase = { 'fixed' : False, 'limits' : ( self.finesse * 0.95, self.finesse * 1.05 ) } parinfo.append ( parbase ) parbase = { 'fixed' : False, 'limits' : ( self.gap - self.wavelength / 4., self.gap + self.wavelength / 4. ) } parinfo.append ( parbase ) parbase = { 'fixed' : False, 'limits' : ( intensity * 0.9, intensity * 1.1 ) } parinfo.append ( parbase ) else: parinfo = [ ] parbase = self.mpyfit_parinfo [ 0 ] parinfo.append ( parbase ) parbase = self.mpyfit_parinfo [ 1 ] parinfo.append ( parbase ) parbase = self.mpyfit_parinfo [ 2 ] parinfo.append ( parbase ) parbase = { 'fixed' : self.mpyfit_parinfo [ 3 ] [ 'fixed' ], 'limits' : ( continuum * 0.9, continuum * 1.1 ) } parinfo.append ( parbase ) parbase = self.mpyfit_parinfo [ 4 ] parinfo.append ( parbase ) parbase = self.mpyfit_parinfo [ 5 ] parinfo.append ( parbase ) parbase = { 'fixed' : self.mpyfit_parinfo [ 6 ] [ 'fixed' ], 'limits' : ( intensity * 0.9, intensity * 1.1 ) } parinfo.append ( parbase ) for entry in parinfo: self.log.debug ( "parinfo = %s" % str ( entry ) ) flat = 1 self.log.debug ( "run ( )" ) try: self.log.debug ( "parameters = %s" % str ( parameters ) ) fit_parameters, fit_result = mpyfit.fit ( least_mpyfit, parameters, args = ( self.shape_cols, self.shape_rows, self.wavelength, self.data, flat ), parinfo = parinfo, xtol = 1e-7 ) #stepfactor = 10 ) except Exception as e: self.log.error ( tuna.console.output_exception ( e ) ) self.log.error ( "Error was using parameters = {}".format ( parameters ) ) raise ( e ) self.log.debug ( "fit_result [ 'bestnorm' ] = %s" % str ( fit_result [ 'bestnorm' ] ) ) non_spammy_results = copy.copy ( fit_result ) del ( non_spammy_results [ 'covariances' ] ) for key in non_spammy_results.keys ( ): self.log.debug ( "fit_result [ {} ] = {}".format ( key, non_spammy_results [ key ] ) ) self.log.debug ( "results:\n" "\tb_ratio = {:e},\n" "\tcenter = ( {}, {} )\n" "\tcontinuum = {:e}\n" "\tfinesse = {:e}\n" "\tinitial gap = {:e}\n" "\tintensity = {:e}".format ( fit_parameters [ 0 ], fit_parameters [ 1 ], fit_parameters [ 2 ], fit_parameters [ 3 ], fit_parameters [ 4 ], fit_parameters [ 5 ], fit_parameters [ 6 ] ) ) self.log.debug ( "Airy fit took %ds." % ( time.time ( ) - start ) ) self.fit = tuna.io.can ( airy_plane ( fit_parameters [ 0 ], fit_parameters [ 1 ], fit_parameters [ 2 ], fit_parameters [ 3 ], fit_parameters [ 4 ], fit_parameters [ 5 ], fit_parameters [ 6 ], self.shape_cols, self.shape_rows, self.wavelength ) ) self.parameters = fit_parameters
[docs]def fit_airy ( b_ratio : float, center_col : float, center_row : float, data : numpy.ndarray, finesse : float, gap : float, wavelength : float, mpyfit_parinfo : list = [ ] ) -> ( dict, tuna.io.can ): """ This method's goal is to conveniently fit the Airy function. It will use the given parameters as initial guesses. The fit will stop when the fitter converges. It inherits from the :ref:`threading_label`.Thread class, and it auto-starts its thread execution. Clients are expected to use its .join ( ) method before using its results. Arguments: * b_ratio : float The ratio between the pixel size and the camera focal length (b). * center_col : float The column for the center of the rings, in pixels. * center_row : float The row for the center of the rings, in pixels. * data : numpy.ndarray Contains the data to be fitted. * finesse : float (F). * gap : float The distance between the étalons, in microns (n e_i). * wavelength : float Wavelength in microns ( lambda_c ). * mpyfit_parinfo : list : defaults to [ ] List of parameters' boundaries, and whether they are fixed or not. Must respect mpyfit's specification. """ fitter = airy_fitter ( b_ratio, center_col, center_row, data, finesse, gap, wavelength, mpyfit_parinfo ) fitter.join ( ) return ( fitter.parameters, fitter.fit )