Source code for sas.sascalc.corfunc.transform_thread
from sas.sascalc.data_util.calcthread import CalcThread
from sas.sascalc.dataloader.data_info import Data1D
from scipy.fftpack import dct
from scipy.integrate import trapz, cumtrapz
import numpy as np
from time import sleep
[docs]class FourierThread(CalcThread):
[docs] def __init__(self, raw_data, extrapolated_data, bg, updatefn=None,
completefn=None):
CalcThread.__init__(self, updatefn=updatefn, completefn=completefn)
self.data = raw_data
self.background = bg
self.extrapolation = extrapolated_data
[docs] def check_if_cancelled(self):
if self.isquit():
self.update("Fourier transform cancelled.")
self.complete(transforms=None)
return True
return False
[docs] def compute(self):
qs = self.extrapolation.x
iqs = self.extrapolation.y
q = self.data.x
background = self.background
xs = np.pi*np.arange(len(qs),dtype=np.float32)/(q[1]-q[0])/len(qs)
self.ready(delay=0.0)
self.update(msg="Fourier transform in progress.")
self.ready(delay=0.0)
if self.check_if_cancelled(): return
try:
# ----- 1D Correlation Function -----
gamma1 = dct((iqs-background)*qs**2)
Q = gamma1.max()
gamma1 /= Q
if self.check_if_cancelled(): return
# ----- 3D Correlation Function -----
# gamma3(R) = 1/R int_{0}^{R} gamma1(x) dx
# numerical approximation for increasing R using the trapezium rule
# Note: SasView 4.x series limited the range to xs <= 1000.0
gamma3 = cumtrapz(gamma1, xs)/xs[1:]
gamma3 = np.hstack((1.0, gamma3)) # gamma3(0) is defined as 1
if self.check_if_cancelled(): return
# ----- Interface Distribution function -----
idf = dct(-qs**4 * (iqs-background))
if self.check_if_cancelled(): return
# Manually calculate IDF(0.0), since scipy DCT tends to give us a
# very large negative value.
# IDF(x) = int_0^inf q^4 * I(q) * cos(q*x) * dq
# => IDF(0) = int_0^inf q^4 * I(q) * dq
idf[0] = trapz(-qs**4 * (iqs-background), qs)
idf /= Q # Normalise using scattering invariant
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.error(e)
self.update(msg="Fourier transform failed.")
self.complete(transforms=None)
return
if self.isquit():
return
self.update(msg="Fourier transform completed.")
transform1 = Data1D(xs, gamma1)
transform3 = Data1D(xs, gamma3)
idf = Data1D(xs, idf)
transforms = (transform1, transform3, idf)
self.complete(transforms=transforms)
[docs]class HilbertThread(CalcThread):
[docs] def __init__(self, raw_data, extrapolated_data, bg, updatefn=None,
completefn=None):
CalcThread.__init__(self, updatefn=updatefn, completefn=completefn)
self.data = raw_data
self.background = bg
self.extrapolation = extrapolated_data
[docs] def compute(self):
qs = self.extrapolation.x
iqs = self.extrapolation.y
q = self.data.x
background = self.background
self.ready(delay=0.0)
self.update(msg="Starting Hilbert transform.")
self.ready(delay=0.0)
if self.isquit():
return
# TODO: Implement hilbert transform
self.update(msg="Hilbert transform completed.")
self.complete(transforms=None)