"""
This module's scope covers operations related to the FITS file format.
As much as possible, operations are deferred to astropy.io.fits.
"""
__version__ = "0.1.0"
__changelog__ = {
"0.1.0" : { "Tuna" : "0.16.0", "Change" : "Added debug messages during metadata parsing. Fixed metdata not reading entries from multiple HDU lists." }
}
import astropy.io.fits as astrofits
import copy
import logging
import math
import numpy
import sys
from tuna.io.file_reader import file_reader
import tuna
import warnings
class fits ( file_reader ):
    """
    This class' responsibility is to operate on FITS files.

    Its constructor's signature is:

    Parameters:

    * array : numpy.ndarray : defaults to None
        Contains the data to be written to a file.

    * file_name : string : defaults to None
        Contains the full location of a file to be read, or to be written to.

    * metadata : dictionary : defaults to None
        Contains metadata to be written as a FITS header, or the metadata read from a FITS header. Each key maps to a ( value, comment ) pair. When omitted, every instance gets its own empty dictionary.

    * photons : dictionary : defaults to None
        Contains the table of photon counts and positions. Each value is itself a dictionary with the keys 'channel', 'x', 'y' and 'photons'.

    Example::

        import tuna
        import numpy
        zeros_array = numpy.zeros ( shape = ( 3, 3, 3 ) )
        zeros_file = tuna.io.fits ( array = zeros_array )
        zeros_file.write ( file_name = "test_file.fits" )
        fits_file = tuna.io.fits ( file_name = "test_file.fits" )
        fits_file.read ( )
        fits_file.get_array ( )
        fits_file.get_metadata ( )
    """
    def __init__ ( self,
                   array = None,
                   file_name = None,
                   metadata = None,
                   photons = None ):
        super ( fits, self ).__init__ ( )
        self.log = logging.getLogger ( __name__ )
        self.__file_name = file_name
        self.__array = array
        # read ( ) stores header cards in this dictionary, so a literal { } default
        # would be one object shared (and polluted) by every instance.
        self.__metadata = { } if metadata is None else metadata
        self.__photons = photons

    def get_array ( self ):
        """
        This method's goal is to access the current array in this object.

        Returns:

        * self.__array : numpy.ndarray
            Contains the current data stored in this object's array.
        """
        return self.__array

    def read ( self ):
        """
        This method's goal is to read the file specified in the constructor's file_name as a FITS file.

        The array is chosen according to how many HDUs carry data:

        * if the file has a single HDU, its data is the array;
        * if exactly one HDU carries data, that data is the array;
        * if a square number ( 4, 9, 16, ... ) of equally shaped two-dimensional HDUs carry data, they are tiled into a mosaic, left to right and then top to bottom, in HDU order;
        * otherwise the array is set to None and an error is logged.

        The header cards of every HDU that carries data are merged into this object's metadata as ( value, comment ) pairs. self._is_readable is set to True when the file could be opened and inspected, and to False when there is no file_name, when no HDU has dimensions, or when opening raises OSError.
        """
        self.log.debug ( tuna.log.function_header ( ) )
        if self.__file_name is None:
            self.log.debug ( "No file_name for FITS read." )
            self._is_readable = False
            # Nothing to open: going on would only hand None to astropy.
            return

        self.log.debug ( "Trying to read file %s as FITS file." % self.__file_name )
        try:
            # Warnings are promoted to exceptions while the file is being opened.
            with warnings.catch_warnings ( ):
                warnings.simplefilter ( "error" )
                hdu_list = astrofits.open ( self.__file_name )
            self.log.debug ( "File %s opened as a FITS file." % self.__file_name )

            hdu_info = hdu_list.info ( output = False )
            self.log.debug ( "hdu_info = {}".format ( hdu_info ) )

            # Dimensions are taken from each HDU's data instead of a fixed column of
            # the info ( ) summary.
            # NOTE(review): the summary layout appears to depend on the astropy
            # release (index 4 was relied upon before) - confirm.
            data_indices = [ index for index, hdu in enumerate ( hdu_list )
                             if self._hdu_has_data ( hdu ) ]

            if len ( hdu_list ) == 1:
                self.__array = hdu_list [ 0 ].data
            elif not data_indices:
                self.log.error ( "File is invalid: no dimensions found in its HDU list." )
                self._is_readable = False
                return
            elif len ( data_indices ) == 1:
                self.__array = hdu_list [ data_indices [ 0 ] ].data
            else:
                # Several arrays: the only layout understood is a square mosaic.
                self.__array = self._build_mosaic ( hdu_list, data_indices )

            if self.__array is None:
                if len ( data_indices ) > 1:
                    self.log.error ( "File has several distinct entries for data, and Tuna doesn't know how to parse it." )
                self.log.error ( "Data section of the file is None!" )
            else:
                self.log.debug ( "Assigned data section of first HDU as the image ndarray." )
                self.log.debug ( "self.__array.ndim == %d" % self.__array.ndim )

            for index in data_indices:
                self._merge_header ( index, hdu_list [ index ].header )

            self._is_readable = True
        except OSError as e:
            self.log.error ( "OSError: %s." % e )
            self._is_readable = False

    @staticmethod
    def _hdu_has_data ( hdu ):
        """
        This method's goal is to tell whether an HDU carries data with at least one dimension.

        Parameters:

        * hdu : object
            An entry of an astropy HDU list.

        Returns:

        * bool
            True when hdu.data exists, is not None and has a non-empty shape.
        """
        data = getattr ( hdu, "data", None )
        return data is not None and getattr ( data, "shape", ( ) ) != ( )

    def _build_mosaic ( self, hdu_list, indices ):
        """
        This method's goal is to tile the data of several HDUs on a square grid.

        Parameters:

        * hdu_list : sequence
            The opened HDU list.

        * indices : list of int
            Positions, inside hdu_list, of the HDUs whose data will become tiles.

        Returns:

        * numpy.ndarray or None
            The mosaic, or None when the number of tiles is not a square number greater than one, when the tiles are not two-dimensional, or when their shapes differ.
        """
        side = int ( round ( math.sqrt ( len ( indices ) ) ) )
        if side < 2 or side * side != len ( indices ):
            return None

        tiles = [ hdu_list [ index ].data for index in indices ]
        tile_shape = tiles [ 0 ].shape
        if len ( tile_shape ) != 2:
            self.log.info ( "Cannot build a mosaic since HDU lists are not two-dimensional." )
            return None
        if any ( tile.shape != tile_shape for tile in tiles ):
            self.log.info ( "Cannot build a mosaic since HDU lists have different sizes." )
            return None

        self.log.debug ( "Creating a mosaic from the multiple HDU lists of the file." )
        tile_rows, tile_cols = tile_shape
        mosaic = numpy.zeros ( shape = ( tile_rows * side, tile_cols * side ) )
        for position, ( index, tile ) in enumerate ( zip ( indices, tiles ) ):
            # Consecutive tiles advance along the second axis first, then wrap to
            # the next band of the first axis.
            grid_row, grid_col = divmod ( position, side )
            first = grid_row * tile_rows
            second = grid_col * tile_cols
            self.log.debug ( "Adding HDU list {} to position ( {}, {} ).".format (
                index, first, second ) )
            self.log.debug ( "Splicing array {} into {}, {}".format (
                tile.shape, first, second ) )
            mosaic [ first : first + tile_rows, second : second + tile_cols ] = tile
        return mosaic

    def _merge_header ( self, index, header ):
        """
        This method's goal is to copy the cards of one FITS header into this object's metadata.

        Parameters:

        * index : int
            Position of the HDU inside its list; only used in log messages.

        * header : astropy.io.fits.Header
            The header whose cards are stored as key : ( value, comment ). A key already present with a different entry is replaced, and a warning is logged.
        """
        self.log.debug ( "Processing header for hdu_list [ {} ].".format ( index ) )
        for key in header.keys ( ):
            metadata_value = header [ key ]
            metadata_comment = header.comments [ key ]
            if key in self.__metadata:
                if ( metadata_value, metadata_comment ) == self.__metadata [ key ]:
                    self.log.debug ( "Ignoring duplicated metadata entry with key {}.".format ( key ) )
                    continue
                self.log.warning ( "Replacing metadata {} : ( {}, {} ) with new entry ( {}, {} ).".format ( key, self.__metadata [ key ] [ 0 ], self.__metadata [ key ] [ 1 ], metadata_value, metadata_comment ) )
            self.__metadata [ key ] = ( metadata_value, metadata_comment )
            self.log.debug ( "{}: value = {}, comment = {}.".format ( key, metadata_value, metadata_comment ) )

    @staticmethod
    def _format_metadata_value ( value ):
        """
        This method's goal is to render a metadata value as the string stored in a FITS card.

        Parameters:

        * value : object
            Either a collection of values or a single value.

        Returns:

        * string
            The items joined by ", " for a non-string iterable; str ( value ) otherwise.
        """
        # A string is iterable too, but joining its characters would mangle it.
        if isinstance ( value, str ):
            return value
        try:
            return ", ".join ( str ( item ) for item in value )
        except TypeError:
            # Scalars (numbers, booleans) as stored by read ( ).
            return str ( value )

    def write ( self, file_name = None ):
        """
        This method's goal is to write the object's current array and metadata as a FITS file.

        Parameters:

        * file_name : string : defaults to None
            Contains a valid path for an yet non-existing file. When omitted, the file_name given to the constructor is used. When neither is available an error is logged and nothing is written.

        Metadata keys longer than 8 characters are truncated to 8, with the original key appended to the card's comment; spaces in keys become underscores. An OSError raised while writing is logged, not propagated.
        """
        self.log.debug ( tuna.log.function_header ( ) )
        target = file_name if file_name else self.__file_name
        if not target:
            self.log.error ( "No file_name for FITS write." )
            return

        header = astrofits.Header ( )
        if self.__metadata:
            for key, entry in self.__metadata.items ( ):
                value = self._format_metadata_value ( entry [ 0 ] )
                # A missing comment must not break the concatenation below.
                comment = entry [ 1 ] or ""
                if len ( key ) > 8:
                    fits_key = key [ : 8 ]
                    comment += "original key = " + key
                else:
                    fits_key = key
                fits_key = fits_key.replace ( ' ', '_' )
                self.log.debug ( "fits_key = %s" % fits_key )
                self.log.debug ( "len ( value ) = %d" % len ( value ) )
                self.log.debug ( "len ( comment ) = %d" % len ( comment ) )
                card = astrofits.Card ( fits_key, value, comment )
                self.log.debug ( "str ( card ) = %s" % str ( card ) )
                header.append ( card )

        hdu = astrofits.PrimaryHDU ( self.__array, header )
        hdu_list = astrofits.HDUList ( [ hdu ] )
        try:
            hdu_list.writeto ( target )
        except OSError as e:
            # Report the failure (for instance an already existing file) rather
            # than dropping it silently.
            self.log.error ( "Could not write file %s: %s." % ( target, e ) )

    def write_photons_table ( self ):
        """
        This method's goal is to write the object's photons dictionary as a FITS table file.

        It will write the file at the path ``"photons_"`` + self.__file_name. Nothing is written when there is no photons dictionary, or when no file_name was given to the constructor.
        """
        self.log.debug ( tuna.log.function_header ( ) )
        if self.__photons is None:
            return
        if self.__file_name is None:
            self.log.error ( "No file_name for FITS photons table write." )
            return

        # ( column name, FITS format ) pairs, in the order the columns are written.
        column_formats = ( ( 'channel', "I2" ),
                           ( 'x', "I3" ),
                           ( 'y', "I3" ),
                           ( 'photons', "I5" ) )
        records = list ( self.__photons.values ( ) )
        fits_columns = [ astrofits.Column ( name = name,
                                            array = [ record [ name ] for record in records ],
                                            format = fits_format )
                         for name, fits_format in column_formats ]
        fits_columns_definition = astrofits.ColDefs ( fits_columns )

        # new_table is the older entry point; prefer its replacement when the
        # installed astropy provides it.
        if hasattr ( astrofits.BinTableHDU, "from_columns" ):
            fits_table_hdu = astrofits.BinTableHDU.from_columns ( fits_columns_definition )
        else:
            fits_table_hdu = astrofits.new_table ( fits_columns_definition )

        primary_hdu = astrofits.PrimaryHDU ( )
        hdu_list = astrofits.HDUList ( [ primary_hdu, fits_table_hdu ] )
        hdu_list.writeto ( "photons_" + self.__file_name )