"""
CanSAS 2D data reader for reading HDF5 formatted CanSAS files.
"""
import h5py
import numpy as np
import re
import os
import sys
from ..data_info import plottable_1D, plottable_2D,\
Data1D, Data2D, DataInfo, Process, Aperture, Collimation, \
TransmissionSpectrum, Detector
from ..data_info import combine_data_info_with_plottable
from ..loader_exceptions import FileContentsException, DefaultReaderException
from ..file_reader_base_class import FileReader, decode
[docs]def h5attr(node, key, default=None):
return decode(node.attrs.get(key, default))
[docs]class Reader(FileReader):
"""
A class for reading in CanSAS v2.0 data files. The existing iteration opens
Mantid generated HDF5 formatted files with file extension .h5/.H5. Any
number of data sets may be present within the file and any dimensionality
of data may be used. Currently 1D and 2D SAS data sets are supported, but
future implementations will include 1D and 2D SESANS data.
Any number of SASdata sets may be present in a SASentry and the data within
can be either 1D I(Q) or 2D I(Qx, Qy).
Also supports reading NXcanSAS formatted HDF5 files
:Dependencies:
The CanSAS HDF5 reader requires h5py => v2.5.0 or later.
"""
# CanSAS version
cansas_version = 2.0
# Logged warnings or messages
logging = None
# List of errors for the current data set
errors = None
# Raw file contents to be processed
raw_data = None
# List of plottable1D objects that should be linked to the current_datainfo
data1d = None
# List of plottable2D objects that should be linked to the current_datainfo
data2d = None
# Data type name
type_name = "CanSAS 2.0"
# Wildcards
type = ["CanSAS 2.0 HDF5 Files (*.h5)|*.h5"]
# List of allowed extensions
ext = ['.h5', '.H5']
# Flag to bypass extension check
allow_all = True
[docs] def get_file_contents(self):
"""
This is the general read method that all SasView data_loaders must have.
:param filename: A path for an HDF5 formatted CanSAS 2D data file.
:return: List of Data1D/2D objects and/or a list of errors.
"""
# Reinitialize when loading a new data file to reset all class variables
self.reset_state()
filename = self.f_open.name
self.f_open.close() # IO handled by h5py
# Check that the file exists
if os.path.isfile(filename):
basename = os.path.basename(filename)
_, extension = os.path.splitext(basename)
# If the file type is not allowed, return empty list
if extension in self.ext or self.allow_all:
# Load the data file
try:
self.raw_data = h5py.File(filename, 'r')
except Exception as e:
if extension not in self.ext:
msg = "CanSAS2.0 HDF5 Reader could not load file {}".format(basename + extension)
raise DefaultReaderException(msg)
raise FileContentsException(e.message)
try:
# Read in all child elements of top level SASroot
self.read_children(self.raw_data, [])
# Add the last data set to the list of outputs
self.add_data_set()
except Exception as exc:
raise FileContentsException(exc.message)
finally:
# Close the data file
self.raw_data.close()
for dataset in self.output:
if isinstance(dataset, Data1D):
if dataset.x.size < 5:
self.output = []
raise FileContentsException("Fewer than 5 data points found.")
[docs] def reset_state(self):
"""
Create the reader object and define initial states for class variables
"""
super(Reader, self).reset_state()
self.data1d = []
self.data2d = []
self.raw_data = None
self.errors = set()
self.logging = []
self.parent_class = u''
self.detector = Detector()
self.collimation = Collimation()
self.aperture = Aperture()
self.process = Process()
self.trans_spectrum = TransmissionSpectrum()
[docs] def read_children(self, data, parent_list):
"""
A recursive method for stepping through the hierarchical data file.
:param data: h5py Group object of any kind
:param parent: h5py Group parent name
"""
# Loop through each element of the parent and process accordingly
for key in data.keys():
# Get all information for the current key
value = data.get(key)
class_name = h5attr(value, u'canSAS_class')
if class_name is None:
class_name = h5attr(value, u'NX_class')
if class_name is not None:
class_prog = re.compile(class_name)
else:
class_prog = re.compile(value.name)
if isinstance(value, h5py.Group):
# Set parent class before recursion
self.parent_class = class_name
parent_list.append(key)
# If a new sasentry, store the current data sets and create
# a fresh Data1D/2D object
if class_prog.match(u'SASentry'):
self.add_data_set(key)
elif class_prog.match(u'SASdata'):
self._initialize_new_data_set(parent_list)
# Recursion step to access data within the group
self.read_children(value, parent_list)
# Reset parent class when returning from recursive method
self.parent_class = class_name
self.add_intermediate()
parent_list.remove(key)
elif isinstance(value, h5py.Dataset):
# If this is a dataset, store the data appropriately
data_set = data[key][:]
unit = self._get_unit(value)
# I and Q Data
if key == u'I':
if isinstance(self.current_dataset, plottable_2D):
self.current_dataset.data = data_set
self.current_dataset.zaxis("Intensity", unit)
else:
self.current_dataset.y = data_set.flatten()
self.current_dataset.yaxis("Intensity", unit)
continue
elif key == u'Idev':
if isinstance(self.current_dataset, plottable_2D):
self.current_dataset.err_data = data_set.flatten()
else:
self.current_dataset.dy = data_set.flatten()
continue
elif key == u'Q':
self.current_dataset.xaxis("Q", unit)
if isinstance(self.current_dataset, plottable_2D):
self.current_dataset.q = data_set.flatten()
else:
self.current_dataset.x = data_set.flatten()
continue
elif key == u'Qdev':
self.current_dataset.dx = data_set.flatten()
continue
elif key == u'dQw':
self.current_dataset.dxw = data_set.flatten()
continue
elif key == u'dQl':
self.current_dataset.dxl = data_set.flatten()
continue
elif key == u'Qy':
self.current_dataset.yaxis("Q_y", unit)
self.current_dataset.qy_data = data_set.flatten()
continue
elif key == u'Qydev':
self.current_dataset.dqy_data = data_set.flatten()
continue
elif key == u'Qx':
self.current_dataset.xaxis("Q_x", unit)
self.current_dataset.qx_data = data_set.flatten()
continue
elif key == u'Qxdev':
self.current_dataset.dqx_data = data_set.flatten()
continue
elif key == u'Mask':
self.current_dataset.mask = data_set.flatten()
continue
# Transmission Spectrum
elif (key == u'T'
and self.parent_class == u'SAStransmission_spectrum'):
self.trans_spectrum.transmission = data_set.flatten()
continue
elif (key == u'Tdev'
and self.parent_class == u'SAStransmission_spectrum'):
self.trans_spectrum.transmission_deviation = \
data_set.flatten()
continue
elif (key == u'lambda'
and self.parent_class == u'SAStransmission_spectrum'):
self.trans_spectrum.wavelength = data_set.flatten()
continue
for data_point in data_set:
if isinstance(data_point, np.ndarray):
if data_point.dtype.char == 'S':
data_point = decode(bytes(data_point))
else:
data_point = decode(data_point)
# Top Level Meta Data
if key == u'definition':
self.current_datainfo.meta_data['reader'] = data_point
elif key == u'run':
self.current_datainfo.run.append(data_point)
try:
run_name = h5attr(value, 'name')
run_dict = {data_point: run_name}
self.current_datainfo.run_name = run_dict
except Exception:
pass
elif key == u'title':
self.current_datainfo.title = data_point
elif key == u'SASnote':
self.current_datainfo.notes.append(data_point)
# Sample Information
# CanSAS 2.0 format
elif key == u'Title' and self.parent_class == u'SASsample':
self.current_datainfo.sample.name = data_point
# NXcanSAS format
elif key == u'name' and self.parent_class == u'SASsample':
self.current_datainfo.sample.name = data_point
# NXcanSAS format
elif key == u'ID' and self.parent_class == u'SASsample':
self.current_datainfo.sample.name = data_point
elif (key == u'thickness'
and self.parent_class == u'SASsample'):
self.current_datainfo.sample.thickness = data_point
elif (key == u'temperature'
and self.parent_class == u'SASsample'):
self.current_datainfo.sample.temperature = data_point
elif (key == u'transmission'
and self.parent_class == u'SASsample'):
self.current_datainfo.sample.transmission = data_point
elif (key == u'x_position'
and self.parent_class == u'SASsample'):
self.current_datainfo.sample.position.x = data_point
elif (key == u'y_position'
and self.parent_class == u'SASsample'):
self.current_datainfo.sample.position.y = data_point
elif key == u'pitch' and self.parent_class == u'SASsample':
self.current_datainfo.sample.orientation.x = data_point
elif key == u'yaw' and self.parent_class == u'SASsample':
self.current_datainfo.sample.orientation.y = data_point
elif key == u'roll' and self.parent_class == u'SASsample':
self.current_datainfo.sample.orientation.z = data_point
elif (key == u'details'
and self.parent_class == u'SASsample'):
self.current_datainfo.sample.details.append(data_point)
# Instrumental Information
elif (key == u'name'
and self.parent_class == u'SASinstrument'):
self.current_datainfo.instrument = data_point
elif key == u'name' and self.parent_class == u'SASdetector':
self.detector.name = data_point
elif key == u'SDD' and self.parent_class == u'SASdetector':
self.detector.distance = float(data_point)
self.detector.distance_unit = unit
elif (key == u'slit_length'
and self.parent_class == u'SASdetector'):
self.detector.slit_length = float(data_point)
self.detector.slit_length_unit = unit
elif (key == u'x_position'
and self.parent_class == u'SASdetector'):
self.detector.offset.x = float(data_point)
self.detector.offset_unit = unit
elif (key == u'y_position'
and self.parent_class == u'SASdetector'):
self.detector.offset.y = float(data_point)
self.detector.offset_unit = unit
elif (key == u'pitch'
and self.parent_class == u'SASdetector'):
self.detector.orientation.x = float(data_point)
self.detector.orientation_unit = unit
elif key == u'roll' and self.parent_class == u'SASdetector':
self.detector.orientation.z = float(data_point)
self.detector.orientation_unit = unit
elif key == u'yaw' and self.parent_class == u'SASdetector':
self.detector.orientation.y = float(data_point)
self.detector.orientation_unit = unit
elif (key == u'beam_center_x'
and self.parent_class == u'SASdetector'):
self.detector.beam_center.x = float(data_point)
self.detector.beam_center_unit = unit
elif (key == u'beam_center_y'
and self.parent_class == u'SASdetector'):
self.detector.beam_center.y = float(data_point)
self.detector.beam_center_unit = unit
elif (key == u'x_pixel_size'
and self.parent_class == u'SASdetector'):
self.detector.pixel_size.x = float(data_point)
self.detector.pixel_size_unit = unit
elif (key == u'y_pixel_size'
and self.parent_class == u'SASdetector'):
self.detector.pixel_size.y = float(data_point)
self.detector.pixel_size_unit = unit
elif (key == u'distance'
and self.parent_class == u'SAScollimation'):
self.collimation.length = data_point
self.collimation.length_unit = unit
elif (key == u'name'
and self.parent_class == u'SAScollimation'):
self.collimation.name = data_point
elif (key == u'shape'
and self.parent_class == u'SASaperture'):
self.aperture.shape = data_point
elif (key == u'x_gap'
and self.parent_class == u'SASaperture'):
self.aperture.size.x = data_point
elif (key == u'y_gap'
and self.parent_class == u'SASaperture'):
self.aperture.size.y = data_point
# Process Information
elif (key == u'Title'
and self.parent_class == u'SASprocess'): # CanSAS 2.0
self.process.name = data_point
elif (key == u'name'
and self.parent_class == u'SASprocess'): # NXcanSAS
self.process.name = data_point
elif (key == u'description'
and self.parent_class == u'SASprocess'):
self.process.description = data_point
elif key == u'date' and self.parent_class == u'SASprocess':
self.process.date = data_point
elif key == u'term' and self.parent_class == u'SASprocess':
self.process.term = data_point
elif self.parent_class == u'SASprocess':
self.process.notes.append(data_point)
# Source
elif (key == u'wavelength'
and self.parent_class == u'SASdata'):
self.current_datainfo.source.wavelength = data_point
self.current_datainfo.source.wavelength_unit = unit
elif (key == u'incident_wavelength'
and self.parent_class == 'SASsource'):
self.current_datainfo.source.wavelength = data_point
self.current_datainfo.source.wavelength_unit = unit
elif (key == u'wavelength_max'
and self.parent_class == u'SASsource'):
self.current_datainfo.source.wavelength_max = data_point
self.current_datainfo.source.wavelength_max_unit = unit
elif (key == u'wavelength_min'
and self.parent_class == u'SASsource'):
self.current_datainfo.source.wavelength_min = data_point
self.current_datainfo.source.wavelength_min_unit = unit
elif (key == u'incident_wavelength_spread'
and self.parent_class == u'SASsource'):
self.current_datainfo.source.wavelength_spread = \
data_point
self.current_datainfo.source.wavelength_spread_unit = \
unit
elif (key == u'beam_size_x'
and self.parent_class == u'SASsource'):
self.current_datainfo.source.beam_size.x = data_point
self.current_datainfo.source.beam_size_unit = unit
elif (key == u'beam_size_y'
and self.parent_class == u'SASsource'):
self.current_datainfo.source.beam_size.y = data_point
self.current_datainfo.source.beam_size_unit = unit
elif (key == u'beam_shape'
and self.parent_class == u'SASsource'):
self.current_datainfo.source.beam_shape = data_point
elif (key == u'radiation'
and self.parent_class == u'SASsource'):
self.current_datainfo.source.radiation = data_point
elif (key == u'transmission'
and self.parent_class == u'SASdata'):
self.current_datainfo.sample.transmission = data_point
# Everything else goes in meta_data
else:
new_key = self._create_unique_key(
self.current_datainfo.meta_data, key)
self.current_datainfo.meta_data[new_key] = data_point
else:
# I don't know if this reachable code
self.errors.add("ShouldNeverHappenException")
[docs] def final_data_cleanup(self):
"""
Does some final cleanup and formatting on self.current_datainfo and
all data1D and data2D objects and then combines the data and info into
Data1D and Data2D objects
"""
# Type cast data arrays to float64
if len(self.current_datainfo.trans_spectrum) > 0:
spectrum_list = []
for spectrum in self.current_datainfo.trans_spectrum:
spectrum.transmission = np.delete(spectrum.transmission, [0])
spectrum.transmission = spectrum.transmission.astype(np.float64)
spectrum.transmission_deviation = np.delete(
spectrum.transmission_deviation, [0])
spectrum.transmission_deviation = \
spectrum.transmission_deviation.astype(np.float64)
spectrum.wavelength = np.delete(spectrum.wavelength, [0])
spectrum.wavelength = spectrum.wavelength.astype(np.float64)
if len(spectrum.transmission) > 0:
spectrum_list.append(spectrum)
self.current_datainfo.trans_spectrum = spectrum_list
# Append errors to dataset and reset class errors
self.current_datainfo.errors = self.errors
self.errors.clear()
# Combine all plottables with datainfo and append each to output
# Type cast data arrays to float64 and find min/max as appropriate
for dataset in self.data2d:
zeros = np.ones(dataset.data.size, dtype=bool)
try:
for i in range(0, dataset.mask.size - 1):
zeros[i] = dataset.mask[i]
except:
self.errors.add(sys.exc_value)
dataset.mask = zeros
# Calculate the actual Q matrix
try:
if dataset.q_data.size <= 1:
dataset.q_data = np.sqrt(dataset.qx_data
* dataset.qx_data
+ dataset.qy_data
* dataset.qy_data)
except:
dataset.q_data = None
if dataset.data.ndim == 2:
(n_rows, n_cols) = dataset.data.shape
dataset.y_bins = dataset.qy_data[0::n_cols]
dataset.x_bins = dataset.qx_data[:n_cols]
dataset.data = dataset.data.flatten()
self.current_dataset = dataset
self.send_to_output()
for dataset in self.data1d:
self.current_dataset = dataset
self.send_to_output()
[docs] def add_data_set(self, key=""):
"""
Adds the current_dataset to the list of outputs after preforming final
processing on the data and then calls a private method to generate a
new data set.
:param key: NeXus group name for current tree level
"""
if self.current_datainfo and self.current_dataset:
self.final_data_cleanup()
self.data1d = []
self.data2d = []
self.current_datainfo = DataInfo()
def _initialize_new_data_set(self, parent_list=None):
"""
A private class method to generate a new 1D or 2D data object based on
the type of data within the set. Outside methods should call
add_data_set() to be sure any existing data is stored properly.
:param parent_list: List of names of parent elements
"""
if parent_list is None:
parent_list = []
if self._find_intermediate(parent_list, "Qx"):
self.current_dataset = plottable_2D()
else:
x = np.array(0)
y = np.array(0)
self.current_dataset = plottable_1D(x, y)
self.current_datainfo.filename = self.raw_data.filename
def _find_intermediate(self, parent_list, basename=""):
"""
A private class used to find an entry by either using a direct key or
knowing the approximate basename.
:param parent_list: List of parents nodes in the HDF5 file
:param basename: Approximate name of an entry to search for
:return:
"""
entry = False
key_prog = re.compile(basename)
top = self.raw_data
for parent in parent_list:
top = top.get(parent)
for key in top.keys():
if key_prog.match(key):
entry = True
break
return entry
def _create_unique_key(self, dictionary, name, numb=0):
"""
Create a unique key value for any dictionary to prevent overwriting
Recurses until a unique key value is found.
:param dictionary: A dictionary with any number of entries
:param name: The index of the item to be added to dictionary
:param numb: The number to be appended to the name, starts at 0
:return: The new name for the dictionary entry
"""
if dictionary.get(name) is not None:
numb += 1
name = name.split("_")[0]
name += "_{0}".format(numb)
name = self._create_unique_key(dictionary, name, numb)
return name
def _get_unit(self, value):
"""
Find the unit for a particular value within the h5py dictionary
:param value: attribute dictionary for a particular value set
:return: unit for the value passed to the method
"""
unit = h5attr(value, u'units')
if unit is None:
unit = h5attr(value, u'unit')
# Convert the unit formats
if unit == "1/A":
unit = "A^{-1}"
elif unit == "1/cm":
unit = "cm^{-1}"
return unit