"""
Module that contains classes to hold information read from
reduced data files.
A good description of the data members can be found in
the CanSAS 1D XML data format:
http://www.smallangles.net/wgwiki/index.php/cansas1d_documentation
"""
#####################################################################
# This software was developed by the University of Tennessee as part of the
# Distributed Data Analysis of Neutron Scattering Experiments (DANSE)
# project funded by the US National Science Foundation.
# See the license text in license.txt
# copyright 2008, University of Tennessee
######################################################################
# TODO: Keep track of data manipulation in the 'process' data structure.
# TODO: This module should be independent of plottables. We should write
# an adapter class for plottables when needed.
from __future__ import print_function
import math
from math import fabs
import copy
import numpy as np
from sas.sascalc.data_util.uncertainty import Uncertainty
class plottable_1D(object):
    """
    Plain container for the arrays and axis labels of a 1D plottable.

    ``x`` and ``y`` are always converted with ``np.asarray``. Each optional
    array is converted the same way only when it is supplied; otherwise the
    class-level default of ``None`` remains visible on the instance.
    """
    # NOTE(review): the original comment at this spot read "the presence of
    # these should be mutually exclusive with the presence of Qdev (dx)".
    # It presumably refers to the slit-smearing fields (dxl/dxw) below --
    # confirm before relying on it.
    x = None
    y = None
    dx = None
    dy = None
    # Slit smearing length
    dxl = None
    # Slit smearing width
    dxw = None
    # SESANS specific params (wavelengths for spin echo length calculation)
    lam = None
    dlam = None
    # Axis labels and units
    _xaxis = ''
    _xunit = ''
    _yaxis = ''
    _yunit = ''

    def __init__(self, x, y, dx=None, dy=None, dxl=None, dxw=None,
                 lam=None, dlam=None):
        """
        Store the supplied values as numpy arrays.

        :param x: x values
        :param y: y values
        :param dx: optional uncertainty on x
        :param dy: optional uncertainty on y
        :param dxl: optional slit smearing length
        :param dxw: optional slit smearing width
        :param lam: optional wavelengths (SESANS)
        :param dlam: optional wavelength uncertainties (SESANS)
        """
        self.x = np.asarray(x)
        self.y = np.asarray(y)
        optional_arrays = (("dx", dx), ("dy", dy), ("dxl", dxl),
                           ("dxw", dxw), ("lam", lam), ("dlam", dlam))
        for attr_name, values in optional_arrays:
            # Leave the class-level None in place when nothing was given
            if values is not None:
                setattr(self, attr_name, np.asarray(values))

    def xaxis(self, label, unit):
        """
        Set the label and unit of the x axis.

        :param label: axis label
        :param unit: axis unit
        """
        self._xaxis, self._xunit = label, unit

    def yaxis(self, label, unit):
        """
        Set the label and unit of the y axis.

        :param label: axis label
        :param unit: axis unit
        """
        self._yaxis, self._yunit = label, unit
class plottable_2D(object):
    """
    Plain container for the arrays, plot limits and axis labels of a 2D
    plottable.

    ``data``, ``qx_data``, ``qy_data`` and ``q_data`` are always passed
    through ``np.asarray``. The optional error and resolution arrays are
    converted only when supplied and otherwise stay ``None``.
    """
    xmin = None
    xmax = None
    ymin = None
    ymax = None
    data = None
    qx_data = None
    qy_data = None
    q_data = None
    err_data = None
    dqx_data = None
    dqy_data = None
    mask = None
    x_bins = None
    y_bins = None
    # Axis labels and units
    _xaxis = ''
    _xunit = ''
    _yaxis = ''
    _yunit = ''
    _zaxis = ''
    _zunit = ''

    def __init__(self, data=None, err_data=None, qx_data=None,
                 qy_data=None, q_data=None, mask=None,
                 dqx_data=None, dqy_data=None, xmin=None, xmax=None,
                 ymin=None, ymax=None, zmin=None, zmax=None,
                 x_bins=None, y_bins=None):
        """
        Store the supplied values.

        :param data: intensity values
        :param err_data: optional uncertainty on ``data``
        :param qx_data: qx value of each data point
        :param qy_data: qy value of each data point
        :param q_data: q value of each data point
        :param mask: optional mask; defaults to an all-True boolean array
            with the shape of ``data``
        :param dqx_data: optional resolution along qx
        :param dqy_data: optional resolution along qy
        :param xmin, xmax, ymin, ymax, zmin, zmax: plot limits, stored as is
        :param x_bins: optional bins along x; ``None`` or empty becomes ``[]``
        :param y_bins: optional bins along y; ``None`` or empty becomes ``[]``
        """
        self.data = np.asarray(data)
        self.qx_data = np.asarray(qx_data)
        self.qy_data = np.asarray(qy_data)
        self.q_data = np.asarray(q_data)
        if mask is not None:
            self.mask = np.asarray(mask)
        else:
            self.mask = np.ones(self.data.shape, dtype=bool)
        if err_data is not None:
            self.err_data = np.asarray(err_data)
        if dqx_data is not None:
            self.dqx_data = np.asarray(dqx_data)
        if dqy_data is not None:
            self.dqy_data = np.asarray(dqy_data)
        # plot limits
        self.xmin = xmin
        self.xmax = xmax
        self.ymin = ymin
        self.ymax = ymax
        self.zmin = zmin
        self.zmax = zmax
        # Each bins argument goes to the attribute of the same name (they
        # used to be swapped). The emptiness test uses np.size so that a
        # numpy array does not raise on an ambiguous truth value.
        self.x_bins = self._bins_or_empty(x_bins)
        self.y_bins = self._bins_or_empty(y_bins)

    @staticmethod
    def _bins_or_empty(bins):
        """Return ``bins`` unchanged, or ``[]`` when it is None or empty."""
        if bins is None or np.size(bins) == 0:
            return []
        return bins

    def xaxis(self, label, unit):
        """
        Set the label and unit of the x axis.

        :param label: axis label
        :param unit: axis unit
        """
        self._xaxis = label
        self._xunit = unit

    def yaxis(self, label, unit):
        """
        Set the label and unit of the y axis.

        :param label: axis label
        :param unit: axis unit
        """
        self._yaxis = label
        self._yunit = unit

    def zaxis(self, label, unit):
        """
        Set the label and unit of the z axis.

        :param label: axis label
        :param unit: axis unit
        """
        self._zaxis = label
        self._zunit = unit
class Vector(object):
    """
    Vector class to hold multi-dimensional objects
    """
    # x, y and z components
    x = None
    y = None
    z = None

    def __init__(self, x=None, y=None, z=None):
        """
        Initialization. Components that are not
        set are set to None by default.

        :param x: x component
        :param y: y component
        :param z: z component
        """
        self.x, self.y, self.z = x, y, z

    def __str__(self):
        """Return the three components as a tab-separated string."""
        return f"x = {self.x!s}\ty = {self.y!s}\tz = {self.z!s}"
class Detector(object):
    """
    Class to hold detector information
    """
    # Name [string]
    name = None
    # Sample to detector distance [float]
    distance = None
    distance_unit = 'mm'
    # Offset of this detector position in X, Y,
    # (and Z if necessary) [Vector]; note the default unit string is 'm'
    offset = None
    offset_unit = 'm'
    # Orientation (rotation) of this detector in roll,
    # pitch, and yaw [Vector]
    orientation = None
    orientation_unit = 'degree'
    # Center of the beam on the detector in X and Y
    # (and Z if necessary) [Vector]
    beam_center = None
    beam_center_unit = 'mm'
    # Pixel size in X, Y, (and Z if necessary) [Vector]
    pixel_size = None
    pixel_size_unit = 'mm'
    # Slit length of the instrument for this detector [float]
    slit_length = None
    slit_length_unit = 'mm'

    def __init__(self):
        """
        Give every instance its own Vector objects so that they are not
        shared through the class attributes.
        """
        self.offset = Vector()
        self.orientation = Vector()
        self.beam_center = Vector()
        self.pixel_size = Vector()

    def __str__(self):
        """Return a multi-line, human readable summary of the detector."""
        # (template, value, unit) for every "value [unit]" row, in print order
        rows = (
            (" Distance: %s [%s]\n", self.distance, self.distance_unit),
            (" Offset: %s [%s]\n", self.offset, self.offset_unit),
            (" Orientation: %s [%s]\n",
             self.orientation, self.orientation_unit),
            (" Beam center: %s [%s]\n",
             self.beam_center, self.beam_center_unit),
            (" Pixel size: %s [%s]\n", self.pixel_size, self.pixel_size_unit),
            (" Slit length: %s [%s]\n",
             self.slit_length, self.slit_length_unit),
        )
        text = "Detector:\n"
        text += " Name: %s\n" % self.name
        for template, value, unit in rows:
            text += template % (str(value), str(unit))
        return text
class Aperture(object):
    """
    Class to hold the description of one aperture
    """
    # Name
    name = None
    # Type
    type = None
    # Size name
    size_name = None
    # Aperture size [Vector]
    size = None
    size_unit = 'mm'
    # Aperture distance [float]
    distance = None
    distance_unit = 'mm'

    def __init__(self):
        """Give the instance its own size Vector."""
        self.size = Vector()
class Collimation(object):
    """
    Class to hold collimation information
    """
    # Name
    name = None
    # Length [float]
    length = None
    length_unit = 'mm'
    # List of apertures (created per instance in __init__)
    aperture = None

    def __init__(self):
        """Start with an empty, instance-owned aperture list."""
        self.aperture = []

    def __str__(self):
        """Return a multi-line summary: the length, then each aperture."""
        pieces = [
            "Collimation:\n",
            " Length: %s [%s]\n" % (str(self.length), str(self.length_unit)),
        ]
        for entry in self.aperture:
            pieces.append(" Aperture size:%s [%s]\n"
                          % (str(entry.size), str(entry.size_unit)))
            pieces.append(" Aperture_dist:%s [%s]\n"
                          % (str(entry.distance), str(entry.distance_unit)))
        return "".join(pieces)
class Source(object):
    """
    Class to hold source information
    """
    # Name
    name = None
    # Generic radiation type (Type and probe give more specific info) [string]
    radiation = None
    # Type and probe are only written to by the NXcanSAS reader
    # Specific radiation type (Synchrotron X-ray, Reactor neutron, etc) [string]
    type = None
    # Radiation probe (generic probe such as neutron, x-ray, muon, etc) [string]
    probe = None
    # Beam size name
    beam_size_name = None
    # Beam size [Vector]
    beam_size = None
    beam_size_unit = 'mm'
    # Beam shape [string]
    beam_shape = None
    # Wavelength [float]
    wavelength = None
    wavelength_unit = 'A'
    # Minimum wavelength [float]; note the default unit string is 'nm'
    wavelength_min = None
    wavelength_min_unit = 'nm'
    # Maximum wavelength [float]; note the default unit string is 'nm'
    wavelength_max = None
    wavelength_max_unit = 'nm'
    # Wavelength spread [float]; note the default unit string is 'percent'
    wavelength_spread = None
    wavelength_spread_unit = 'percent'

    def __init__(self):
        """Give the instance its own beam size Vector."""
        self.beam_size = Vector()

    def __str__(self):
        """
        Return a multi-line summary of the source.

        When ``radiation`` is None and both ``type`` and ``probe`` are set,
        the radiation line shows "<type> <probe>" instead.
        """
        if self.radiation is None and self.type and self.probe:
            shown_radiation = self.type + " " + self.probe
        else:
            shown_radiation = self.radiation
        # (template, value, unit) for every "value [unit]" row, in print order
        rows = (
            (" Wavelength: %s [%s]\n", self.wavelength, self.wavelength_unit),
            (" Waveln_min: %s [%s]\n",
             self.wavelength_min, self.wavelength_min_unit),
            (" Waveln_max: %s [%s]\n",
             self.wavelength_max, self.wavelength_max_unit),
            (" Waveln_spread:%s [%s]\n",
             self.wavelength_spread, self.wavelength_spread_unit),
            (" Beam_size: %s [%s]\n", self.beam_size, self.beam_size_unit),
        )
        text = "Source:\n"
        text += " Radiation: %s\n" % str(shown_radiation)
        text += " Shape: %s\n" % str(self.beam_shape)
        for template, value, unit in rows:
            text += template % (str(value), str(unit))
        return text
# Definitions of radiation types
NEUTRON = 'neutron'
XRAY = 'x-ray'
MUON = 'muon'
ELECTRON = 'electron'
class Sample(object):
    """
    Class to hold the sample description
    """
    # Short name for sample
    name = ''
    # ID
    ID = ''
    # Thickness [float]
    thickness = None
    thickness_unit = 'mm'
    # Transmission [float] [fraction]
    transmission = None
    # Temperature [float] [No Default]
    temperature = None
    temperature_unit = None
    # Position [Vector]
    position = None
    position_unit = 'mm'
    # Orientation [Vector]
    orientation = None
    orientation_unit = 'degree'
    # Details (list created per instance in __init__)
    details = None
    # SESANS acceptance values, stored as (value, unit string) pairs
    zacceptance = (0,"")
    yacceptance = (0,"")

    def __init__(self):
        """Create the instance-owned Vector objects and details list."""
        self.position = Vector()
        self.orientation = Vector()
        self.details = []

    def __str__(self):
        """Return a multi-line summary of the sample, details last."""
        # (template, value, unit) for every "value [unit]" row, in print order
        rows = (
            (" Thickness: %s [%s]\n", self.thickness, self.thickness_unit),
            (" Temperature: %s [%s]\n",
             self.temperature, self.temperature_unit),
            (" Position: %s [%s]\n", self.position, self.position_unit),
            (" Orientation: %s [%s]\n",
             self.orientation, self.orientation_unit),
        )
        pieces = [
            "Sample:\n",
            " ID: %s\n" % str(self.ID),
            " Transmission: %s\n" % str(self.transmission),
        ]
        pieces.extend(template % (str(value), str(unit))
                      for template, value, unit in rows)
        pieces.append(" Details:\n")
        for detail in self.details:
            pieces.append(" %s\n" % detail)
        return "".join(pieces)
class Process(object):
    """
    Class that holds information about the processes
    performed on the data.
    """
    name = ''
    date = ''
    description = ''
    # Lists created per instance in __init__
    term = None
    notes = None

    def __init__(self):
        """Start with empty, instance-owned term and notes lists."""
        self.term = []
        self.notes = []

    def is_empty(self):
        """
        Return True if the object is empty, i.e. name, date, description,
        term and notes all have length zero.
        """
        fields = (self.name, self.date, self.description,
                  self.term, self.notes)
        return all(len(field) == 0 for field in fields)

    def single_line_desc(self):
        """
        Return a single line string representing the process
        (name, date and description separated by single spaces).
        """
        return "%s %s %s" % (self.name, self.date, self.description)

    def __str__(self):
        """Return a multi-line summary, listing every term and note."""
        text = "Process:\n"
        text += " Name: %s\n" % self.name
        text += " Date: %s\n" % self.date
        text += " Description: %s\n" % self.description
        for term in self.term:
            text += " Term: %s\n" % term
        for note in self.notes:
            text += " Note: %s\n" % note
        return text
class TransmissionSpectrum(object):
    """
    Class that holds information about transmission spectrum
    for white beams and spallation sources.
    """
    name = ''
    timestamp = ''
    # Wavelength values (list created per instance in __init__)
    wavelength = None
    wavelength_unit = 'A'
    # Transmission values (list created per instance in __init__)
    transmission = None
    transmission_unit = ''
    # Transmission deviation values (list created per instance in __init__)
    transmission_deviation = None
    transmission_deviation_unit = ''

    def __init__(self):
        """Start with empty, instance-owned data lists."""
        self.wavelength = []
        self.transmission = []
        self.transmission_deviation = []

    def __str__(self):
        """
        Return a multi-line summary. The number of points reported is the
        length of the longest of the three data lists.
        """
        n_points = max(len(self.wavelength), len(self.transmission),
                       len(self.transmission_deviation))
        rows = (
            (" Name: \t{0}\n", self.name),
            (" Timestamp: \t{0}\n", self.timestamp),
            (" Wavelength unit: \t{0}\n", self.wavelength_unit),
            (" Transmission unit:\t{0}\n", self.transmission_unit),
            (" Trans. Dev. unit: \t{0}\n", self.transmission_deviation_unit),
            (" Number of Pts: \t{0}\n", n_points),
        )
        return "Transmission Spectrum:\n" + "".join(
            template.format(value) for template, value in rows)
class DataInfo(object):
    """
    Class to hold the data read from a file.
    It includes four blocks of data for the
    instrument description, the sample description,
    the data itself and any other meta data.

    The arithmetic and union operators are routed through
    ``_perform_operation`` and ``_perform_union``; this base class returns
    ``NotImplemented`` from both.
    """
    # Class-level defaults; __init__ replaces each with a per-instance value.
    # Title
    title = ''
    # Run number
    run = None
    # Run name
    run_name = None
    # File name
    filename = ''
    # Notes
    notes = None
    # Processes (Action on the data)
    process = None
    # Instrument name
    instrument = ''
    # Detector information
    detector = None
    # Sample information
    sample = None
    # Source information
    source = None
    # Collimation information
    collimation = None
    # Transmission Spectrum info
    trans_spectrum = None
    # Additional meta-data
    meta_data = None
    # Loading errors
    errors = None
    # SESANS data check
    isSesans = None

    def __init__(self):
        """
        Initialization: create empty, instance-owned containers.
        """
        # Plain text fields
        self.title = ''
        self.filename = ''
        self.instrument = ''
        # Run numbers and the run-number -> name mapping
        self.run = []
        self.run_name = {}
        # Free-form notes, processing history and loading errors
        self.notes = []
        self.process = []
        self.errors = []
        # Instrument description
        self.detector = []
        self.collimation = []
        self.trans_spectrum = []
        self.sample = Sample()
        self.source = Source()
        # Additional meta-data
        self.meta_data = {}
        # SESANS data check
        self.isSesans = False

    def append_empty_process(self):
        """
        Append a new, empty Process to the process list.
        """
        self.process.append(Process())

    def add_notes(self, message=""):
        """
        Add notes to datainfo

        :param message: text to append to the notes list
        """
        self.notes.append(message)

    def __str__(self):
        """
        Nice printout
        """
        lines = [
            f"File: {self.filename}\n",
            f"Title: {self.title}\n",
            f"Run: {self.run}\n",
            f"SESANS: {self.isSesans}\n",
            f"Instrument: {self.instrument}\n",
            f"{self.sample!s}\n",
            f"{self.source!s}\n",
        ]
        # The list-valued sections follow, each entry on its own line
        for section in (self.detector, self.collimation, self.process,
                        self.notes, self.trans_spectrum):
            for entry in section:
                lines.append(f"{entry!s}\n")
        return "".join(lines)

    def _perform_operation(self, other, operation):
        """
        Private method to perform operation. Not implemented for DataInfo,
        but should be implemented for each data class inherited from DataInfo
        that holds actual data (ex.: Data1D)

        :param other: right-hand operand
        :param operation: two-argument callable defining the operation
        :return: NotImplemented
        """
        return NotImplemented

    def _perform_union(self, other):
        """
        Private method to perform union operation. Not implemented for
        DataInfo, but should be implemented for each data class inherited
        from DataInfo that holds actual data (ex.: Data1D)

        :param other: data set to be unified with this one
        :return: NotImplemented
        """
        return NotImplemented

    def __add__(self, other):
        """
        Add two data sets (self + other)

        :param other: data set to add to the current one
        :return: new data set
        :raise ValueError: raised when two data sets are incompatible
        """
        return self._perform_operation(other, lambda a, b: a + b)

    def __radd__(self, other):
        """
        Add two data sets (other + self)

        :param other: data set the current one is added to
        :return: new data set
        :raise ValueError: raised when two data sets are incompatible
        """
        return self._perform_operation(other, lambda a, b: b + a)

    def __sub__(self, other):
        """
        Subtract two data sets (self - other)

        :param other: data set to subtract from the current one
        :return: new data set
        :raise ValueError: raised when two data sets are incompatible
        """
        return self._perform_operation(other, lambda a, b: a - b)

    def __rsub__(self, other):
        """
        Subtract two data sets (other - self)

        :param other: data set the current one is subtracted from
        :return: new data set
        :raise ValueError: raised when two data sets are incompatible
        """
        return self._perform_operation(other, lambda a, b: b - a)

    def __mul__(self, other):
        """
        Multiply two data sets (self * other)

        :param other: data set to multiply the current one by
        :return: new data set
        :raise ValueError: raised when two data sets are incompatible
        """
        return self._perform_operation(other, lambda a, b: a * b)

    def __rmul__(self, other):
        """
        Multiply two data sets (other * self)

        :param other: data set that is multiplied by the current one
        :return: new data set
        :raise ValueError: raised when two data sets are incompatible
        """
        return self._perform_operation(other, lambda a, b: b * a)

    def __truediv__(self, other):
        """
        Divide a data set by another (self / other)

        :param other: data set that the current one is divided by
        :return: new data set
        :raise ValueError: raised when two data sets are incompatible
        """
        return self._perform_operation(other, lambda a, b: a/b)

    # Alias kept so explicit __div__ calls keep working
    __div__ = __truediv__

    def __rtruediv__(self, other):
        """
        Divide a data set by the current one (other / self)

        :param other: data set that is divided by the current one
        :return: new data set
        :raise ValueError: raised when two data sets are incompatible
        """
        return self._perform_operation(other, lambda a, b: b/a)

    # Alias kept so explicit __rdiv__ calls keep working
    __rdiv__ = __rtruediv__

    def __or__(self, other):
        """
        Union a data set with another (self | other)

        :param other: data set to be unified
        :return: new data set
        :raise ValueError: raised when two data sets are incompatible
        """
        return self._perform_union(other)

    def __ror__(self, other):
        """
        Union a data set with another (other | self); delegates to the
        same union routine as ``__or__``.

        :param other: data set to be unified
        :return: new data set
        :raise ValueError: raised when two data sets are incompatible
        """
        return self._perform_union(other)
class Data1D(plottable_1D, DataInfo):
    """
    1D data class: the arrays of ``plottable_1D`` combined with the
    meta data of ``DataInfo``.
    """

    def __init__(self, x=None, y=None, dx=None, dy=None,
                 lam=None, dlam=None, isSesans=False):
        """
        :param x: x values
        :param y: y values
        :param dx: optional uncertainty on x
        :param dy: optional uncertainty on y
        :param lam: optional wavelengths (SESANS)
        :param dlam: optional wavelength uncertainties (SESANS)
        :param isSesans: truthy for SESANS data; selects the default units
        :raise TypeError: when the truth value of ``isSesans`` cannot be
            evaluated
        """
        DataInfo.__init__(self)
        plottable_1D.__init__(self, x, y, dx, dy, None, None, lam, dlam)
        self.isSesans = isSesans
        try:
            sesans = bool(self.isSesans)
        except Exception:  # the data is not recognized, notifying user
            raise TypeError('Check documentation for supported 1D data formats')
        if sesans:  # the data is SESANS
            self.x_unit = 'A'
            self.y_unit = 'pol'
        else:  # the data is SANS
            self.x_unit = '1/A'
            self.y_unit = '1/cm'

    def __str__(self):
        """
        Nice printout
        """
        _str = "%s\n" % DataInfo.__str__(self)
        _str += "Data:\n"
        _str += " Type: %s\n" % self.__class__.__name__
        _str += " X-axis: %s\t[%s]\n" % (self._xaxis, self._xunit)
        _str += " Y-axis: %s\t[%s]\n" % (self._yaxis, self._yunit)
        _str += " Length: %g\n" % len(self.x)
        return _str

    def is_slit_smeared(self):
        """
        Check whether the data has slit smearing information

        :return: True if dxl or dxw is a non-empty list/array whose
            smallest value is positive, False otherwise
        """
        def _check(v):
            return (isinstance(v, (list, np.ndarray))
                    and len(v) > 0 and min(v) > 0)
        return _check(self.dxl) or _check(self.dxw)

    def clone_without_data(self, length=0, clone=None):
        """
        Clone the current object, without copying the data (which
        will be filled out by a subsequent operation).
        The data arrays will be initialized to zero.

        :param length: length of the data array to be initialized
        :param clone: if provided and a Data1D, the meta data is copied
            onto it instead of onto a newly created object
        :return: the clone
        """
        if clone is None or not issubclass(clone.__class__, Data1D):
            x = np.zeros(length)
            dx = np.zeros(length)
            y = np.zeros(length)
            dy = np.zeros(length)
            lam = np.zeros(length)
            dlam = np.zeros(length)
            clone = Data1D(x, y, lam=lam, dx=dx, dy=dy, dlam=dlam)
        clone.title = self.title
        clone.run = self.run
        clone.filename = self.filename
        clone.instrument = self.instrument
        clone.notes = copy.deepcopy(self.notes)
        clone.process = copy.deepcopy(self.process)
        clone.detector = copy.deepcopy(self.detector)
        clone.sample = copy.deepcopy(self.sample)
        clone.source = copy.deepcopy(self.source)
        clone.collimation = copy.deepcopy(self.collimation)
        clone.trans_spectrum = copy.deepcopy(self.trans_spectrum)
        clone.meta_data = copy.deepcopy(self.meta_data)
        clone.errors = copy.deepcopy(self.errors)
        return clone

    def copy_from_datainfo(self, data1d):
        """
        Copy the data arrays, axis labels and title of another 1D data
        object into this one.

        :param data1d: object to copy from; dx, dxl and dxw are copied
            only when the attribute exists on it
        """
        self.x = copy.deepcopy(data1d.x)
        self.y = copy.deepcopy(data1d.y)
        self.dy = copy.deepcopy(data1d.dy)
        if hasattr(data1d, "dx"):
            self.dx = copy.deepcopy(data1d.dx)
        if hasattr(data1d, "dxl"):
            self.dxl = copy.deepcopy(data1d.dxl)
        if hasattr(data1d, "dxw"):
            self.dxw = copy.deepcopy(data1d.dxw)
        self.xaxis(data1d._xaxis, data1d._xunit)
        self.yaxis(data1d._yaxis, data1d._yunit)
        self.title = data1d.title

    def _validity_check(self, other):
        """
        Checks that the data lengths are compatible.
        Checks that the x vectors are compatible.
        Returns errors vectors equal to original
        errors vectors if they were present or vectors
        of zeros when none was found.

        :param other: other data set for operation
        :return: dy for self, dy for other [numpy arrays]; dy for other is
            None when ``other`` is not a Data1D
        :raise ValueError: when lengths are not compatible or the x values
            differ by more than 1%
        """
        dy_other = None
        if isinstance(other, Data1D):
            # Check that data lengths are the same
            if len(self.x) != len(other.x) or len(self.y) != len(other.y):
                msg = "Unable to perform operation: data length are not equal"
                raise ValueError(msg)
            # Here we could also extrapolate between data points
            TOLERANCE = 0.01
            for x_self, x_other in zip(self.x, other.x):
                # Relative tolerance on |x|: a signed x would make the
                # threshold negative and reject every negative x value
                if fabs(x_self - x_other) > fabs(x_self)*TOLERANCE:
                    msg = "Incompatible data sets: x-values do not match"
                    raise ValueError(msg)
            # Check that the other data set has errors, otherwise
            # create zero vector
            dy_other = other.dy
            if other.dy is None or (len(other.dy) != len(other.y)):
                dy_other = np.zeros(len(other.y))
        # Check that we have errors, otherwise create zero vector
        dy = self.dy
        if self.dy is None or (len(self.dy) != len(self.y)):
            dy = np.zeros(len(self.y))
        return dy, dy_other

    def _perform_operation(self, other, operation):
        """
        Apply ``operation`` point by point, propagating the y errors
        through ``Uncertainty``.

        :param other: Data1D, or any operand ``operation`` accepts next to
            an ``Uncertainty``
        :param operation: two-argument callable defining the operation
        :return: new Data1D holding the result
        :raise ValueError: when the data sets are incompatible
        """
        # First, check the data compatibility
        dy, dy_other = self._validity_check(other)
        npts = len(self.x)
        result = self.clone_without_data(npts)
        result.dxw = None if self.dxw is None else np.zeros(npts)
        result.dxl = None if self.dxl is None else np.zeros(npts)
        # Loop invariants: which resolution arrays can be copied over
        copy_dx = self.dx is not None and npts == len(self.dx)
        copy_dxw = self.dxw is not None and npts == len(self.dxw)
        copy_dxl = self.dxl is not None and npts == len(self.dxl)
        other_is_data = isinstance(other, Data1D)
        for i in range(npts):
            result.x[i] = self.x[i]
            if copy_dx:
                result.dx[i] = self.dx[i]
            if copy_dxw:
                result.dxw[i] = self.dxw[i]
            if copy_dxl:
                result.dxl[i] = self.dxl[i]
            a = Uncertainty(self.y[i], dy[i]**2)
            if other_is_data:
                b = Uncertainty(other.y[i], dy_other[i]**2)
                # Combine the resolutions only when both data sets have
                # one; indexing self.dx while it is None raised TypeError
                if self.dx is not None and other.dx is not None:
                    result.dx[i] *= self.dx[i]
                    result.dx[i] += (other.dx[i]**2)
                    result.dx[i] /= 2
                    result.dx[i] = math.sqrt(result.dx[i])
                if result.dxl is not None and other.dxl is not None:
                    result.dxl[i] *= self.dxl[i]
                    result.dxl[i] += (other.dxl[i]**2)
                    result.dxl[i] /= 2
                    result.dxl[i] = math.sqrt(result.dxl[i])
            else:
                b = other
            output = operation(a, b)
            result.y[i] = output.x
            result.dy[i] = math.sqrt(math.fabs(output.variance))
        return result

    def _validity_check_union(self, other):
        """
        Checks that the other data set is also 1D data.

        :param other: other data set for operation
        :return: True
        :raise ValueError: when data types are not compatible
        """
        if not isinstance(other, Data1D):
            msg = "Unable to perform operation: different types of data set"
            raise ValueError(msg)
        return True

    def _perform_union(self, other):
        """
        Concatenate this data set with ``other`` and sort the points by x.

        dy, dx, dxw and dxl are each kept only when both data sets provide
        them; otherwise the corresponding result attribute is None.

        :param other: Data1D to unify with this one
        :return: new Data1D holding the union
        :raise ValueError: when ``other`` is not a Data1D
        """
        # First, check the data compatibility
        self._validity_check_union(other)
        result = self.clone_without_data(len(self.x) + len(other.x))
        result.x = np.append(self.x, other.x)
        # Sort order of the merged x values, applied to every other array
        ind = np.argsort(result.x)
        result.x = result.x[ind]
        result.y = np.append(self.y, other.y)[ind]

        def _merged(mine, theirs):
            # Keep an optional array only when both sides have it
            if mine is None or theirs is None:
                return None
            return np.append(mine, theirs)[ind]

        result.dy = _merged(self.dy, other.dy)
        result.dx = _merged(self.dx, other.dx)
        result.dxw = _merged(self.dxw, other.dxw)
        result.dxl = _merged(self.dxl, other.dxl)
        return result
class Data2D(plottable_2D, DataInfo):
    """
    2D data class: the arrays of ``plottable_2D`` combined with the
    meta data of ``DataInfo``.
    """
    # Units for Q-values
    Q_unit = '1/A'
    # Units for I(Q) values
    I_unit = '1/cm'
    # No 2D SESANS data as of yet. Always set it to False
    isSesans = False

    def __init__(self, data=None, err_data=None, qx_data=None,
                 qy_data=None, q_data=None, mask=None,
                 dqx_data=None, dqy_data=None,
                 xmin=None, xmax=None, ymin=None, ymax=None,
                 zmin=None, zmax=None):
        """
        :param data: intensity values
        :param err_data: optional uncertainty on ``data``
        :param qx_data: qx value of each data point
        :param qy_data: qy value of each data point
        :param q_data: q value of each data point
        :param mask: optional mask
        :param dqx_data: optional resolution along qx
        :param dqy_data: optional resolution along qy
        :param xmin, xmax, ymin, ymax, zmin, zmax: plot limits
        :raise RuntimeError: when the detector list is not empty after
            initialization
        """
        DataInfo.__init__(self)
        plottable_2D.__init__(self, data=data, err_data=err_data,
                              qx_data=qx_data, qy_data=qy_data,
                              dqx_data=dqx_data, dqy_data=dqy_data,
                              q_data=q_data, mask=mask, xmin=xmin, xmax=xmax,
                              ymin=ymin, ymax=ymax, zmin=zmin, zmax=zmax)
        if len(self.detector) > 0:
            raise RuntimeError("Data2D: Detector bank already filled at init")

    def __str__(self):
        """
        Nice printout
        """
        _str = "%s\n" % DataInfo.__str__(self)
        _str += "Data:\n"
        _str += " Type: %s\n" % self.__class__.__name__
        _str += " X-axis: %s\t[%s]\n" % (self._xaxis, self._xunit)
        _str += " Y-axis: %s\t[%s]\n" % (self._yaxis, self._yunit)
        _str += " Z-axis: %s\t[%s]\n" % (self._zaxis, self._zunit)
        _str += " Length: %g \n" % (len(self.data))
        _str += " Shape: (%d, %d)\n" % (len(self.y_bins),
                                        len(self.x_bins))
        return _str

    def clone_without_data(self, length=0, clone=None):
        """
        Clone the current object, without copying the data (which
        will be filled out by a subsequent operation).
        The data arrays will be initialized to zero.

        :param length: length of the data array to be initialized
        :param clone: if provided and a Data2D, the meta data is copied
            onto it instead of onto a newly created object
        :return: the clone
        """
        if clone is None or not issubclass(clone.__class__, Data2D):
            data = np.zeros(length)
            err_data = np.zeros(length)
            qx_data = np.zeros(length)
            qy_data = np.zeros(length)
            q_data = np.zeros(length)
            mask = np.zeros(length)
            clone = Data2D(data=data, err_data=err_data,
                           qx_data=qx_data, qy_data=qy_data,
                           q_data=q_data, mask=mask)
        clone._xaxis = self._xaxis
        clone._yaxis = self._yaxis
        clone._zaxis = self._zaxis
        clone._xunit = self._xunit
        clone._yunit = self._yunit
        clone._zunit = self._zunit
        clone.x_bins = self.x_bins
        clone.y_bins = self.y_bins
        clone.title = self.title
        clone.run = self.run
        clone.filename = self.filename
        clone.instrument = self.instrument
        clone.notes = copy.deepcopy(self.notes)
        clone.process = copy.deepcopy(self.process)
        clone.detector = copy.deepcopy(self.detector)
        clone.sample = copy.deepcopy(self.sample)
        clone.source = copy.deepcopy(self.source)
        clone.collimation = copy.deepcopy(self.collimation)
        clone.trans_spectrum = copy.deepcopy(self.trans_spectrum)
        clone.meta_data = copy.deepcopy(self.meta_data)
        clone.errors = copy.deepcopy(self.errors)
        return clone

    def copy_from_datainfo(self, data2d):
        """
        Copy the data arrays, limits, x/y axis labels and title of another
        2D data object into this one.

        :param data2d: object to copy from; dqx_data/dqy_data are copied
            only when not None, zmin/zmax only when the attribute exists
        """
        self.data = copy.deepcopy(data2d.data)
        self.qx_data = copy.deepcopy(data2d.qx_data)
        self.qy_data = copy.deepcopy(data2d.qy_data)
        self.q_data = copy.deepcopy(data2d.q_data)
        self.mask = copy.deepcopy(data2d.mask)
        self.err_data = copy.deepcopy(data2d.err_data)
        self.x_bins = copy.deepcopy(data2d.x_bins)
        self.y_bins = copy.deepcopy(data2d.y_bins)
        if data2d.dqx_data is not None:
            self.dqx_data = copy.deepcopy(data2d.dqx_data)
        if data2d.dqy_data is not None:
            self.dqy_data = copy.deepcopy(data2d.dqy_data)
        self.xmin = data2d.xmin
        self.xmax = data2d.xmax
        self.ymin = data2d.ymin
        self.ymax = data2d.ymax
        if hasattr(data2d, "zmin"):
            self.zmin = data2d.zmin
        if hasattr(data2d, "zmax"):
            self.zmax = data2d.zmax
        self.xaxis(data2d._xaxis, data2d._xunit)
        self.yaxis(data2d._yaxis, data2d._yunit)
        self.title = data2d.title

    def _validity_check(self, other):
        """
        Checks that the data lengths are compatible.
        Checks that the qx and qy vectors are compatible.
        Returns errors vectors equal to original
        errors vectors if they were present or vectors
        of zeros when none was found.

        :param other: other data set for operation
        :return: err for self, err for other [numpy arrays]; err for other
            is None when ``other`` is not a Data2D
        :raise ValueError: when lengths are not compatible or the q values
            differ by more than 1%
        """
        err_other = None
        TOLERANCE = 0.01
        msg_base = "Incompatible data sets: q-values do not match: "
        if isinstance(other, Data2D):
            # Check that data lengths are the same
            if (len(self.data) != len(other.data)
                    or len(self.qx_data) != len(other.qx_data)
                    or len(self.qy_data) != len(other.qy_data)):
                msg = "Unable to perform operation: data length are not equal"
                raise ValueError(msg)
            for ind in range(len(self.data)):
                if (fabs(self.qx_data[ind] - other.qx_data[ind])
                        > fabs(self.qx_data[ind])*TOLERANCE):
                    msg = f"{msg_base}{self.qx_data[ind]} {other.qx_data[ind]}"
                    raise ValueError(msg)
                if (fabs(self.qy_data[ind] - other.qy_data[ind])
                        > fabs(self.qy_data[ind])*TOLERANCE):
                    msg = f"{msg_base}{self.qy_data[ind]} {other.qy_data[ind]}"
                    raise ValueError(msg)
            # Check that the other data set has errors, otherwise
            # create zero vector
            err_other = other.err_data
            if (other.err_data is None
                    or (len(other.err_data) != len(other.data))):
                err_other = np.zeros(len(other.data))
        # Check that we have errors, otherwise create zero vector.
        # Sized from self.data: ``other`` may be a plain number, which has
        # no ``data`` attribute.
        err = self.err_data
        if self.err_data is None or (len(self.err_data) != len(self.data)):
            err = np.zeros(len(self.data))
        return err, err_other

    def _perform_operation(self, other, operation):
        """
        Perform 2D operations between data sets

        :param other: other data set, or any operand ``operation`` accepts
            next to an ``Uncertainty``
        :param operation: function defining the operation
        :return: new Data2D holding the result
        :raise ValueError: when the data sets are incompatible
        """
        # First, check the data compatibility
        dy, dy_other = self._validity_check(other)
        result = self.clone_without_data(np.size(self.data))
        if self.dqx_data is None or self.dqy_data is None:
            result.dqx_data = None
            result.dqy_data = None
        else:
            result.dqx_data = np.zeros(len(self.data))
            result.dqy_data = np.zeros(len(self.data))
        # Loop invariants
        copy_err = (self.err_data is not None
                    and np.size(self.data) == np.size(self.err_data))
        # The result resolution arrays exist only when self has both of
        # them; testing self.dqx_data alone wrote into None when just one
        # of the two was present
        has_dq = result.dqx_data is not None
        other_is_data = isinstance(other, Data2D)
        for i in range(np.size(self.data)):
            result.data[i] = self.data[i]
            if copy_err:
                result.err_data[i] = self.err_data[i]
            if has_dq:
                result.dqx_data[i] = self.dqx_data[i]
                result.dqy_data[i] = self.dqy_data[i]
            result.qx_data[i] = self.qx_data[i]
            result.qy_data[i] = self.qy_data[i]
            result.q_data[i] = self.q_data[i]
            result.mask[i] = self.mask[i]
            a = Uncertainty(self.data[i], dy[i]**2)
            if other_is_data:
                b = Uncertainty(other.data[i], dy_other[i]**2)
                if other.dqx_data is not None and result.dqx_data is not None:
                    result.dqx_data[i] *= self.dqx_data[i]
                    result.dqx_data[i] += (other.dqx_data[i]**2)
                    result.dqx_data[i] /= 2
                    result.dqx_data[i] = math.sqrt(result.dqx_data[i])
                if other.dqy_data is not None and result.dqy_data is not None:
                    result.dqy_data[i] *= self.dqy_data[i]
                    result.dqy_data[i] += (other.dqy_data[i]**2)
                    result.dqy_data[i] /= 2
                    result.dqy_data[i] = math.sqrt(result.dqy_data[i])
            else:
                b = other
            output = operation(a, b)
            result.data[i] = output.x
            result.err_data[i] = math.sqrt(math.fabs(output.variance))
        return result

    # Regular method: it is called as self._validity_check_union(other),
    # which a @staticmethod taking (self, other) cannot accept.
    def _validity_check_union(self, other):
        """
        Checks that the other data set is also 2D data.

        :param other: other data set for operation
        :return: True
        :raise ValueError: when data types are not compatible
        """
        if not isinstance(other, Data2D):
            msg = "Unable to perform operation: different types of data set"
            raise ValueError(msg)
        return True

    def _perform_union(self, other):
        """
        Concatenate this data set with ``other``.

        The plot limits are taken from this data set. err_data is kept
        only when both data sets have it; dqx_data/dqy_data only when both
        data sets have both. Otherwise the result attribute is None.

        :param other: Data2D to unify with this one
        :return: new Data2D holding the union
        :raise ValueError: when ``other`` is not a Data2D
        """
        # First, check the data compatibility
        self._validity_check_union(other)
        result = self.clone_without_data(np.size(self.data)
                                         + np.size(other.data))
        result.xmin = self.xmin
        result.xmax = self.xmax
        result.ymin = self.ymin
        result.ymax = self.ymax
        result.data = np.append(self.data, other.data)
        result.qx_data = np.append(self.qx_data, other.qx_data)
        result.qy_data = np.append(self.qy_data, other.qy_data)
        result.q_data = np.append(self.q_data, other.q_data)
        result.mask = np.append(self.mask, other.mask)
        # Appending a missing (None) array would yield an object array
        # containing None, so optional arrays need both sides present
        if self.err_data is None or other.err_data is None:
            result.err_data = None
        else:
            result.err_data = np.append(self.err_data, other.err_data)
        if (self.dqx_data is None or self.dqy_data is None
                or other.dqx_data is None or other.dqy_data is None):
            result.dqx_data = None
            result.dqy_data = None
        else:
            result.dqx_data = np.append(self.dqx_data, other.dqx_data)
            result.dqy_data = np.append(self.dqy_data, other.dqy_data)
        return result
def combine_data_info_with_plottable(data, datainfo):
    """
    A function that combines the meta data of a DataInfo object with a
    plottable_1D or plottable_2D data object.

    The meta data containers are assigned by reference, not copied.

    :param data: A plottable_1D or plottable_2D data object
    :param datainfo: A DataInfo object to be combined with the plottable
    :return: A fully specified Data1D or Data2D object. When ``data`` is
        neither plottable type, an error message string is returned
        instead (kept for backward compatibility with existing callers).
    """
    if isinstance(data, plottable_1D):
        final_dataset = Data1D(data.x, data.y, isSesans=datainfo.isSesans)
        final_dataset.dx = data.dx
        final_dataset.dy = data.dy
        final_dataset.dxl = data.dxl
        final_dataset.dxw = data.dxw
        # Carry the SESANS wavelength arrays over as well; they were
        # previously dropped
        final_dataset.lam = data.lam
        final_dataset.dlam = data.dlam
        final_dataset.x_unit = data._xunit
        final_dataset.y_unit = data._yunit
        final_dataset.xaxis(data._xaxis, data._xunit)
        final_dataset.yaxis(data._yaxis, data._yunit)
    elif isinstance(data, plottable_2D):
        final_dataset = Data2D(data.data, data.err_data, data.qx_data,
                               data.qy_data, data.q_data, data.mask,
                               data.dqx_data, data.dqy_data)
        final_dataset.xaxis(data._xaxis, data._xunit)
        final_dataset.yaxis(data._yaxis, data._yunit)
        final_dataset.zaxis(data._zaxis, data._zunit)
        final_dataset.x_bins = data.x_bins
        final_dataset.y_bins = data.y_bins
    else:
        return_string = ("Should Never Happen: _combine_data_info_with_plottabl"
                         "e input is not a plottable1d or plottable2d data "
                         "object")
        return return_string
    # Plot limits: copy whichever ones the plottable defines, z included
    for limit in ("xmax", "ymax", "xmin", "ymin", "zmin", "zmax"):
        if hasattr(data, limit):
            setattr(final_dataset, limit, getattr(data, limit))
    # Meta data: share the DataInfo fields with the new data set
    for field in ("isSesans", "title", "run", "run_name", "filename",
                  "notes", "process", "instrument", "detector", "sample",
                  "source", "collimation", "trans_spectrum", "meta_data",
                  "errors"):
        setattr(final_dataset, field, getattr(datainfo, field))
    return final_dataset