Source code for tuna.tools.phase_map.find_image_center_by_arc_segmentation

"""
This module's scope are the procedures necessary to find the center in a spectrograph using the arc segmentation method.
"""

import logging
from math import acos, sqrt
import numpy
import random
import sympy
import threading
import time
import tuna

[docs]class arc_segmentation_center_finder ( threading.Thread ): """ This class' responsibility is to find the center of an image by finding the intersection of rays computed from arc segments. If a cube is received, it will use the first plane as its input. It inherits from the :ref:`threading_label`.Thread class, and it auto-starts its thread execution. Clients are expected to use its .join ( ) method before using its results. Its constructor expects the following parameters: * noise : can Containing the noise related to the data at hand. * wrapped : can Containing a wrapped phase map, such as one generated by one of the barycenter's tools in Tuna. Example: >>> import tuna >>> barycenter = tuna.io.read ( "tuna/test/unit/unit_io/G094_03_wrapped_phase_map.fits" ) >>> noise = tuna.io.read ( "tuna/test/unit/unit_io/G094_04_noise.fits" ) >>> center = tuna.tools.phase_map.arc_segmentation_center_finder ( noise = noise, wrapped = barycenter ); center.join ( ) >>> center.get_center ( ) (218, 254) """ def __init__ ( self, wrapped, noise ): super ( self.__class__, self ).__init__ ( ) self.__version__ = "0.1.0" self.changelog = { "0.1.0" : "Tuna 0.14.0 : improved documentation, silenced logs." } self.log = logging.getLogger ( __name__ ) self.log.setLevel ( logging.INFO ) self.noise = noise self.wrapped = wrapped self.center = None self.__center_row = None self.__center_col = None random.seed ( ) self.start ( )
[docs] def run ( self ): """ This method's goal is to execute the main algorithm once its thread .start ( ) method is called. """ start = time.time ( ) self.log.debug ( "Trying to find_image_center_by_arc_segmentation()." ) if not isinstance ( self.wrapped, tuna.io.can ): self.log.error ( "Unexpected value for parameter." ) return None self.center = self.get_center ( ) self.log.debug ( "Center detected at %s." % str ( self.center ) ) self.log.debug ( "find_image_center_by_arc_segmentation() took %ds." % ( time.time ( ) - start ) )
[docs] def detect_ring_borders ( self ): """ This method's goal is to detect borders, and select only the first connected set of pixels in this border. """ start = time.time ( ) ring_borders_map = numpy.zeros ( shape = self.wrapped.shape, dtype = numpy.int16 ) max_rows = self.wrapped.shape[0] max_cols = self.wrapped.shape[1] max_channel = numpy.amax ( self.wrapped.array ) channel_threshold = max_channel - 1 for row in range ( max_rows ): for col in range ( max_cols ): neighbours = tuna.tools.get_pixel_neighbours ( ( row, col ), ring_borders_map ) distances = [ ] for neighbour in neighbours: distances.append ( int ( abs ( self.wrapped.array [ row ] [ col ] - self.wrapped.array [ neighbour [ 0 ] ] [ neighbour [ 1 ] ] ) ) ) max_distance = max ( distances ) if ( max_distance > channel_threshold and int ( self.wrapped.array [ row ] [ col ] ) == 0 and self.noise.array [ row ] [ col ] == 0 ): ring_borders_map [ row ] [ col ] = 1 if ( numpy.sum ( ring_borders_map ) == 0 ): self.log.warning ( "No borders detected." ) self.__ring_borders = numpy.ones ( shape = ring_borders_map.shape ) return # color continuous ring borders with different colors next_color = 2 for row in range ( max_rows ): self.log.debug ( "i_row = %d" % row ) for col in range ( max_cols ): if ring_borders_map [ row ] [ col ] == 1: borderhood = [ ( row, col ) ] while ( borderhood != [ ] ): self.log.debug ( "l_borderhood = %s" % str ( borderhood ) ) tocolor_pixel = borderhood.pop ( ) neighbours = tuna.tools.get_pixel_neighbours ( ( tocolor_pixel [ 0 ], tocolor_pixel [ 1 ] ), ring_borders_map ) for neighbour in neighbours: if neighbour not in borderhood: if ring_borders_map [ neighbour [ 0 ] ] [ neighbour [ 1 ] ] == 1: borderhood.append ( neighbour ) ring_borders_map [ tocolor_pixel [ 0 ] ] [ tocolor_pixel [ 1 ] ] = next_color next_color += 1 # how many pixels are colored in each color? color_counts = { } for row in range ( max_rows ): for col in range ( max_cols ): color = ring_borders_map [ row ] [ col ] if color > 0: if color in color_counts.keys ( ): color_counts [ color ] += 1 else: color_counts [ color ] = 1 # filter out everything but the largest arc if ( color_counts == { } ): self.log.debug ( "color_counts == { }" ) else: max_arc_count = max ( color_counts.values ( ) ) for color, count in color_counts.items ( ): if count == max_arc_count: max_arc_color = color break #self.log ( "i_max_arc_color, max_arc_count = %d, %d" % ( max_arc_color, max_arc_count ) ) for row in range ( max_rows ): for col in range ( max_cols ): if ring_borders_map [ row ] [ col ] != max_arc_color: ring_borders_map [ row ] [ col ] = 0 else: ring_borders_map [ row ] [ col ] = 1 # if the largest arc is too small, abort if max_arc_count < 20: self.log.error ( "max_arc_count = %d, which is too small. Assuming no border." % max_arc_count ) self.__ring_borders = numpy.ones ( shape = ring_borders_map.shape ) return self.__ring_borders = numpy.copy ( ring_borders_map ) self.log.debug ( "detect_ring_borders() took %ds." % ( time.time ( ) - start ) )
[docs] def find_center ( self ): """ This method's goal is to to find the center by segmenting arcs and searching for the intersection of rays perpendicular to these segments. """ self.detect_ring_borders ( ) convergence_tries = 1 bisections = [ ] centers = [ ] while ( True ): if ( convergence_tries > 50 ): self.log.warning ( "Reached threshold for center convergence using chord perpendiculars." ) break if convergence_tries % 10 == 0: self.log.debug ( "convergence_tries = %d" % convergence_tries ) chord_bisector_0 = self.get_random_chord_bisector ( ) chord_bisector_1 = self.get_random_chord_bisector ( ) while ( sympy.Line.is_parallel ( chord_bisector_0, chord_bisector_1 ) ): chord_bisector_0 = self.get_random_chord_bisector ( ) chord_bisector_1 = self.get_random_chord_bisector ( ) self.log.debug ( "o_chord_bisector_0 = %s, slope = %s" % ( str ( chord_bisector_0.equation ( ) ), str ( float ( chord_bisector_0.slope.evalf ( ) ) ) ) ) self.log.debug ( "o_chord_bisector_1 = %s, slope = %s" % ( str ( chord_bisector_1.equation ( ) ), str ( float ( chord_bisector_1.slope.evalf ( ) ) ) ) ) bisections.append ( ( chord_bisector_0, chord_bisector_1, convergence_tries * 10 ) ) # sometimes intesection fails because one of the bisectors ain't a GeometryEntity. try: intersection = sympy.geometry.intersection ( chord_bisector_0, chord_bisector_1 ) except ValueError as e: self.log.warning ( "ValueError %s. Ignoring this intersection." % ( e ) ) convergence_tries += 1 continue if ( len ( intersection ) != 0 ): point_center = intersection [ 0 ] intersect_row = int ( point_center.x.evalf ( ) ) intersect_col = int ( point_center.y.evalf ( ) ) self.log.debug ( "o_point_center = %s" % str ( ( intersect_row, intersect_col ) ) ) self.log.debug ( "i_convergence_tries * 10 = %s" % str ( convergence_tries * 10 ) ) centers.append ( ( int ( intersect_row ), int ( intersect_col ) ) ) sum_row = 0 sum_col = 0 for center in range ( len ( centers ) ): sum_row += centers [ center ] [ 0 ] sum_col += centers [ center ] [ 1 ] center = ( int ( sum_row / len ( centers ) ), int ( sum_col / len ( centers ) ) ) self.log.debug ( "center possibly at %s" % str ( center ) ) convergence_tries += 1 if ( convergence_tries > 10 ): if ( center == ( intersect_row, intersect_col ) ): self.log.debug ( "t_center converged at %s" % str ( center ) ) break self.__center_row = center [ 0 ] self.__center_col = center [ 1 ]
[docs] def get_center ( self ): """ This method's goal is to access the calculated center coordinates. Will trigger a find if the center is yet unknown. Returns: * unnamed variable : tuple of 2 integers Containing the column and row of the found center. """ if ( ( self.__center_row is not None ) and ( self.__center_col is not None ) ): return ( self.__center_row, self.__center_col ) else: self.find_center ( ) return ( self.__center_row, self.__center_col )
[docs] def get_most_distant_points ( self, tl_points = [ ] ): """ This method's goal is to find the pair of points that are most distant in a list of points, by calculating the distance between each possible pair. Parameters: * tl_points : list Containing tuples of 2 integers each. Returns: * unnamed variable : list Containing the first two points that are separated by the largest distance. """ max_distance = 0 max_origin = 0 max_destiny = 0 for origin in range ( len ( tl_points ) ): for destiny in range ( origin + 1, len ( tl_points ) ): distance = sqrt ( ( tl_points [ origin ] [ 0 ] - tl_points [ destiny ] [ 0 ] ) ** 2 + ( tl_points [ origin ] [ 1 ] - tl_points [ destiny ] [ 1 ] ) ** 2 ) if ( distance >= max_distance ): max_distance = distance max_origin = origin max_destiny = destiny return [ tl_points [ max_origin ], tl_points [ max_destiny ] ]
[docs] def get_random_chord_bisector ( self ): """ This method's goal is to find a perpendicular bisector for line segments determined from two random points from the current border. Returns: * unnamed variable : sympy.Line Will be None if no line bisects the chord. """ random_points = set ( [ ] ) while ( len ( random_points ) < 3 ): self.log.debug ( "len ( random_points ) = %d" % len ( random_points ) ) self.log.debug ( "random_points = %s" % str ( random_points ) ) random_points . update ( [ self.get_random_point_in_border ( ) ] ) max_distance_points = self.get_most_distant_points ( list ( random_points ) ) point_0 = sympy.Point ( max_distance_points [ 0 ] [ 0 ], max_distance_points [ 0 ] [ 1 ] ) point_1 = sympy.Point ( max_distance_points [ 1 ] [ 0 ], max_distance_points [ 1 ] [ 1 ] ) self.log.debug ( "debug: point_0, point_1 = %s, %s" % ( str ( point_0 ), str ( point_1 ) ) ) chord_segment = sympy.Segment ( point_0, point_1 ) if ( type ( chord_segment ) is sympy.Point ): self.log.error ( "Segmentation created a point." ) return None return chord_segment.perpendicular_bisector ( )
[docs] def get_random_point_in_border ( self ): """ This method's goal is to access a random point from the current border. Returns: * unnamed variable : tuple of 2 integers Corresponding to the column and row of a random position in the border. """ random_row = random.randint ( 0, self.wrapped.shape [ 0 ] - 1 ) random_col = random.randint ( 0, self.wrapped.shape [ 1 ] - 1 ) random_color = self.__ring_borders [ random_row ] [ random_col ] while ( random_color == 0 ): random_row = random.randint ( 0, self.wrapped.shape [ 0 ] - 1 ) random_col = random.randint ( 0, self.wrapped.shape [ 1 ] - 1 ) random_color = self.__ring_borders [ random_row ] [ random_col ] return ( random_row, random_col )