"""
Python implementation of the P(r) inversion
Pinvertor is the base class for the Invertor class
and provides the underlying computations.
"""
import math
import logging
import timeit
import numpy as np
from . import calc
logger = logging.getLogger(__name__)
[docs]class Pinvertor(object):
#q data
x = np.empty(0, dtype=np.float)
#I(q) data
y = np.empty(0, dtype=np.float)
#dI(q) data
err = np.empty(0, dtype=np.float)
#Number of q points
npoints = 0
#Number of I(q) points
ny = 0
#Number of dI(q) points
nerr = 0
#Slit height in units of q [A-1]
slit_height = 0.0
#Slit width in units of q [A-1]
slit_width = 0.0
[docs] def __init__(self):
#Maximum distance between any two points in the system
self.set_dmax(180)
#Minimum q to include in inversion
self.set_qmin(-1.0)
#Maximum q to include in inversion
self.set_qmax(-1.0)
#Flag for whether or not to evaluate a constant background
#while inverting
self.set_est_bck(0)
self.set_alpha(0.0)
[docs] def residuals(self, pars):
"""
Function to call to evaluate the residuals for P(r) inversion.
:param pars: input parameters.
:return: residuals - list of residuals.
"""
pars = np.float64(pars)
residuals = []
nslice = 25
regterm = calc.reg_term(pars, self.d_max, nslice)
resid = (self.y[0:self.npoints] - calc.iq(pars, self.d_max, self.x))/self.err
return list(resid**2 + self.alpha*regterm)
[docs] def pr_residuals(self, pars):
"""
Function to call to evaluate the residuals for P(r) minimization (for testing purposes).
:param pars: input parameters.
:return: residuals - list of residuals.
"""
pars = np.float64(pars)
residuals = []
nslice = 25
regterm = calc.reg_term(pars, self.d_max, nslice)
resid = (self.y[0:npoints] - calc.pr(pars, self.d_max, self.x))/self.err
return list(resid**2 + self.alpha * regterm)
[docs] def set_x(self, data):
"""
Function to set the x data.
:param data: Array of doubles to set x to.
:return: npoints - Number of entries found, the size of x.
"""
data = np.float64(data)
ndata = len(data)
self.__dict__['x'] = data
self.__dict__['npoints'] = int(ndata)
return self.npoints
[docs] def get_x(self, data):
"""
Function to get the x data.
:param data: Array to place x into
:return: npoints - Number of entries found.
"""
data = np.float64(data)
ndata = len(data)
if ndata < self.npoints:
logger.error("Pinvertor.get_x: input array too short for data.")
return None
data[:] = self.x
return self.npoints
[docs] def set_y(self, data):
"""
Function to set the y data.
:param data: Array of doubles to set y to.
:return: ny - Number of entries found.
"""
data = np.float64(data)
ndata = len(data)
self.__dict__['y'] = data
self.__dict__['ny'] = int(ndata)
return self.ny
[docs] def get_y(self, data):
"""
Function to get the y data.
:param data: Array of doubles to place y into.
:return: npoints - Number of entries found.
"""
data = np.float64(data)
ndata = len(data)
if ndata < self.ny:
logger.error("Pinvertor.get_y: input array too short for data.")
return None
data[:] = self.y
return self.npoints
[docs] def set_err(self, data):
"""
Function to set the err data.
:param data: Array of doubles to set err to.
:return: nerr - Number of entries found.
"""
data = np.float64(data)
ndata = len(data)
self.__dict__['err'] = data
self.__dict__['nerr'] = int(ndata)
return self.nerr
[docs] def get_err(self, data):
"""
Function to get the err data.
:param data: Array of doubles to place err into.
:return: npoints - number of entries found
"""
ndata = len(data)
data = np.float64(data)
if ndata < self.nerr:
logger.error("Pinvertor.get_err: input array too short for data.")
return None
data[:] = self.err
return self.npoints
[docs] def set_dmax(self, d_max):
"""
Sets the maximum distance.
:param d_max: float to set d_max to.
:return: d_max
"""
self._d_max = np.float64(d_max)
return self._d_max
[docs] def get_dmax(self):
"""
Gets the maximum distance.
:return: d_max.
"""
return self._d_max
[docs] def set_qmin(self, min_q):
"""
Sets the minimum q.
:param min_q: float to set q_min to.
:return: q_min.
"""
self._q_min = np.float64(min_q)
return self._q_min
[docs] def get_qmin(self):
"""
Gets the minimum q.
:return: q_min.
"""
return self._q_min
[docs] def set_qmax(self, max_q):
"""
Sets the maximum q.
:param max_q: float to set q_max to.
:return: q_max.
"""
self._q_max = np.float64(max_q)
return self._q_max
[docs] def get_qmax(self):
"""
Gets the maximum q.
:return: q_max.
"""
return self._q_max
[docs] def set_alpha(self, alpha):
"""
Sets the alpha parameter.
:param alpha_: float to set alpha to.
:return: alpha.
"""
self._alpha = np.float64(alpha)
return self._alpha
[docs] def get_alpha(self):
"""
Gets the alpha parameter.
:return: alpha.
"""
return self._alpha
[docs] def set_slit_width(self, slit_width):
"""
Sets the slit width in units of q [A-1].
:param slit_width: float to set slit_width to.
:return: slit_width.
"""
self.__dict__['slit_width'] = np.float64(slit_width)
return self.slit_width
[docs] def get_slit_width(self):
"""
Gets the slit width.
:return: slit_width.
"""
return self.slit_width
[docs] def set_slit_height(self, slit_height):
"""
Sets the slit height in units of q [A-1].
:param slit_height: float to set slit-height to.
:return: slit_height.
"""
self.__dict__['slit_height'] = np.float64(slit_height)
return self.slit_height
[docs] def get_slit_height(self):
"""
Gets the slit height.
:return: slit_height.
"""
return self.slit_height
[docs] def set_est_bck(self, est_bck):
"""
Sets background flag.
:param est_bck: int to set est_bck to.
:return: est_bck.
"""
self.__dict__['est_bck'] = int(est_bck)
return self.est_bck
[docs] def get_est_bck(self):
"""
Gets background flag.
:return: est_bck.
"""
return self.est_bck
[docs] def get_nx(self):
"""
Gets the number of x points.
:return: npoints.
"""
return self.npoints
[docs] def get_ny(self):
"""
Gets the number of y points.
:return: ny.
"""
return self.ny
[docs] def get_nerr(self):
"""
Gets the number of error points.
:return: nerr.
"""
return self.nerr
[docs] def iq(self, pars, q):
"""
Function to call to evaluate the scattering intensity.
:param pars: c-parameters
:param q: q, scalar or vector.
:return: I(q)
"""
q = np.float64(q)
pars = np.float64(pars)
pars = np.atleast_1d(pars)
q = np.atleast_1d(q)
iq_val = calc.iq(pars, self.d_max, q)
if iq_val.shape[0] == 1:
return np.asscalar(iq_val)
return iq_val
[docs] def get_iq_smeared(self, pars, q):
"""
Function to call to evaluate the scattering intensity.
The scattering intensity is slit-smeared.
:param pars: c-parameters
:param q: q, scalar or vector.
:return: I(q), either scalar or vector depending on q.
"""
q = np.float64(q)
q = np.atleast_1d(q)
pars = np.float64(pars)
npts = 21
iq_val = calc.iq_smeared(pars, q, self.d_max, self.slit_height, self.slit_width, npts)
#If q was a scalar
if iq_val.shape[0] == 1:
return np.asscalar(iq_val)
return iq_val
[docs] def pr(self, pars, r):
"""
Function to call to evaluate P(r).
:param pars: c-parameters.
:param r: r-value to evaluate P(r) at.
:return: P(r)
"""
r = np.float64(r)
pars = np.float64(pars)
pars = np.atleast_1d(pars)
r = np.atleast_1d(r)
pr_val = calc.pr(pars, self.d_max, r)
if len(pr_val) == 1:
#scalar
return np.asscalar(pr_val)
return pr_val
[docs] def get_pr_err(self, pars, pars_err, r):
"""
Function to call to evaluate P(r) with errors.
:param pars: c-parameters.
:param pars_err: pars_err.
:param r: r-value.
:return: (P(r), dP(r))
"""
pars = np.atleast_1d(np.float64(pars))
r = np.atleast_1d(np.float64(r))
if pars_err is None:
return calc.pr(pars, self.d_max, r), 0.0
else:
pars_err = np.float64(pars_err)
return calc.pr_err(pars, pars_err, self.d_max, r)
[docs] def is_valid(self):
"""
Check the validity of the stored data.
:return: Returns the number of points if it's all good, -1 otherwise.
"""
if self.npoints == self.ny and self.npoints == self.nerr:
return self.npoints
else:
return -1
[docs] def basefunc_ft(self, d_max, n, q):
"""
Returns the value of the nth Fourier transformed base function.
:param d_max: d_max.
:param n: n.
:param q: q, scalar or vector.
:return: nth Fourier transformed base function, evaluated at q.
"""
d_max = np.float64(d_max)
n = int(n)
q = np.float64(q)
q = np.atleast_1d(q)
ortho_val = calc.ortho_transformed(q, d_max, n)
if ortho_val.shape[0] == 1:
#If the q input was scalar.
return np.asscalar(ortho_val)
return ortho_val
[docs] def oscillations(self, pars):
"""
Returns the value of the oscillation figure of merit for
the given set of coefficients. For a sphere, the oscillation
figure of merit is 1.1.
:param pars: c-parameters.
:return: oscillation figure of merit.
"""
nslice = 100
pars = np.float64(pars)
pars = np.atleast_1d(pars)
oscill = calc.reg_term(pars, self.d_max, nslice)
norm = calc.int_pr_square(pars, self.d_max, nslice)
ret_val = np.sqrt(oscill/norm) / np.pi * self.d_max
return ret_val
[docs] def get_peaks(self, pars):
"""
Returns the number of peaks in the output P(r) distribution
for the given set of coefficients.
:param pars: c-parameters.
:return: number of P(r) peaks.
"""
nslice = 100
pars = np.float64(pars)
count = calc.npeaks(pars, self.d_max, nslice)
return count
[docs] def get_positive(self, pars):
"""
Returns the fraction of P(r) that is positive over
the full range of r for the given set of coefficients.
:param pars: c-parameters.
:return: fraction of P(r) that is positive.
"""
nslice = 100
pars = np.float64(pars)
pars = np.atleast_1d(pars)
fraction = calc.positive_integral(pars, self.d_max, nslice)
return fraction
[docs] def get_pos_err(self, pars, pars_err):
"""
Returns the fraction of P(r) that is 1 standard deviation
above zero over the full range of r for the given set of coefficients.
:param pars: c-parameters.
:param pars_err: pars_err.
:return: fraction of P(r) that is positive.
"""
nslice = 51
pars = np.float64(pars)
pars = np.atleast_1d(pars)
pars_err = np.float64(pars_err)
fraction = calc.positive_errors(pars, pars_err, self.d_max, nslice)
return fraction
[docs] def rg(self, pars):
"""
Returns the value of the radius of gyration Rg.
:param pars: c-parameters.
:return: Rg.
"""
nslice = 101
pars = np.float64(pars)
pars = np.atleast_1d(pars)
val = calc.rg(pars, self.d_max, nslice)
return val
[docs] def iq0(self, pars):
"""
Returns the value of I(q=0).
:param pars: c-parameters.
:return: I(q=0)
"""
nslice = 101
pars = np.float64(pars)
pars = np.atleast_1d(pars)
val = np.float64(4.0 * np.pi * calc.int_pr(pars, self.d_max, nslice))
return val
[docs] def accept_q(self, q):
"""
Check whether a q-value is within acceptable limits.
:return: 1 if accepted, 0 if rejected.
"""
if self.get_qmin() <= 0 and self.get_qmax() <= 0:
return True
return (q >= self.get_qmin()) & (q <= self.get_qmax())
[docs] def check_for_zero(self, x):
return (x == 0).any()
[docs] def _get_matrix(self, nfunc, nr):
"""
Returns A matrix and b vector for least square problem.
:param nfunc: number of base functions.
:param nr: number of r-points used when evaluating reg term.
:param a: A array to fill.
:param b: b vector to fill.
:return: 0
"""
nfunc = int(nfunc)
nr = int(nr)
a_obj = np.zeros([self.npoints + nr, nfunc])
b_obj = np.zeros(self.npoints + nr)
sqrt_alpha = np.sqrt(self.alpha)
pi = np.pi
offset = (1, 0)[self.est_bck == 1]
if self.check_for_zero(self.err):
raise RuntimeError("Pinvertor.get_matrix: Some I(Q) points have no error.")
#Compute A
#Whether or not to use ortho_transformed_smeared.
smeared = False
if self.slit_width > 0 or self.slit_height > 0:
smeared = True
npts = 21
#Get accept_q vector across all q.
q_accept_x = self.accept_q(self.x)
if isinstance(q_accept_x, bool):
#In the case of q_min and q_max <= 0, so returns scalar, and returns True
q_accept_x = np.ones(self.npoints, dtype=bool)
#The x and a that will be used for the first part of 'a' calculation, given to ortho_transformed
x_use = self.x[q_accept_x]
a_use = a_obj[0:self.npoints, :]
for j in range(nfunc):
if self.est_bck == 1 and j == 0:
a_use[q_accept_x, j] = 1.0/self.err[q_accept_x]
elif smeared:
a_use[q_accept_x, j] = calc.ortho_transformed_smeared(x_use, self.d_max, j+offset,
self.slit_height, self.slit_width, npts)/self.err[q_accept_x]
else:
a_use[q_accept_x, j] = calc.ortho_transformed(x_use, self.d_max, j+offset)/self.err[q_accept_x]
a_obj[0:self.npoints, :] = a_use
for j in range(nfunc):
i_r = np.arange(nr, dtype=np.float64)
#Implementing second stage A as a python vector operation with shape = [nr]
r = (self.d_max / nr) * i_r
tmp = pi * (j+offset) / self.d_max
res = (2.0 * sqrt_alpha * self.d_max/nr * tmp) * (2.0 * np.cos(tmp*r) + tmp * r * np.sin(tmp*r))
#Res should now be np vector size i_r.
a_obj[self.npoints:self.npoints+nr, j] = res
#Compute B
x_accept_index = self.accept_q(self.x)
#The part of b used for the vector operations, of the accepted q values.
b_used = b_obj[0:self.npoints]
b_used[x_accept_index] = self.y[x_accept_index] / self.err[x_accept_index]
b_obj[0:self.npoints] = b_used
return a_obj, b_obj
[docs] def _get_invcov_matrix(self, nfunc, nr, a_obj):
"""
Compute the inverse covariance matrix, defined as inv_cov = a_transposed x a.
:param nfunc: number of base functions.
:param nr: number of r-points used when evaluating reg term.
:param a: A array to fill.
:param inv_cov: inverse covariance array to be filled.
:return: 0
"""
nfunc = int(nfunc)
nr = int(nr)
cov_obj = np.zeros([nfunc, nfunc])
n_a = a_obj.size
n_cov = cov_obj.size
if not n_a >= (nfunc * (nr + self.npoints)):
raise RuntimeError("Pinvertor._get_invcov_matrix: a array too small.")
size = nr + self.npoints
cov_obj[:, :] = np.dot(a_obj.T, a_obj)
return cov_obj
[docs] def _get_reg_size(self, nfunc, nr, a_obj):
"""
Computes sum_sig and sum_reg of input array given.
:param nfunc: number of base functions.
:param nr: number of r-points used when evaluating reg term.
:param a_obj: Array to compute sum_sig and sum_reg of.
:return: Tuple of (sum_sig, sum_reg)
"""
nfunc = int(nfunc)
nr = int(nr)
if not a_obj.size >= nfunc * (nr + self.npoints):
raise RuntimeError("Pinvertor._get_reg_size: input array too short for data.")
a_pass = self.accept_q(self.x)
a_use = a_obj[0:self.npoints, :]
a_use = a_use[a_pass, :]
sum_sig = np.sum(a_use ** 2)
sum_reg = np.sum(a_obj[self.npoints:self.npoints+nr, :] ** 2)
return sum_sig, sum_reg