Source code for tuna.io.adhoc

# -*- coding: utf-8 -*-
"""
This module scope coves ADHOC files' operations. 

ADHOC files typically have the .AD2 (for two-dimensional data) and .AD3 (for tri-dimensional data).

It is based on a module provided by BenoƮt Epinat, and integrated into Tuna in 2015.
"""

import logging
import os
import numpy
import numpy as np
import pyfits as pf
from pyfits import Column
import sys
from time import sleep

from .file_reader import file_reader
import tuna

#verifier l'organisation des cubes fits si on les ouvre en python
#faire un programme readad qui voit l'extension pour savoir comment ouvir (soit ad2, ad3, ad1...)
#gestion des NaN a inclure (input/output)!!!

[docs]class adhoc ( file_reader ): """ This class' responsibilities include: reading files in one of the ADHOC file formats (AD2 or AD3). First implemented by Benoit Epinat from LAM. The ADHOC file formats were developed for use with the ADHOC software solution, developed at LAM by Jacques Boulesteix. It inherits from :ref:`tuna_io_file_reader_label`. Its constructor has the following signature: Parameters: * adhoc_type : int : defaults to None. Valid types are 2 and 3. * adhoc_trailer : numpy.ndarray : defaults to None. The trailer of an ADHOC file are the last 256 bytes of the file, and contain metadata. * file_name : string : defaults to None. Must correspond to an existing ADHOC file. * array : numpy.ndarray : defaults to None. Will be read from the file, and its size is the file size minus 256 bytes, and each field has 32 bytes and is encoded as a float. Example usage:: import tuna raw = tuna.io.adhoc ( file_name = "tuna/tuna/test/unit/unit_io/adhoc.ad3" ) raw.read ( ) raw.get_array ( ) raw.get_trailer ( ) """ def __init__ ( self, adhoc_type = None, adhoc_trailer = None, file_name = None, array = None ): super ( adhoc, self ).__init__ ( ) self.log = logging.getLogger ( __name__ ) self.log.setLevel ( logging.INFO ) self.__version__ = "0.2.0" self.changelog = { '0.2.0' : "Tuna 0.14.0 : improved docstrings.", '0.1.2' : "Documentation: module, class and function docstrings.", '0.1.1' : "Improved docstrings.", '0.1.0' : "Initial changelog." } self.__adhoc_type = adhoc_type self.__adhoc_trailer = adhoc_trailer self.__file_name = file_name self.__array = array self.__file_object = None def _discover_adhoc_type ( self ): """ This method's goal is to open the file named file_name, get the first 256 bytes as its trailer, and cast its remaining contents into a numpy array of numpy.float32 values. Since the trailer contains information regarding the dimensionality of the data, this information is also retrieved from the file. """ self.log.debug ( tuna.log.function_header ( ) ) if self.__file_name: try: self.__file_object = open ( self.__file_name, "rb" ) except OSError as e: self.log.error ( "OSError: %s" % str ( e ) ) raise self.__file_object.seek ( 0, os.SEEK_END ) if self.__file_object.tell ( ) < 256: self.log.error ( "File does not contain valid numpy array." ) self.__adhoc_type = None return self.__array_size = ( self.__file_object.tell ( ) - 256 ) / 4 self.__file_object.close ( ) try: adhoc_file_type = numpy.dtype ( [ ( 'image_data', numpy.float32, self.__array_size ), ( 'trailer', [ ( 'number_of_dimensions', numpy.int32, 1 ), ( 'parameters', numpy.int8, 252 ) ] ) ] ) except ValueError as e: self.log.error ( "ValueError: %s." % str ( e ) ) self.log.warning ( "Impossible to guess adhoc type from file." ) self.__adhoc_type = None return adhoc_file = numpy.fromfile ( self.__file_name, dtype = adhoc_file_type ) if adhoc_file['trailer']['number_of_dimensions'] not in ( [2], [3], [-3] ): self.log.warning ( "Unrecognized number of dimensions in file %s." % str ( self.__file_name ) ) return self.__adhoc_type = adhoc_file['trailer']['number_of_dimensions'][0]
[docs] def get_array ( self ): """ This method's goal is to return the current value of the data array. Returns: * self.__array : numpy.ndarray Containing the current data array. """ return self.__array
[docs] def read ( self ): """ This method's goal is to discover the ADHOC type (which corresponds to the data array dimensionality), and when possible call the appropriate method to read its contents. """ self.log.debug ( tuna.log.function_header ( ) ) if self.__file_name == None: self.log.error ( "No file name selected, aborting read operation." ) return if self.__adhoc_type == None: self._discover_adhoc_type ( ) if self.__adhoc_type == None: self._is_readable = False return else: self._is_readable = True if self.__file_object: self.__file_object.close ( ) self.__file_object = open ( self.__file_name, "rb" ) if self.__array_size == None: self.__array_size = ( self.__file_object.tell ( ) - 256 ) / 4 if self.__adhoc_type == 2: self._read_adhoc_2d ( ) if self.__adhoc_type == 3 or self.__adhoc_type == -3: self._read_adhoc_3d ( )
def _read_adhoc_2d ( self ): """ This method's goal is to read the contents of self.__file_object as a 2D ADHOC file. """ self.log.debug ( tuna.log.function_header ( ) ) adhoc_2d_file_type = numpy.dtype ( [ ( 'data', np.float32, self.__array_size ), ( 'trailer', [ ( 'nbdim', np.int32 ), ( 'id', np.int8, 8 ), ( 'lx', np.int32 ), ( 'ly', np.int32 ), ( 'lz', np.int32 ), ( 'scale', np.float32 ), ( 'ix0', np.int32 ), ( 'iy0', np.int32 ), ( 'zoom', np.float32 ), ( 'modevis', np.int32 ), ( 'thrshld', np.float32 ), ( 'step', np.float32 ), ( 'nbiso', np.int32 ), ( 'pal', np.int32 ), ( 'cdelt1', np.float64 ), ( 'cdelt2', np.float64 ), ( 'crval1', np.float64 ), ( 'crval2', np.float64 ), ( 'crpix1', np.float32 ), ( 'crpix2', np.float32 ), ( 'crota2', np.float32 ), ( 'equinox', np.float32 ), ( 'x_mirror', np.int8 ), ( 'y_mirror', np.int8 ), ( 'was_compressed', np.int8 ), ( 'none2', np.int8, 1 ), ( 'none', np.int32, 4 ), ( 'comment', np.int8, 128 ) ] ) ] ) numpy_data = numpy.fromfile ( self.__file_name, dtype = adhoc_2d_file_type ) if ( numpy_data['trailer']['lx'] >= 32768 ) | ( numpy_data['trailer']['ly'] >= 32768 ): self.log.debug ( 'critical: lx or ly seems to be invalid: (' + numpy.str ( numpy_data['trailer']['lx'][0] ) + ', ' + numpy.str ( numpy_data['trailer']['ly'][0] ) + ')' ) self.log.debug ( 'critical: If you want to allow arrays as large as this, modify the code!' ) return try: self.__array = numpy_data['data'][0].reshape ( numpy_data['trailer']['ly'], numpy_data['trailer']['lx'] ) except ValueError as e: self.log.debug ( "%s" % str ( e ) ) raise self.__array [ numpy.where ( numpy_data == -3.1E38 ) ] = numpy.nan self.__adhoc_trailer = numpy_data['trailer'] self.log.info ( "Successfully read adhoc 2d object from file %s." % str ( self.__file_name ) ) def _read_adhoc_3d ( self, xyz = True ): """ This method's goal is to read the contents of self.__file_object as a tri-dimensional ADHOC file. Parameters: * xyz : boolean : defaults to True. False to return data in standard zxy adhoc format, True to return data in xyz format (default). """ self.log.debug ( tuna.log.function_header ( ) ) data = self.__file_object data.seek ( 0, 2 ) sz = ( data.tell ( ) - 256 ) / 4 dt = np.dtype ( [ ( 'data', np.float32, sz ), ( 'trailer', [ ( 'nbdim', np.int32 ), ( 'id', np.int8, 8 ), ( 'lx', np.int32 ), ( 'ly', np.int32 ), ( 'lz', np.int32 ), ( 'scale', np.float32 ), ( 'ix0', np.int32 ), ( 'iy0', np.int32 ), ( 'zoom', np.float32 ), ( 'xl1', np.float32 ), ( 'xi1', np.float32 ), ( 'vr0', np.float32), ( 'corrv', np.float32 ), ( 'p0', np.float32 ), ( 'xlp', np.float32 ), ( 'xl0', np.float32 ), ( 'vr1', np.float32 ), ( 'xik', np.float32 ), ( 'cdelt1', np.float64 ), ( 'cdelt2', np.float64 ), ( 'crval1', np.float64 ), ( 'crval2', np.float64 ), ( 'crpix1', np.float32 ), ( 'crpix2', np.float32 ), ( 'crota2', np.float32 ), ( 'equinox', np.float32 ), ( 'x_mirror', np.int8 ), ( 'y_mirror', np.int8 ), ( 'was_compressed', np.int8 ), ( 'none2', np.int8, 1 ), ( 'comment', np.int8, 128 ) ] ) ] ) ad3 = np.fromfile ( self.__file_name, dtype = dt ) if ( ad3['trailer']['lx'][0] * ad3['trailer']['ly'][0] * ad3['trailer']['lz'][0] >= 250 * 1024 * 1024 ): self.log.debug ( 'critical: lx or ly or lz seems to be invalid: (' + np.str(ad3['trailer']['lx'][0]) + ', ' + np.str(ad3['trailer']['ly'][0]) + ', ' + np.str(ad3['trailer']['lz'][0]) + ')') self.log.debug ( 'critical: If you want to allow arrays as large as this, modify the code!') return if ad3['trailer']['nbdim'] == -3: # nbdim ? data = ad3['data'][0].reshape(ad3['trailer']['lz'], ad3['trailer']['ly'], ad3['trailer']['lx']) # else: data = ad3['data'][0].reshape(ad3['trailer']['ly'], ad3['trailer']['lx'], ad3['trailer']['lz']) if xyz & (ad3['trailer']['nbdim'] == 3): #return the data ordered in z, y, x data = data.transpose(2, 0, 1) if (not xyz) & (ad3['trailer']['nbdim'] == -3): #return data ordered in y, x, z data = data.transpose(1, 2, 0) data[np.where(data == -3.1E38)] = np.nan #ad3 = dtu(data, ad3['trailer'][0], filename) self.__array = data self.__trailer = ad3['trailer'][0] self.log.info ( "Successfully read adhoc 3d object from file %s." % str ( self.__file_name ) )
[docs] def get_trailer ( self ): """ This method's goal is to return the current trailer. Returns: * self.__trailer : numpy.ndarray Containing the current values for the trailer. """ return self.__trailer