Source code for tuna.io.can

"""
This module's scope covers operations related to the can file format.

The Tuna can is a image and metadata file format. It consists of a serializable object (instantiated from the tuna.io.can.can class), where convenience methods (such as algebraic procedures on its arrays) are defined.
"""

from .adhoc import adhoc
from .adhoc_ada import ada
from .file_reader import file_reader
from .fits import fits

import logging
import numpy
import sys
import time
import tuna

[docs]class can ( file_reader ): """ This class' responsibilities are to create and operate upon Tuna can files. It inherits from :ref:`tuna_io_file_reader_label`. Its constructor signature is: Parameters: * array : numpy.ndarray : defaults to None The data to be stored in the can. * file_name : string : defaults to None The name of the file containing the data. * interference_order : integer : defaults to None The value of the interference order of the observed light on the data. * interference_order_wavelength : integer : defaults to None The wavelength, in Angstroms, of the observed light on the data. * photons : dictionary : defaults to None A dictionary containing the description of each photon count on the data. The Tuna can is the preferred internal format for Tuna. Therefore, when most modules are used, they return their result in a can. Example usage:: import tuna raw = tuna.io.read ( file_name = "tuna/tuna/test/unit/unit_io/adhoc.ad3" ) type ( raw ) Out[3]: tuna.io.can.can raw2 = raw + raw raw_copy = raw2 - raw raw.flipud ( ) raw.fliplr ( ) raw.convert_ndarray_into_table ( ) raw.update ( ) """ def __init__ ( self, array = None, file_name = None, interference_order = None, interference_reference_wavelength = None, photons = None ): super ( can, self ).__init__ ( ) self.log = logging.getLogger ( __name__ ) self.log.setLevel ( logging.INFO ) self.__version__ = "0.1.5" self.changelog = { "0.1.5" : "Tuna 0.14.0 : improved docstrings.", "0.1.4" : "Updated docstring to new documentation style.", "0.1.3" : "Docstrings added.", "0.1.2" : "Do not update db if that is going to update a file_name to None.", "0.1.1" : "Feed info into db upon update, added file_type property.", "0.1.0" : "Initial changelogged version." } self.array = array self.file_name = file_name self.interference_order = interference_order self.interference_reference_wavelength = interference_reference_wavelength self.photons = photons self.digest = None self.file_type = None self.ndim = None self.shape = None self.planes = None self.rows = None self.cols = None self.metadata = None self.update ( ) def __add__ ( self, summand ): self.log.debug ( tuna.log.function_header ( ) ) sum_array = self.array + summand.array result = can ( array = sum_array ) return result def __sub__ ( self, subtrahend ): self.log.debug ( tuna.log.function_header ( ) ) subtraction_array = self.array - subtrahend.array result = can ( array = subtraction_array ) return result
[docs] def convert_ndarray_into_table ( self ): """ This method's goal is to convert a numpy.ndarray into a photon table, where the value contained in the array, for each voxel, is considered as a photon count. The result is saved in self.photons, which has the following structure (example):: self.photons = [ { 'channel' : 10, 'row' : 128, 'col' : 1, 'photons' : 1024 }, { 'channel' : 11, 'row' : 128, 'col' : 1, 'photons' : 700 }, ... { 'channel' : 30, 'row' : 128, 'col' : 128, 'photons' : 0 } ] """ self.log.debug ( tuna.log.function_header ( ) ) start = time.time ( ) photons = [ ] self.log.debug ( "Parsing image into photon table 0% done." ) last_percentage_logged = 0 for plane in range ( self.planes ): percentage = 10 * int ( plane / self.planes * 10 ) if percentage > last_percentage_logged: self.log.debug ( "Parsing image into photon table %d%% done." % ( percentage ) ) last_percentage_logged = percentage for row in range ( self.rows ): for col in range ( self.cols ): photon = { } photon [ 'channel' ] = plane + 1 photon [ 'row' ] = row photon [ 'col' ] = col if self.ndim == 3: photon [ 'photons' ] = self.array [ plane ] [ row ] [ col ] elif self.ndim == 2: photon [ 'photons' ] = self.array [ row ] [ col ] photons.append ( photon ) self.log.debug ( "info: parsing image into photon table 100% done." ) self.photons = photons self.log.debug ( "debug: convert_ndarray_into_table() took %ds." % ( time.time ( ) - start ) )
[docs] def convert_table_into_ndarray ( self ): """ This method's goal is to accumulate values from a table (required to be in the same structure as specified in the method convert_ndarray_into_table) into a numpy array. It will create an array with the minimal dimensions necessary to hold all photons; therefore if you have "photonless" regions in a data cube, and convert it to a table, then back into a cube, you might end with a numpy.ndarray with a different shape than you started. """ self.log.debug ( tuna.log.function_header ( ) ) planes = 0 rows = 0 cols = 0 for photon in self.photon: planes = max ( planes, photon [ 'channel' ] + 1 ) rows = max ( rows, photon [ 'row' ] ) cols = max ( cols, photon [ 'col' ] ) self.ndim = 3 self.planes = planes self.rows = rows self.cols = cols self.shape = ( planes, rows, cols ) array = numpy.zeros ( shape = self.shape ) for photon in self.photons: array [ photon [ 'channel ' ] - 1 ] [ photon [ 'row' ] ] [ photon [ 'col' ] ] = photon [ 'photons' ] self.array = array
def _database_refresh ( self ): """ Supposing both the can and the db connection are fine, check if there is an entry on db about this can's array, and create / update it as appropriate. """ if not self.digest: self.digest = tuna.tools.get_hash_from_array ( self.array ) records, sql_success = tuna.db.select_record ( 'datasets', { 'hash' : self.digest } ) if not sql_success: self.log.debug ( "At database_refresh sql_success == False." ) return record = None if records: record = records [ 0 ] function = tuna.db.insert_record file_name = self.file_name file_type = self.file_type if record: self.log.debug ( "Can is already on db." ) function = tuna.db.update_record if ( record [ 'file_name' ] != "None" and self.file_name == None ): file_name = record [ 'file_name' ] file_type = record [ 'file_type' ] function ( 'datasets', { 'hash' : self.digest, 'file_name' : file_name, 'file_type' : file_type } )
[docs] def fliplr ( self ): """ This method's goal is to wrap around numpy.fliplr, which flips a 2D array from left to right (the rightmost column becomes the leftmost one). This is applied to each plane of a cube, or the single plane of a planar image. """ result = numpy.ndarray ( shape = self.array.shape ) for plane in range ( self.array.shape [ 0 ] ): result [ plane ] = numpy.fliplr ( self.array [ plane ] ) self.array = result
[docs] def flipud ( self ): """ This method's goal is to wrap around numpy.flipud, which flips a 2D array from up to down (the last line becomes the first). This is applied to each plane of a cube, or the single plane of a planar image. """ result = numpy.ndarray ( shape = self.array.shape ) for plane in range ( self.array.shape [ 0 ] ): result [ plane ] = numpy.flipud ( self.array [ plane ] ) self.array = result
[docs] def info ( self ): """ This method's goal is to output to the current logging.info handler some metadata about the current can. """ self.log.debug ( tuna.log.function_header ( ) ) self.log.info ( "file_name = %s" % self.file_name ) self.log.info ( "shape = %s" % str ( self.shape ) ) self.log.info ( "ndim = %d" % self.ndim ) self.log.info ( "planes = %d" % self.planes ) self.log.info ( "rows = %d" % self.rows ) self.log.info ( "cols = %d" % self.cols ) self.log.info ( "interference_order = %s" % str ( self.interference_order ) ) self.log.info ( "interference_reference_wavelength = %s" % str ( self.interference_reference_wavelength ) )
[docs] def read ( self ): """ This method's goal is to read a file content's into a can. Will sequentially attempt to read the file as an .ADT, .fits, .AD2 and .AD3 formatted file. The first attempt to succeed is used. """ self.log.debug ( tuna.log.function_header ( ) ) self.log.debug ( "line %d, before attempting to read file, %s" % ( tuna.log.line_number ( ), tuna.io.system.status ( ) ) ) if self.file_name: if ( self.file_name.startswith ( ".ADT", -4 ) or self.file_name.startswith ( ".adt", -4 ) ): ada_object = ada ( file_name = self.file_name ) ada_object.read ( ) self.array = ada_object.get_array ( ) self.metadata = ada_object.get_metadata ( ) self.__d_photons = ada_object.get_photons ( ) self.file_type = "adt" self.update ( ) elif ( self.file_name.startswith ( ".fits", -5 ) or self.file_name.startswith ( ".FITS", -5 ) ): fits_object = fits ( file_name = self.file_name ) fits_object.read ( ) self.array = fits_object.get_array ( ) self.metadata = fits_object.get_metadata ( ) self.file_type = "fits" self.update ( ) elif ( self.file_name.startswith ( ".ad2", -4 ) or self.file_name.startswith ( ".AD2", -4 ) or self.file_name.startswith ( ".ad3", -4 ) or self.file_name.startswith ( ".AD3", -4 ) ): adhoc_object = adhoc ( file_name = self.file_name ) adhoc_object.read ( ) self.array = adhoc_object.get_array ( ) #self.metadata = adhoc_object.__trailer self.file_type = "ada" self.update ( ) self.log.debug ( "After attempting to read file, " + tuna.io.system.status ( ) )
[docs] def update ( self ): """ This method's goal is to clears current metadata, and regenerate this information based on the current contents of the can's array and photon table. """ self.log.debug ( tuna.log.function_header ( ) ) if ( ( not isinstance ( self.array, numpy.ndarray ) ) and self.photons == None ): self.log.debug ( "Empty Tuna can." ) self.metadata = None self.ndim = None self.shape = None self.planes = None self.rows = None self.cols = None return if ( not isinstance ( self.array, numpy.ndarray ) ): self.convert_table_into_ndarray ( ) return self.ndim = self.array.ndim self.shape = self.array.shape self.log.debug ( "can.update: self.array.ndim == %d, self.ndim == %d." % ( self.array.ndim, self.ndim ) ) if self.ndim == 3: self.planes = self.array.shape [ 0 ] self.rows = self.array.shape [ 1 ] self.cols = self.array.shape [ 2 ] elif self.ndim == 2: self.planes = 1 self.rows = self.array.shape [ 0 ] self.cols = self.array.shape [ 1 ] if ( self.ndim < 2 or self.ndim > 3 ): self.log.warning ( "ndarray has either less than 2 or more than 3 dimensions." ) self._database_refresh ( )