Source code for tuna.tools.spectral_rings_fitter

# -*- coding: utf-8 -*-
"""
This module's scope is to find rings within tridimensional Fabry-Pérot spectrographs.

Example::

    >>> import tuna
    >>> raw = tuna.io.read ( "tuna/test/unit/unit_io/adhoc.ad3" )
    >>> rings = tuna.plugins.run ( "Ring center finder" ) ( data = raw )
    >>> sorted ( list ( rings.keys ( ) ) )
    ['concentric_rings', 'construction', 'gradient', 'gradients', 'lower_percentile_regions', 'ridge', 'ring_fit', 'ring_fit_parameters', 'ring_pixel_sets', 'rings', 'upper_percentile_regions']
    >>> rings [ 'rings' ]
    [(218.56306556317449, 256.97877557329144, 231.82699113784105, [0]), (219.14654183804043, 254.87497666726719, 110.1292761603854, [1]), (190.48575616356123, 248.81898301262672, 338.64377512691351, [2, 3])]
"""

import copy
import IPython
import logging
import math
try:
    import mpyfit
except ImportError:
    print ( "Could not import mpyfit. Please install this library before running Tuna." )
import numpy
import random
import sympy
import time
import tuna

[docs]class rings_finder ( object ): """ This class' responsibility is to find all rings contained in a data cube. Its constructor expects the following parameters: * array : numpy.ndarray Containing the data where ring structures are to be found. * plane : integer The index for the plane where to search for the ring. * ipython : object A reference to a ipython object, so plots produced by this module won't suppress previous plots. * plot_log : boolean If set to True, will spawn plots of intermediary products, as they are produced. """ def __init__ ( self, array, plane, ipython, plot_log ): self.log = logging.getLogger ( __name__ ) self.log.setLevel ( logging.DEBUG ) self.plot_log = plot_log self.ipython = ipython if self.plot_log: if self.ipython == None: self.ipython = IPython.get_ipython() self.ipython.magic("matplotlib qt") self.chosen_plane = plane self.ridge_threshold = 6 self.array = array self.result = { }
[docs] def execute ( self ): """ This method's goal is to run the main algorithm: #. segment the image in ring and non-ring regions. #. Create a separate array for each individual ring. #. Calculate the center and the average radius. """ start_time = time.time ( ) self.segment ( ) # self.result [ 'ridge' ] contains this subresult. segment_time = time.time ( ) self.separate_rings ( ) # self.result [ 'ring_pixel_sets' ] contains this subresult separate_time = time.time ( ) self.fit_circles ( ) fit_time = time.time ( ) self.aggregate_fits ( ) aggregate_time = time.time ( ) self.aggregate_concentric_rings ( ) concentric_time = time.time ( ) self.log.debug ( "rings_finder finished. segment() {:.1f}s, separate_rings() {:.1f}s, fit_circles() {:.1f}s, aggregate_fits() {:.1f}s, aggregate_concentric_rings() {:.1f}s..".format ( segment_time - start_time, separate_time - segment_time, fit_time - separate_time, aggregate_time - fit_time, concentric_time - aggregate_time ) ) try: self.log.info ( "Concentric rings structure: {}".format ( self.result [ "concentric_rings" ] ) ) except: pass
[docs] def segment ( self ): """ This method's goal is to segment the image in ring and non-ring regions. This is accomplished applying the numpy.gradient method on the input array; and then obtaining the lower percentile region (the regions in the data where the gradient is minimal) and the higher percentile region. The very "middle" of each ring will consist of the pixels where the gradient is in the process of changing sign; and therefore, finding these pixels (the "ridge"), is equivalent to finding the "middle" of the rings. """ if len ( self.array.shape ) != 3: self.log.info ( "This procedure expects a 3D numpy ndarray as input." ) gradients = numpy.gradient ( self.array ) if self.plot_log: tuna.tools.plot ( gradients [ 0 ] [ self.chosen_plane ], "gradients [ 0 ] [ {} ]".format ( self.chosen_plane ), self.ipython ) self.result [ 'gradients' ] = gradients gradient = gradients [ 0 ] [ self.chosen_plane ] self.result [ 'gradient' ] = gradient self.upper_percentile = self.find_upper_percentile ( gradient ) upper_percentile_value = numpy.percentile ( gradient, self.upper_percentile ) upper_dep_gradient = numpy.where ( gradient > upper_percentile_value, 1, 0 ) if self.plot_log: tuna.tools.plot ( upper_dep_gradient, "({}) upper_dep_gradient".format ( self.upper_percentile ), self.ipython ) self.result [ 'upper_percentile_regions' ] = upper_dep_gradient lower_percentile = tuna.tools.find_lowest_nonnull_percentile ( gradient ) lower_percentile_value = numpy.percentile ( gradient, lower_percentile ) lower_percentile_regions = numpy.where ( gradient < lower_percentile_value, 1, 0 ) if self.plot_log: tuna.tools.plot ( lower_percentile_regions, "({}) lower_percentile_regions".format ( lower_percentile ), self.ipython ) self.result [ 'lower_percentile_regions' ] = lower_percentile_regions ridge = numpy.zeros ( shape = gradient.shape ) for col in range ( ridge.shape [ 0 ] ): for row in range ( ridge.shape [ 1 ] ): ridge [ col ] [ row ] = self.ridgeness ( upper_dep_gradient, lower_percentile_regions, col, row, threshold = self.ridge_threshold ) if self.plot_log: tuna.tools.plot ( ridge, "ridge ", self.ipython ) self.log.debug ( "ridge found" ) self.result [ 'ridge' ] = ridge
[docs] def ridgeness ( self, upper, lower, col, row, threshold = 1 ): """ Returns 1 if point at col, row has threshold neighbours that is 1 in both upper and lower arrays. Parameters: * upper : numpy.ndarray Should contain the array with a higher percentile gradient region. * lower : numpy.ndarray Should contain the array with a lower percentile gradient region. * col : integer The column where the pixel's ridgeness is to be assessed. * row : integer The row where the pixel's ridgeness is to be assessed. * threshold : integer : 1 The minimum number of neighbours in each region that a pixel must possess to be considered a "ridge" pixel. Returns: * unnamed variable : integer Will be 0 for pixels in the "ridge" and 0 otherwise. """ neighbours = tuna.tools.get_pixel_neighbours ( ( col, row ), upper, distance_threshold = 2 ) upper_neighbour = 0 lower_neighbour = 0 for neighbour in neighbours: if ( upper_neighbour >= threshold and lower_neighbour >= threshold ): return 1 if upper [ neighbour [ 0 ] ] [ neighbour [ 1 ] ] == 1: upper_neighbour += 1 if lower [ neighbour [ 0 ] ] [ neighbour [ 1 ] ] == 1: lower_neighbour += 1 if ( upper_neighbour >= threshold and lower_neighbour >= threshold ): return 1 return 0
[docs] def fit_circles ( self ): """ This method's goal is to fit circles to the data. """ count = 0 for pixel_set_tuple in self.result [ 'ring_pixel_sets' ]: pixel_set = pixel_set_tuple [ 0 ] self.log.debug ( "attempting to fit set {}".format ( count ) ) start = time.time ( ) minimal_point, minimal_radius = self.construct_ring_center ( pixel_set ) self.log.debug ( "constructing_center took {:.1f}s".format ( time.time ( ) - start ) ) if minimal_point == None or minimal_radius == None: self.log.error ( "Could not find center or radius." ) minimal_point = ( 0, 0 ) minimal_radius = 1 ring_parameters, ring_fit = fit_circle ( minimal_point [ 0 ], minimal_point [ 1 ], minimal_radius, pixel_set, circle, ridge_thickness = pixel_set_tuple [ 1 ] ) self.log.debug ( "ring_parameters {} = {}".format ( count, ring_parameters ) ) if self.plot_log: tuna.tools.plot ( ring_fit, "ring fit {}".format ( count ), self.ipython ) tuna.tools.plot ( pixel_set - ring_fit, "pixel_set {} - ring_fit {}".format ( count, count ), self.ipython ) tuna.tools.plot ( self.result [ 'construction' ] [ count ], "construction {}".format ( count ), self.ipython ) try: self.result [ 'ring_fit' ].append ( ring_fit ) except KeyError: self.result [ 'ring_fit' ] = [ ring_fit ] try: self.result [ 'ring_fit_parameters' ].append ( ring_parameters ) except KeyError: self.result [ 'ring_fit_parameters' ] = [ ring_parameters ] count += 1 self.log.debug ( "All sets fitted." )
[docs] def aggregate_fits ( self ): """ This method's goal is to coalesce all circles fitted in the minimum number of geometric entities. After fitting circles to pixel sets, some fits will correspond to the same ring. These must be aggregated into single results. """ if 'ring_fit' not in list ( self.result.keys ( ) ): return known_centers = [ ] for pixel_set_index in range ( len ( self.result [ 'ring_fit' ] ) ): distance_threshold = min ( self.result [ 'ring_fit' ] [ pixel_set_index ].shape [ 0 ], self.result [ 'ring_fit' ] [ pixel_set_index ].shape [ 1 ] ) * 0.1 center_is_unknown = True for structure in known_centers: center = ( structure [ 0 ], structure [ 1 ] ) center_fit_index = structure [ 2 ] [ 0 ] distance = tuna.tools.calculate_distance ( ( self.result [ 'ring_fit_parameters' ] [ pixel_set_index ] [ 0 ], self.result [ 'ring_fit_parameters' ] [ pixel_set_index ] [ 1 ] ), center ) if distance < distance_threshold: self.log.debug ( "ring_fit {} center close to ring_fit {} center".format ( pixel_set_index, center_fit_index ) ) if abs ( self.result [ 'ring_fit_parameters' ] [ pixel_set_index ] [ 2 ] - \ self.result [ 'ring_fit_parameters' ] [ center_fit_index ] [ 2 ] ) < distance_threshold: self.log.debug ( "ring_fit {} radius similar to ring_fit {} radius".format ( pixel_set_index, center_fit_index ) ) center_is_unknown = False structure [ 2 ].append ( pixel_set_index ) if center_is_unknown: known_centers.append ( [ self.result [ 'ring_fit_parameters' ] [ pixel_set_index ] [ 0 ], self.result [ 'ring_fit_parameters' ] [ pixel_set_index ] [ 1 ], [ pixel_set_index ] ] ) self.log.debug ( "known_centers = {}".format ( known_centers ) ) rings = [ ] for structure in known_centers: center_col = 0 center_row = 0 radius = 0 for entry in structure [ 2 ]: center_col += self.result [ 'ring_fit_parameters' ] [ entry ] [ 0 ] center_row += self.result [ 'ring_fit_parameters' ] [ entry ] [ 1 ] radius += self.result [ 'ring_fit_parameters' ] [ entry ] [ 2 ] center_col /= len ( structure [ 2 ] ) center_row /= len ( structure [ 2 ] ) radius /= len ( structure [ 2 ] ) try: self.result [ 'rings' ].append ( ( center_col, center_row, radius, structure [ 2 ] ) ) except KeyError: self.result [ 'rings' ] = [ ( center_col, center_row, radius, structure [ 2 ] ) ]
# at this point, aggregated all pixel_sets that share the same radius and center.
[docs] def aggregate_concentric_rings ( self ): """ This method's goal is to construct the geometric description of the concentric ring structure. The most abstract and relevant information is the following structure: ( center, radii ), where center is a tuple of floats and radii is a list of floats. Each spectrograph should have a single result of this kind, which is generated here and stored in self.concentric_rings. """ if "rings" not in list ( self.result.keys ( ) ): return self.log.debug ( "self.result [ 'rings' ] = {}".format ( self.result [ 'rings' ] ) ) distance_threshold = min ( self.result [ 'ring_fit' ] [ 0 ].shape [ 0 ], self.result [ 'ring_fit' ] [ 0 ].shape [ 1 ] ) * 0.01 concentric_rings = [ ] for structure in self.result [ 'rings' ]: struct_center = ( structure [ 0 ], structure [ 1 ] ) struct_fits = structure [ 3 ] struct_radius = structure [ 2 ] center_is_unknown = True for another in concentric_rings: another_fits = another [ 3 ] if another_fits [ 0 ] == struct_fits [ 0 ]: continue another_center = ( another [ 0 ], another [ 1 ] ) another_radius = another [ 2 ] distance = tuna.tools.calculate_distance ( struct_center, another_center ) if distance < distance_threshold: center_is_unknown = False another [ 3 ].append ( struct_fits [ 0 ] ) break if center_is_unknown: concentric_rings.append ( ( struct_center [ 0 ], struct_center [ 1 ], [ struct_radius ], [ struct_fits [ 0 ] ] ) ) self.log.debug ( "concentric_rings = {}".format ( concentric_rings ) ) # select best structure (with the most rings, with the most pixels ) max_num = 0 max_structure = None for structure in concentric_rings: num_fits = len ( structure [ 3 ] ) if num_fits > max_num: max_num = num_fits max_structure = structure self.log.debug ( "max_structure = {}".format ( max_structure ) ) # generate averaged result averaged_col = 0 averaged_row = 0 averaged_num = 0 radii = [ ] sets = [ ] for index in max_structure [ 3 ]: averaged_col += self.result [ 'ring_fit_parameters' ] [ index ] [ 0 ] averaged_row += self.result [ 'ring_fit_parameters' ] [ index ] [ 1 ] averaged_num += 1 radii.append ( self.result [ 'ring_fit_parameters' ] [ index ] [ 2 ] ) sets.append ( index ) averaged_col /= averaged_num averaged_row /= averaged_num averaged_concentric_rings = ( ( averaged_col, averaged_row ), radii, sets ) self.log.debug ( "averaged_concentric_rings = {}".format ( averaged_concentric_rings ) ) self.result [ 'concentric_rings' ] = averaged_concentric_rings
[docs] def construct_ring_center ( self, pixel_set ): """ This method's goal is to estimate the center and radius of a circle that is osculatory to the curve contained in the pixel_set. Parameters: * pixel_set : numpy.ndarray Bidimensional array, where each pixel has either a 0 or a 1 as value. Returns: * center : tuple of 2 floats Containing the column and row "coordinates" for the center. * radius : float The radius of this center. """ self.log.debug ( "construct_ring_center" ) construction = numpy.copy ( pixel_set ) self.log.debug ( "pixel_set copied" ) max_pair = self.find_max_pair ( pixel_set ) if not max_pair: self.log.error ( "Could not find a pixel pair!" ) return None, None self.log.debug ( "max_pair = {}".format ( max_pair ) ) color = 2 construction [ max_pair [ 0 ] [ 0 ] ] [ max_pair [ 0 ] [ 1 ] ] = color construction [ max_pair [ 1 ] [ 0 ] ] [ max_pair [ 1 ] [ 1 ] ] = color mid_point = ( ( max_pair [ 0 ] [ 0 ] + max_pair [ 1 ] [ 0 ] ) / 2, ( max_pair [ 0 ] [ 1 ] + max_pair [ 1 ] [ 1 ] ) / 2 ) sympy_mid_point = sympy.Point ( mid_point [ 0 ], mid_point [ 1 ] ) construction [ mid_point [ 0 ] ] [ mid_point [ 1 ] ] = color self.log.debug ( "mid_point = {}".format ( mid_point ) ) chord_extreme_1 = sympy.Point ( max_pair [ 0 ] [ 0 ], max_pair [ 0 ] [ 1 ] ) chord_extreme_2 = sympy.Point ( max_pair [ 1 ] [ 0 ], max_pair [ 1 ] [ 1 ] ) chord_line = sympy.Line ( chord_extreme_1, chord_extreme_2 ) self.plot_sympy_line ( construction, chord_line, color ) color = 3 concurrent_line = chord_line.perpendicular_line ( sympy_mid_point ) self.plot_sympy_line ( construction, concurrent_line, color ) color = 4 min_pixel, concurrent_extreme_min = self.find_min_pixel ( pixel_set, concurrent_line ) if min_pixel == None or concurrent_extreme_min == None: self.log.error ( "min_pixel == {}, concurrent_extreme_min = {}".format ( min_pixel, concurrent_extreme_min ) ) return None, None construction [ min_pixel [ 0 ] ] [ min_pixel [ 1 ] ] = color secondary_chord = sympy.Line ( chord_extreme_1, concurrent_extreme_min ) self.plot_sympy_line ( construction, secondary_chord, color ) secondary_angle = chord_line.angle_between ( secondary_chord ) self.log.debug ( "secondary_angle = {} radians".format ( secondary_angle ) ) color += 1 thertiary_chord = secondary_chord.perpendicular_line ( chord_extreme_1 ) self.plot_sympy_line ( construction, thertiary_chord, color ) color += 1 diameter_extremes_list = thertiary_chord.intersection ( concurrent_line ) if len ( diameter_extremes_list ) == 0: self.log.warning ( "Could not find intersection between thertiary_chord and concurrent_line. Will attempt to recursively find another set of segments, removing one of the points from current set." ) new_set = numpy.copy ( pixel_set ) new_set [ max_pair [ 0 ] [ 0 ] ] [ max_pair [ 0 ] [ 1 ] ] = 0 return self.construct_ring_center ( new_set ) diameter_extreme = diameter_extremes_list [ 0 ] center = ( ( sympy.N ( diameter_extreme.x ) + min_pixel [ 0 ] ) / 2, ( sympy.N ( diameter_extreme.y ) + min_pixel [ 1 ] ) / 2 ) radius = tuna.tools.calculate_distance ( center, min_pixel ) for neighbour in tuna.tools.get_pixel_neighbours ( ( round ( center [ 0 ] ), round ( center [ 1 ] ) ), construction ): construction [ neighbour [ 0 ] ] [ neighbour [ 1 ] ] = color try: self.result [ 'construction' ].append ( construction ) except KeyError: self.result [ 'construction' ] = [ construction ] return center, radius
[docs] def find_min_pixel ( self, pixel_set, line ): """ This method's goal is to find the pixel that has the minimal distance to the input line. Parameters: * line : sympy.Line The line regarding which we want to find the closest pixel in a set. * pixel_set : numpy.ndarray The set of pixels of which we want the one closest to the input line. Returns: * min_pixel : tuple of 2 integers Containing the column and row coordinates for the pixel in the input set that is closest to the input line. * concurrent_extreme_min : sympy.Point The point on the input line where is the projection of the pixel in the input pixel_set that is closest to the input line. """ intersection_points = [ ] for col in range ( pixel_set.shape [ 0 ] - 1 ): point_A = sympy.Point ( col, 0 ) point_B = sympy.Point ( col, pixel_set.shape [ 1 ] - 1 ) col_line = sympy.Line ( point_A, point_B ) try: intersection = line.intersection ( col_line ) [ 0 ] except: continue if intersection == None: continue if isinstance ( intersection, sympy.Point ): x = round ( sympy.N ( intersection.x ) ) y = round ( sympy.N ( intersection.y ) ) if x < 0 or x >= pixel_set.shape [ 0 ]: continue if y < 0 or y >= pixel_set.shape [ 1 ]: continue intersection_points.append ( ( x, y ) ) continue if isinstance ( intersection, sympy.Line ): for row in range ( pixel_set.shape [ 1 ] - 1 ): intersection_points.append ( ( col, row ) ) continue self.log.error ( "Intersection loop reached impossible condition!" ) self.log.debug ( "len ( intersection_points ) = {}".format ( len ( intersection_points ) ) ) pixel_set_intersections = [ ] for intersection in intersection_points: if pixel_set [ intersection [ 0 ] ] [ intersection [ 1 ] ] == 1: pixel_set_intersections.append ( intersection ) if len ( pixel_set_intersections ) == 0: self.log.debug ( "len ( pixel_set_intersections ) == 0, falling back to whole pixel_set" ) for col in range ( pixel_set.shape [ 0 ] ): for row in range ( pixel_set.shape [ 1 ] ): if pixel_set [ col ] [ row ] == 1: pixel_set_intersections.append ( ( col, row ) ) self.log.debug ( "len ( pixel_set_intersections ) = {}".format ( len ( pixel_set_intersections ) ) ) min_distance = float ( "inf" ) min_pixel = None concurrent_extreme_min = None for pixel in pixel_set_intersections: sympy_point = sympy.Point ( pixel [ 0 ], pixel [ 1 ] ) self.log.debug ( "sympy_point.x = {}, sympy_point.y = {}".format ( sympy.N ( sympy_point.x ), sympy.N ( sympy_point.y ) ) ) projection = line.projection ( sympy_point ) self.log.debug ( "projection.x = {}, projection.y = {}".format ( sympy.N ( projection.x ), sympy.N ( projection.y ) ) ) distance = tuna.tools.calculate_distance ( ( round ( pixel [ 0 ] ), round ( pixel [ 1 ] ) ), ( sympy.N ( projection.x ), sympy.N ( projection.y ) ) ) self.log.debug ( "distance = {}".format ( distance ) ) if distance <= min_distance: min_distance = distance min_pixel = ( round ( pixel [ 0 ] ), round ( pixel [ 1 ] ) ) concurrent_extreme_min = ( sympy.N ( projection.x ), sympy.N ( projection.y ) ) self.log.debug ( "current min_distance = {}, min_pixel = {}, concurrent_extreme_min = {}".format ( min_distance, min_pixel, concurrent_extreme_min ) ) self.log.debug ( "final min_distance = {}, min_pixel = {}, concurrent_extreme_min = {}".format ( min_distance, min_pixel, concurrent_extreme_min ) ) return ( min_pixel, concurrent_extreme_min )
[docs] def find_max_pair ( self, pixel_set ): """ This method's goal is to find the two pixels in the input set that are maximally distant to each other. Parameters: * pixel_set : numpy.ndarray An array where zeros indicate lack of points, and ones indicate presence of points. Returns: * max_pair : tuple of 2 tuples, of 2 integers each Contains the coordinates of the two points maximally distant. """ pixels = [ ] for col in range ( pixel_set.shape [ 0 ] ): for row in range ( pixel_set.shape [ 1 ] ): if pixel_set [ col ] [ row ] == 1: pixels.append ( ( col, row ) ) self.log.debug ( "pixels list populated with {} pixels".format ( len ( pixels ) ) ) if len ( pixels ) > 5000: old_pixels = copy.copy ( pixels ) pixels = [ ] for count in range ( 5000 ): index = random.randint ( 0, len ( old_pixels ) - 1 ) pixels.append ( old_pixels [ index ] ) self.log.debug ( "pixels list downgraded to {} pixels.".format ( len ( pixels ) ) ) max_distance = 0 max_pair = None for pixel in pixels: for other in pixels: distance = tuna.tools.calculate_distance ( pixel, other ) if distance >= max_distance: max_distance = distance max_pair = ( pixel, other ) return max_pair
[docs] def find_upper_percentile ( self, gradient ): """ Tries to find the maximum percentile that contains at least 10 % of the pixels. """ full = gradient.shape [ 0 ] * gradient.shape [ 1 ] attempt = 99 ratio = 0 while ( ratio < 0.1 ): attempt_value = numpy.percentile ( gradient, attempt ) ratio = numpy.sum ( numpy.where ( gradient > attempt_value, 1, 0 ) ) / full attempt -= 1 if attempt < 3: break self.log.debug ( "find_upper_percentile: {}".format ( attempt ) ) return attempt
[docs] def plot_sympy_line ( self, array, sympy_line, color ): """ This method's goal is to add a line to an array. It expects to receive an array where pixels that have points have non-null value. From a line specified as a Sympy object, it will color pixels in the array using the value for color. Parameters: * array : numpy.ndarray Containing the data where the line will be drawn in-place. * sympy_line : sympy.Line An object describing the geometry of the line to be drawn. * color : integer The value to attribute to each pixel in the input array that is crossed by the geometric input line. """ for col in range ( array.shape [ 0 ] ): sympy_point_0 = sympy.Point ( col, 0 ) sympy_point_1 = sympy.Point ( col, array.shape [ 1 ] - 1 ) concurrent = sympy.Line ( sympy_point_0, sympy_point_1 ) if sympy.Line.is_parallel ( concurrent, sympy_line ): if sympy_line.contains ( sympy_point_0 ): self.log.warning ( "Line to plot parallel ({}) to col axis.".format ( sympy_point_0.x ) ) for row in range ( array.shape [ 1 ] ): array [ sympy_point_0.x ] [ row ] = color return continue intersections_list = sympy_line.intersection ( concurrent ) intersection = intersections_list [ 0 ] if ( round ( intersection.y ) >= 0 and round ( intersection.y ) < array.shape [ 1 ] ): array [ round ( intersection.x ) ] [ round ( intersection.y ) ] = color
[docs] def separate_rings ( self ): """ This method's goal is to create distinct array objects for each disjoint pixel set in the current self.result [ 'ridge' ] array. """ array = self.result [ 'ridge' ] visited = numpy.zeros ( shape = array.shape ) connected_pixels_sets = [ ] for col in range ( array.shape [ 0 ] ): for row in range ( array.shape [ 1 ] ): if visited [ col ] [ row ] == 1: continue if array [ col ] [ row ] == 1: to_visit = [ ( col, row ) ] region = [ ] while ( len ( to_visit ) > 0 ): here = to_visit.pop ( ) visited [ here [ 0 ] ] [ here [ 1 ] ] = 1 region.append ( here ) neighbours = tuna.tools.get_pixel_neighbours ( here, array ) for neighbour in neighbours: if visited [ neighbour [ 0 ] ] [ neighbour [ 1 ] ] == 1: continue if array [ neighbour [ 0 ] ] [ neighbour [ 1 ] ] == 0: continue to_visit.append ( neighbour ) connected_pixels_sets.append ( region ) self.log.debug ( "len connected_pixels_sets = {}".format ( len ( connected_pixels_sets ) ) ) shape_len = array.shape [ 0 ] * array.shape [ 1 ] min_len = math.ceil ( array.shape [ 0 ] * array.shape [ 1 ] * 0.1 ) min_threshold = math.ceil ( array.shape [ 0 ] * array.shape [ 1 ] * 0.001 ) ring_pixel_sets = [ ] old_len = 0 while ( len ( ring_pixel_sets ) < len ( connected_pixels_sets ) ): min_len *= 0.9 self.log.debug ( "min_len for a pixel set = {:.0f}".format ( min_len ) ) if min_len < min_threshold: self.log.debug ( "min_len below min_threshold" ) break ring_pixel_sets = [ ] for pixels_set in connected_pixels_sets: if len ( pixels_set ) < min_len: continue set_array = numpy.zeros ( shape = array.shape ) for pixel in pixels_set: set_array [ pixel [ 0 ] ] [ pixel [ 1 ] ] = 1 ring_pixel_sets.append ( set_array ) if len ( ring_pixel_sets ) == len ( connected_pixels_sets ): break ring_pixel_sets = self.remove_lumped_pixel_sets ( ring_pixel_sets ) self.result [ 'ring_pixel_sets' ] = ring_pixel_sets self.log.debug ( "{} rings found.".format ( len ( self.result [ 'ring_pixel_sets' ] ) ) ) if self.plot_log: count = 0 for pixel_set in self.result [ 'ring_pixel_sets' ]: tuna.tools.plot ( pixel_set [ 0 ], "pixel_set {}".format ( count ), self.ipython ) count += 1
[docs] def remove_lumped_pixel_sets ( self, pixel_sets ): """ For some cubes, in some planes, the central region has many pixels that do not belong to a ring, but are identified as part of the ridge. This is possibly due to a "nascent" ring in that plane. This function identifies such regions and removes them from the returned set. Parameters: * pixel_sets : list of numpy.ndarrays Containing disjoint pixel sets. """ result = [ ] for pixel_set in pixel_sets: # test: each column and row should have either none, one * thickness or two * thickness pixels in the set. num_good_cols = 0 num_cols = 0 latest_thickness = 0 average_thickness = 0 for col in range ( pixel_set.shape [ 0 ] ): per_col_pixels = ( numpy.sum ( pixel_set [ col ] ) ) if per_col_pixels == 0: continue num_cols += 1 if latest_thickness == 0: num_good_cols += 1 latest_thickness = per_col_pixels continue ratio = round ( latest_thickness / per_col_pixels ) latest_thickness = per_col_pixels average_thickness += latest_thickness if ratio == 0.5 or ratio == 1 or ratio == 2: num_good_cols +=1 continue good_cols_ratio = num_good_cols / num_cols average_thickness /= num_good_cols * 4 self.log.debug ( "good_cols_ratio = {:.2f}".format ( good_cols_ratio ) ) self.log.debug ( "average_thickness = {:.2f}".format ( average_thickness ) ) if good_cols_ratio > 0.9: self.log.debug ( "pixel set probably contains a circle or arc." ) result.append ( ( pixel_set, average_thickness ) ) continue return result
[docs]def circle ( center, radius, thickness, shape ): """ This function's goal is to generate an array with the value 1 for every pixel that is "thickness" distant to a circle with the input radius and center. Parameters: * center : tuple of two floats Containing the coordinates for the circle center. * radius : float * thickness : float Each pixel's distance to the center is calculated; if this distance minus the radius is less than or equal to the thickness, then the pixel receives the value 1. * shape : tuple of integers The dimensions for the resulting array. """ log = logging.getLogger ( __name__ ) log.debug ( "center = ( {:.5f}, {:.5f} ), radius = {:.5f}".format ( center [ 0 ], center [ 1 ], radius, shape ) ) distances = numpy.zeros ( shape = shape ) for col in range ( shape [ 0 ] ): for row in range ( shape [ 1 ] ): center_distance = tuna.tools.calculate_distance ( center, ( col, row ) ) if abs ( center_distance - radius ) <= thickness: distances [ col ] [ row ] = 1 return distances
[docs]def least_circle ( p, args ): """ This function's goal is to wrap around a call to the function circle, so that it is called according to mpyfit's API. Parameters: * p : tuple Contains parameters used to call the function. * args : tuple Contains parameters used to call the function. Returns: * residue.flatten ( ) : numpy.ndarray With the fitted circle. """ log = logging.getLogger ( __name__ ) center_col, center_row, radius, thickness = p shape, data, function = args try: calculated_circle = function ( ( center_col, center_row ), radius, thickness, shape ) except Exception as e: log.error ( tuna.console.output_exception ( e ) ) try: residue = calculated_circle - data except Exception as e: log.error ( tuna.console.output_exception ( e ) ) log.debug ( "residue = %f" % numpy.sum ( numpy.abs ( residue ) ) ) return ( residue.flatten ( ) )
[docs]def fit_circle ( center_col, center_row, radius, data, function, ridge_thickness = 1 ): """ This function's goal is to fit a circle to the input data. Parameters: * center_col : float The column coordinate of the center. * center_row : float The row coordinate of the center. * radius : float * data : numpy.ndarray * function : object A reference to the function to be used for fitting. * ridge_thickness : float : 1 The thickness of the ring to be fitted to the data. """ log = logging.getLogger ( __name__ ) parameters = ( float ( center_col ), float ( center_row ), float ( radius ), float ( ridge_thickness ) ) # Constraints on parameters parinfo = [ ] parbase = { 'fixed' : False, 'step' : 2 } parinfo.append ( parbase ) parbase = { 'fixed' : False, 'step' : 2 } parinfo.append ( parbase ) parbase = { 'fixed' : False, 'step' : 2 } parinfo.append ( parbase ) # ridge_thickness parbase = { 'fixed' : False, 'step' : 0.1 } parinfo.append ( parbase ) for entry in parinfo: log.debug ( "parinfo = %s" % str ( entry ) ) try: log.debug ( "fit_circle input parameters = {}".format ( parameters ) ) fit_parameters, fit_result = mpyfit.fit ( least_circle, parameters, args = ( data.shape, data, function ), parinfo = parinfo, xtol = 1e-1 ) except Exception as e: log.error ( tuna.console.output_exception ( e ) ) raise ( e ) log.debug ( "fit_circle result = {}".format ( fit_result ) ) return fit_parameters, function ( ( fit_parameters [ 0 ], fit_parameters [ 1 ] ), fit_parameters [ 2 ], ridge_thickness, data.shape )
[docs]def find_rings ( data : numpy.ndarray, min_rings : int = 1, plane : int = None, ipython : object = None, plot_log : bool = False ) -> dict: """ Attempts to find rings contained in a 3D numpy ndarray input. Parameters: * data : numpy.ndarray A tridimensional spectrogram. This parameter can also be a Tuna can. * min_rings : integer This is the minimal number of rings expected to be found. * plane : integer : None This is the index in the cube for the spectrograph whose rings the user wants. If no plane is specified, all planes will be search (from 0 onwards) until at least two rings are found in a plane. * ipython : object : None Contains a reference to the ipython object, in case it exists. * plot_log : boolean : False Flags whether to output plots of the intermediary products. Even when not plotting, all numpy arrays are accessible in the result structure. Returns: * dict With the following keys: * 'array' : 2D numpy.ndarray where pixels in the ring have value 1 and other pixels have value 0. * 'center' : a tuple of 2 floats, where the first is the column and the second is the row of the most probable locations for the center of that ring. * 'radius' : a float with the average value of the distance of the ring pixels to its center. * 'construction' : a list of numpy arrays containing the geometric construction that led to the estimated center and radius used in the fit. * 'pixel_set' : a list of numpy arrays containing the segmented pixel sets corresponding to each identified ring. """ log = logging.getLogger ( __name__ ) if isinstance ( data, tuna.io.can ): effective_array = data.array log.debug ( "Using can's array as input." ) else: effective_array = data if plane != None: finder = rings_finder ( effective_array, plane, ipython, plot_log ) finder.execute ( ) if 'concentric_rings' in list ( finder.result.keys ( ) ): log.info ( "Fitted rings to plane {}.".format ( plane ) ) return finder.result log.info ( "Could not find rings in plane {}, searching the whole cube.".format ( plane ) ) list_of_planes = list ( range ( plane + 1, effective_array.shape [ 0 ] ) ) list_of_planes += list ( range ( plane ) ) else: # no plane was specified list_of_planes = range ( effective_array.shape [ 0 ] ) best_so_far = None for effective_plane in list_of_planes: log.info ( "Searching for concentric rings in plane {}.".format ( effective_plane ) ) finder = rings_finder ( effective_array, effective_plane, ipython, plot_log ) finder.execute ( ) if 'concentric_rings' not in list ( finder.result.keys ( ) ): log.warning ( "No concentric rings on plane {}.".format ( effective_plane ) ) continue log.debug ( "concentric_rings [ 1 ] = {}".format ( finder.result [ 'concentric_rings' ] [ 1 ] ) ) if len ( finder.result [ 'concentric_rings' ] [ 1 ] ) >= min_rings: # check if pixel_sets are large percentage_pixels_in_ring = 0 for pixel_set in finder.result [ "ring_pixel_sets" ]: percentage_pixels_in_ring += numpy.sum ( pixel_set [ 0 ] ) percentage_pixels_in_ring /= ( data.shape [ 0 ] * data.shape [ 1 ] ) percentage_pixels_in_ring *= 100 log.info ( "Ring structure obtained from plane where borders occupy {}% of the array.".format ( int ( percentage_pixels_in_ring ) ) ) if percentage_pixels_in_ring > 10: return finder.result if best_so_far == None: best_so_far = finder.result log.info ( "Ring structure in plane {} is the best so far ({} rings).".format ( effective_plane, len ( finder.result [ "concentric_rings" ] [ 1 ] ) ) ) else: if len ( finder.result [ 'concentric_rings' ] [ 1 ] ) > len ( best_so_far [ 'concentric_rings' ] [ 1 ] ): best_so_far = finder.result log.info ( "Ring structure in plane {} is the best so far ({} rings).".format ( effective_plane, len ( finder.result [ "concentric_rings" ] [ 1 ] ) ) ) else: if len ( finder.result [ "concentric_rings" ] [ 1 ] ) > 1 and len ( best_so_far [ "concentric_rings" ] [ 1 ] ) > 1: best_distance = tuna.tools.calculate_distance ( best_so_far [ "concentric_rings" ] [ 0 ], best_so_far [ "concentric_rings" ] [ 1 ] ) this_distance = tuna.tools.calculate_distance ( finder.result [ "concentric_rings" ] [ 0 ], finder.result [ "concentric_rings" ] [ 1 ] ) log.info ( "Distance between first 2 centers: {}.".format ( this_distance ) ) if this_distance < best_distance: best_so_far = finder.result log.info ( "Ring structure in plane {} is the best so far ({} rings).".format ( effective_plane, len ( finder.result [ "concentric_rings" ] [ 1 ] ) ) ) log.warning ( "Could not find a plane with two rings on the cube!" ) if best_so_far != None: log.warning ( "Returning single ring result." ) return best_so_far