Source code for sas.sascalc.dataloader.readers.cansas_reader_HDF5

"""
    NXcanSAS data reader for reading HDF5 formatted CanSAS files.
"""

import logging
import h5py
import numpy as np
import re
import os
import traceback

from ..data_info import plottable_1D, plottable_2D,\
    Data1D, Data2D, DataInfo, Process, Aperture, Collimation, \
    TransmissionSpectrum, Detector
from ..loader_exceptions import FileContentsException, DefaultReaderException
from ..file_reader_base_class import FileReader, decode

try:
  basestring
except NameError:  # CRUFT: python 2 support
  basestring = str

logger = logging.getLogger(__name__)


[docs]def h5attr(node, key, default=None): value = node.attrs.get(key, default) #print("h5attr", node, key, value, type(value)) if isinstance(value, np.ndarray) and value.dtype.char == 'S': return [decode(el) for el in value] elif isinstance(value, list): return [decode(el) for el in value] else: return decode(value)
[docs]class Reader(FileReader): """ A class for reading in NXcanSAS data files. The current implementation has been tested to load data generated by multiple facilities, all of which are known to produce NXcanSAS standards compliant data. Any number of data sets may be present within the file and any dimensionality of data may be used. Currently 1D and 2D SAS data sets are supported, but should be immediately extensible to SESANS data. Any number of SASdata groups may be present in a SASentry and the data within each SASdata group can be a single 1D I(Q), multi-framed 1D I(Q), 2D I(Qx, Qy) or multi-framed 2D I(Qx, Qy). :Dependencies: The NXcanSAS HDF5 reader requires h5py => v2.5.0 or later. """ # CanSAS version cansas_version = 2.0 # Data type name type_name = "NXcanSAS" # Wildcards type = ["NXcanSAS HDF5 Files (*.h5)|*.h5|"] # List of allowed extensions ext = ['.h5', '.H5'] # Flag to bypass extension check allow_all = True
[docs] def get_file_contents(self): """ This is the general read method that all SasView data_loaders must have. :param filename: A path for an HDF5 formatted CanSAS 2D data file. :return: List of Data1D/2D objects and/or a list of errors. """ # Reinitialize when loading a new data file to reset all class variables self.reset_state() filename = self.f_open.name self.f_open.close() # IO handled by h5py # Check that the file exists if os.path.isfile(filename): basename = os.path.basename(filename) _, extension = os.path.splitext(basename) # If the file type is not allowed, return empty list if extension in self.ext or self.allow_all: # Load the data file try: self.raw_data = h5py.File(filename, 'r') except Exception as e: if extension not in self.ext: msg = "NXcanSAS Reader could not load file {}".format( basename + extension) raise DefaultReaderException(msg) raise FileContentsException(e.message) try: # Read in all child elements of top level SASroot self.read_children(self.raw_data, []) # Add the last data set to the list of outputs self.add_data_set() except Exception as exc: raise FileContentsException(exc.message) finally: # Close the data file self.raw_data.close() for data_set in self.output: if isinstance(data_set, Data1D): if data_set.x.size < 5: exception = FileContentsException( "Fewer than 5 data points found.") data_set.errors.append(exception)
[docs] def reset_state(self): """ Create the reader object and define initial states for class variables """ super(Reader, self).reset_state() self.data1d = [] self.data2d = [] self.raw_data = None self.multi_frame = False self.data_frames = [] self.data_uncertainty_frames = [] self.errors = [] self.logging = [] self.q_names = [] self.mask_name = u'' self.i_name = u'' self.i_node = u'' self.i_uncertainties_name = u'' self.q_uncertainty_names = [] self.q_resolution_names = [] self.parent_class = u'' self.detector = Detector() self.collimation = Collimation() self.aperture = Aperture() self.process = Process() self.trans_spectrum = TransmissionSpectrum()
[docs] def read_children(self, data, parent_list): """ A recursive method for stepping through the hierarchical data file. :param data: h5py Group object of any kind :param parent: h5py Group parent name """ # Loop through each element of the parent and process accordingly for key in data.keys(): # Get all information for the current key value = data.get(key) class_name = h5attr(value, u'canSAS_class') if isinstance(class_name, (list, tuple, np.ndarray)): class_name = class_name[0] if class_name is None: class_name = h5attr(value, u'NX_class') if class_name is not None: class_prog = re.compile(class_name) else: class_prog = re.compile(value.name) if isinstance(value, h5py.Group): # Set parent class before recursion last_parent_class = self.parent_class self.parent_class = class_name parent_list.append(key) # If a new sasentry, store the current data sets and create # a fresh Data1D/2D object if class_prog.match(u'SASentry'): self.add_data_set(key) elif class_prog.match(u'SASdata'): self._find_data_attributes(value) self._initialize_new_data_set(value) # Recursion step to access data within the group try: self.read_children(value, parent_list) self.add_intermediate() except Exception as e: self.current_datainfo.errors.append(str(e)) logger.debug(traceback.format_exc()) # Reset parent class when returning from recursive method self.parent_class = last_parent_class parent_list.remove(key) elif isinstance(value, h5py.Dataset): # If this is a dataset, store the data appropriately data_set = value.value unit = self._get_unit(value) # Put scalars into lists to be sure they are iterable if np.isscalar(data_set): data_set = [data_set] for data_point in data_set: if isinstance(data_point, np.ndarray): if data_point.dtype.char == 'S': data_point = decode(bytes(data_point)) else: data_point = decode(data_point) # Top Level Meta Data if key == u'definition': if isinstance(data_set, basestring): self.current_datainfo.meta_data['reader'] = data_set break else: self.current_datainfo.meta_data[ 'reader'] = data_point # Run elif key == u'run': try: run_name = h5attr(value, 'name', default='name') run_dict = {data_point: run_name} self.current_datainfo.run_name = run_dict except Exception: pass if isinstance(data_set, basestring): self.current_datainfo.run.append(data_set) break else: self.current_datainfo.run.append(data_point) # Title elif key == u'title': if isinstance(data_set, basestring): self.current_datainfo.title = data_set break else: self.current_datainfo.title = data_point # Note elif key == u'SASnote': self.current_datainfo.notes.append(data_set) break # Sample Information elif self.parent_class == u'SASsample': self.process_sample(data_point, key) # Instrumental Information elif (key == u'name' and self.parent_class == u'SASinstrument'): self.current_datainfo.instrument = data_point # Detector elif self.parent_class == u'SASdetector': self.process_detector(data_point, key, unit) # Collimation elif self.parent_class == u'SAScollimation': self.process_collimation(data_point, key, unit) # Aperture elif self.parent_class == u'SASaperture': self.process_aperture(data_point, key) # Process Information elif self.parent_class == u'SASprocess': # CanSAS 2.0 self.process_process(data_point, key) # Source elif self.parent_class == u'SASsource': self.process_source(data_point, key, unit) # Everything else goes in meta_data elif self.parent_class == u'SASdata': if isinstance(self.current_dataset, plottable_2D): self.process_2d_data_object(data_set, key, unit) else: self.process_1d_data_object(data_set, key, unit) break elif self.parent_class == u'SAStransmission_spectrum': self.process_trans_spectrum(data_set, key) break else: new_key = self._create_unique_key( self.current_datainfo.meta_data, key) self.current_datainfo.meta_data[new_key] = data_point else: # I don't know if this reachable code self.errors.append("ShouldNeverHappenException")
[docs] def process_1d_data_object(self, data_set, key, unit): """ SASdata processor method for 1d data items :param data_set: data from HDF5 file :param key: canSAS_class attribute :param unit: unit attribute """ if key == self.i_name: if self.multi_frame: for x in range(0, data_set.shape[0]): self.data_frames.append(data_set[x].flatten()) else: self.current_dataset.y = data_set.flatten() self.current_dataset.yaxis("Intensity", unit) elif key == self.i_uncertainties_name: if self.multi_frame: for x in range(0, data_set.shape[0]): self.data_uncertainty_frames.append(data_set[x].flatten()) self.current_dataset.dy = data_set.flatten() elif key in self.q_names: self.current_dataset.xaxis("Q", unit) self.current_dataset.x = data_set.flatten() elif key in self.q_resolution_names: if (len(self.q_resolution_names) > 1 and np.where(self.q_resolution_names == key)[0] == 0): self.current_dataset.dxw = data_set.flatten() elif (len(self.q_resolution_names) > 1 and np.where(self.q_resolution_names == key)[0] == 1): self.current_dataset.dxl = data_set.flatten() else: self.current_dataset.dx = data_set.flatten() elif key in self.q_uncertainty_names: if (len(self.q_uncertainty_names) > 1 and np.where(self.q_uncertainty_names == key)[0] == 0): self.current_dataset.dxw = data_set.flatten() elif (len(self.q_uncertainty_names) > 1 and np.where(self.q_uncertainty_names == key)[0] == 1): self.current_dataset.dxl = data_set.flatten() else: self.current_dataset.dx = data_set.flatten() elif key == self.mask_name: self.current_dataset.mask = data_set.flatten() elif key == u'wavelength': self.current_datainfo.source.wavelength = data_set[0] self.current_datainfo.source.wavelength_unit = unit
[docs] def process_2d_data_object(self, data_set, key, unit): if key == self.i_name: self.current_dataset.data = data_set self.current_dataset.zaxis("Intensity", unit) elif key == self.i_uncertainties_name: self.current_dataset.err_data = data_set.flatten() elif key in self.q_names: self.current_dataset.xaxis("Q_x", unit) self.current_dataset.yaxis("Q_y", unit) if self.q_names[0] == self.q_names[1]: # All q data in a single array self.current_dataset.qx_data = data_set[0] self.current_dataset.qy_data = data_set[1] elif self.q_names.index(key) == 0: self.current_dataset.qx_data = data_set elif self.q_names.index(key) == 1: self.current_dataset.qy_data = data_set elif key in self.q_uncertainty_names or key in self.q_resolution_names: if ((self.q_uncertainty_names[0] == self.q_uncertainty_names[1]) or (self.q_resolution_names[0] == self.q_resolution_names[1])): # All q data in a single array self.current_dataset.dqx_data = data_set[0].flatten() self.current_dataset.dqy_data = data_set[1].flatten() elif (self.q_uncertainty_names.index(key) == 0 or self.q_resolution_names.index(key) == 0): self.current_dataset.dqx_data = data_set.flatten() elif (self.q_uncertainty_names.index(key) == 1 or self.q_resolution_names.index(key) == 1): self.current_dataset.dqy_data = data_set.flatten() self.current_dataset.yaxis("Q_y", unit) elif key == self.mask_name: self.current_dataset.mask = data_set.flatten() elif key == u'Qy': self.current_dataset.yaxis("Q_y", unit) self.current_dataset.qy_data = data_set.flatten() elif key == u'Qydev': self.current_dataset.dqy_data = data_set.flatten() elif key == u'Qx': self.current_dataset.xaxis("Q_x", unit) self.current_dataset.qx_data = data_set.flatten() elif key == u'Qxdev': self.current_dataset.dqx_data = data_set.flatten()
[docs] def process_trans_spectrum(self, data_set, key): """ SAStransmission_spectrum processor :param data_set: data from HDF5 file :param key: canSAS_class attribute """ if key == u'T': self.trans_spectrum.transmission = data_set.flatten() elif key == u'Tdev': self.trans_spectrum.transmission_deviation = data_set.flatten() elif key == u'lambda': self.trans_spectrum.wavelength = data_set.flatten()
[docs] def process_sample(self, data_point, key): """ SASsample processor :param data_point: Single point from an HDF5 data file :param key: class name data_point was taken from """ if key == u'Title': self.current_datainfo.sample.name = data_point elif key == u'name': self.current_datainfo.sample.name = data_point elif key == u'ID': self.current_datainfo.sample.name = data_point elif key == u'thickness': self.current_datainfo.sample.thickness = data_point elif key == u'temperature': self.current_datainfo.sample.temperature = data_point elif key == u'transmission': self.current_datainfo.sample.transmission = data_point elif key == u'x_position': self.current_datainfo.sample.position.x = data_point elif key == u'y_position': self.current_datainfo.sample.position.y = data_point elif key == u'pitch': self.current_datainfo.sample.orientation.x = data_point elif key == u'yaw': self.current_datainfo.sample.orientation.y = data_point elif key == u'roll': self.current_datainfo.sample.orientation.z = data_point elif key == u'details': self.current_datainfo.sample.details.append(data_point)
[docs] def process_detector(self, data_point, key, unit): """ SASdetector processor :param data_point: Single point from an HDF5 data file :param key: class name data_point was taken from :param unit: unit attribute from data set """ if key == u'name': self.detector.name = data_point elif key == u'SDD': self.detector.distance = float(data_point) self.detector.distance_unit = unit elif key == u'slit_length': self.detector.slit_length = float(data_point) self.detector.slit_length_unit = unit elif key == u'x_position': self.detector.offset.x = float(data_point) self.detector.offset_unit = unit elif key == u'y_position': self.detector.offset.y = float(data_point) self.detector.offset_unit = unit elif key == u'pitch': self.detector.orientation.x = float(data_point) self.detector.orientation_unit = unit elif key == u'roll': self.detector.orientation.z = float(data_point) self.detector.orientation_unit = unit elif key == u'yaw': self.detector.orientation.y = float(data_point) self.detector.orientation_unit = unit elif key == u'beam_center_x': self.detector.beam_center.x = float(data_point) self.detector.beam_center_unit = unit elif key == u'beam_center_y': self.detector.beam_center.y = float(data_point) self.detector.beam_center_unit = unit elif key == u'x_pixel_size': self.detector.pixel_size.x = float(data_point) self.detector.pixel_size_unit = unit elif key == u'y_pixel_size': self.detector.pixel_size.y = float(data_point) self.detector.pixel_size_unit = unit
[docs] def process_collimation(self, data_point, key, unit): """ SAScollimation processor :param data_point: Single point from an HDF5 data file :param key: class name data_point was taken from :param unit: unit attribute from data set """ if key == u'distance': self.collimation.length = data_point self.collimation.length_unit = unit elif key == u'name': self.collimation.name = data_point
[docs] def process_aperture(self, data_point, key): """ SASaperture processor :param data_point: Single point from an HDF5 data file :param key: class name data_point was taken from """ if key == u'shape': self.aperture.shape = data_point elif key == u'x_gap': self.aperture.size.x = data_point elif key == u'y_gap': self.aperture.size.y = data_point
[docs] def process_source(self, data_point, key, unit): """ SASsource processor :param data_point: Single point from an HDF5 data file :param key: class name data_point was taken from :param unit: unit attribute from data set """ if key == u'incident_wavelength': self.current_datainfo.source.wavelength = data_point self.current_datainfo.source.wavelength_unit = unit elif key == u'wavelength_max': self.current_datainfo.source.wavelength_max = data_point self.current_datainfo.source.wavelength_max_unit = unit elif key == u'wavelength_min': self.current_datainfo.source.wavelength_min = data_point self.current_datainfo.source.wavelength_min_unit = unit elif key == u'incident_wavelength_spread': self.current_datainfo.source.wavelength_spread = data_point self.current_datainfo.source.wavelength_spread_unit = unit elif key == u'beam_size_x': self.current_datainfo.source.beam_size.x = data_point self.current_datainfo.source.beam_size_unit = unit elif key == u'beam_size_y': self.current_datainfo.source.beam_size.y = data_point self.current_datainfo.source.beam_size_unit = unit elif key == u'beam_shape': self.current_datainfo.source.beam_shape = data_point elif key == u'radiation': self.current_datainfo.source.radiation = data_point elif key == u'type': self.current_datainfo.source.type = data_point elif key == u'probe': self.current_datainfo.source.probe = data_point
[docs] def process_process(self, data_point, key): """ SASprocess processor :param data_point: Single point from an HDF5 data file :param key: class name data_point was taken from """ term_match = re.compile(u'^term[0-9]+$') if key == u'Title': # CanSAS 2.0 self.process.name = data_point elif key == u'name': # NXcanSAS self.process.name = data_point elif key == u'description': self.process.description = data_point elif key == u'date': self.process.date = data_point elif term_match.match(key): self.process.term.append(data_point) else: self.process.notes.append(data_point)
[docs] def add_intermediate(self): """ This method stores any intermediate objects within the final data set after fully reading the set. :param parent: The NXclass name for the h5py Group object that just finished being processed """ if self.parent_class == u'SASprocess': self.current_datainfo.process.append(self.process) self.process = Process() elif self.parent_class == u'SASdetector': self.current_datainfo.detector.append(self.detector) self.detector = Detector() elif self.parent_class == u'SAStransmission_spectrum': self.current_datainfo.trans_spectrum.append(self.trans_spectrum) self.trans_spectrum = TransmissionSpectrum() elif self.parent_class == u'SAScollimation': self.current_datainfo.collimation.append(self.collimation) self.collimation = Collimation() elif self.parent_class == u'SASaperture': self.collimation.aperture.append(self.aperture) self.aperture = Aperture() elif self.parent_class == u'SASdata': if isinstance(self.current_dataset, plottable_2D): self.data2d.append(self.current_dataset) elif isinstance(self.current_dataset, plottable_1D): if self.multi_frame: for x in range(0, len(self.data_frames)): self.current_dataset.y = self.data_frames[x] if len(self.data_uncertainty_frames) > x: self.current_dataset.dy = \ self.data_uncertainty_frames[x] self.data1d.append(self.current_dataset) else: self.data1d.append(self.current_dataset)
[docs] def final_data_cleanup(self): """ Does some final cleanup and formatting on self.current_datainfo and all data1D and data2D objects and then combines the data and info into Data1D and Data2D objects """ # Type cast data arrays to float64 if len(self.current_datainfo.trans_spectrum) > 0: spectrum_list = [] for spectrum in self.current_datainfo.trans_spectrum: spectrum.transmission = spectrum.transmission.astype(np.float64) spectrum.transmission_deviation = \ spectrum.transmission_deviation.astype(np.float64) spectrum.wavelength = spectrum.wavelength.astype(np.float64) if len(spectrum.transmission) > 0: spectrum_list.append(spectrum) self.current_datainfo.trans_spectrum = spectrum_list # Append errors to dataset and reset class errors self.current_datainfo.errors = self.errors self.errors = [] # Combine all plottables with datainfo and append each to output # Type cast data arrays to float64 and find min/max as appropriate for dataset in self.data2d: # Calculate the actual Q matrix try: if dataset.q_data.size <= 1: dataset.q_data = np.sqrt(dataset.qx_data * dataset.qx_data + dataset.qy_data * dataset.qy_data).flatten() except: dataset.q_data = None if dataset.data.ndim == 2: dataset.y_bins = np.unique(dataset.qy_data.flatten()) dataset.x_bins = np.unique(dataset.qx_data.flatten()) dataset.data = dataset.data.flatten() dataset.qx_data = dataset.qx_data.flatten() dataset.qy_data = dataset.qy_data.flatten() try: iter(dataset.mask) dataset.mask = np.invert(np.asarray(dataset.mask, dtype=bool)) except TypeError: dataset.mask = np.ones(dataset.data.shape, dtype=bool) self.current_dataset = dataset self.send_to_output() for dataset in self.data1d: self.current_dataset = dataset self.send_to_output()
[docs] def add_data_set(self, key=""): """ Adds the current_dataset to the list of outputs after preforming final processing on the data and then calls a private method to generate a new data set. :param key: NeXus group name for current tree level """ if self.current_datainfo and self.current_dataset: self.final_data_cleanup() self.data_frames = [] self.data_uncertainty_frames = [] self.data1d = [] self.data2d = [] self.current_datainfo = DataInfo()
def _initialize_new_data_set(self, value=None): """ A private class method to generate a new 1D or 2D data object based on the type of data within the set. Outside methods should call add_data_set() to be sure any existing data is stored properly. :param parent_list: List of names of parent elements """ if self._is_2d_not_multi_frame(value): self.current_dataset = plottable_2D() else: x = np.array(0) y = np.array(0) self.current_dataset = plottable_1D(x, y) self.current_datainfo.filename = self.raw_data.filename
[docs] @staticmethod def as_list_or_array(iterable): """ Return value as a list if not already a list or array. :param iterable: :return: """ if not (isinstance(iterable, np.ndarray) or isinstance(iterable, list)): iterable = iterable.split(",") if isinstance(iterable, basestring)\ else [iterable] return iterable
def _find_data_attributes(self, value): """ A class to find the indices for Q, the name of the Qdev and Idev, and the name of the mask. :param value: SASdata/NXdata HDF5 Group """ # Initialize values to base types self.mask_name = u'' self.i_name = u'' self.i_node = u'' self.i_uncertainties_name = u'' self.q_names = [] self.q_uncertainty_names = [] self.q_resolution_names = [] # Get attributes signal = h5attr(value, "signal", "I") i_axes = h5attr(value, "I_axes", ["Q"]) q_indices = h5attr(value, "Q_indices", [0]) i_axes = self.as_list_or_array(i_axes) keys = value.keys() # Assign attributes to appropriate class variables self.q_names = [i_axes[int(v)] for v in self.as_list_or_array(q_indices)] self.mask_name = h5attr(value, "mask") self.i_name = signal self.i_node = value.get(self.i_name) for item in self.q_names: if item in keys: q_vals = value.get(item) uncertainties = h5attr(q_vals, "uncertainties") if uncertainties is None: uncertainties = h5attr(q_vals, "uncertainty") if isinstance(uncertainties, basestring): uncertainties = uncertainties.split(",") if uncertainties is not None: self.q_uncertainty_names = uncertainties resolutions = h5attr(q_vals, "resolutions") if isinstance(resolutions, basestring): resolutions = resolutions.split(",") if resolutions is not None: self.q_resolution_names = resolutions if self.i_name in keys: i_vals = value.get(self.i_name) self.i_uncertainties_name = h5attr(i_vals, "uncertainties") if self.i_uncertainties_name is None: self.i_uncertainties_name = h5attr(i_vals, "uncertainty") def _is_2d_not_multi_frame(self, value, i_base="", q_base=""): """ A private class to determine if the data set is 1d or 2d. :param value: Nexus/NXcanSAS data group :param basename: Approximate name of an entry to search for :return: True if 2D, otherwise false """ i_basename = i_base if i_base != "" else self.i_name i_vals = value.get(i_basename) q_basename = q_base if q_base != "" else self.q_names q_vals = value.get(q_basename[0]) self.multi_frame = (i_vals is not None and q_vals is not None and len(i_vals.shape) != 1 and len(q_vals.shape) == 1) return (i_vals is not None and len(i_vals.shape) != 1 and not self.multi_frame) def _create_unique_key(self, dictionary, name, numb=0): """ Create a unique key value for any dictionary to prevent overwriting Recurses until a unique key value is found. :param dictionary: A dictionary with any number of entries :param name: The index of the item to be added to dictionary :param numb: The number to be appended to the name, starts at 0 :return: The new name for the dictionary entry """ if dictionary.get(name) is not None: numb += 1 name = name.split("_")[0] name += "_{0}".format(numb) name = self._create_unique_key(dictionary, name, numb) return name def _get_unit(self, value): """ Find the unit for a particular value within the h5py dictionary :param value: attribute dictionary for a particular value set :return: unit for the value passed to the method """ unit = h5attr(value, u'units') if unit is None: unit = h5attr(value, u'unit') return unit