ScipyFitting module contains FitArrange , ScipyFit,
Parameter classes.All listed classes work together to perform a
simple fit with scipy optimizer.
import sys
import copy
import numpy
from sas.fit.AbstractFitEngine import FitEngine
from sas.fit.AbstractFitEngine import FResult
_SMALLVALUE = 1.0e-10
[docs]class SasAssembly:
Sas Assembly class a class wrapper to be call in optimizer.leastsq method
def __init__(self, paramlist, model=None, data=None, fitresult=None,
handler=None, curr_thread=None, msg_q=None):
:param Model: the model wrapper fro sas -model
:param Data: the data wrapper for sas data
self.model = model
self.data = data
self.paramlist = paramlist
self.msg_q = msg_q
self.curr_thread = curr_thread
self.handler = handler
self.fitresult = fitresult
self.res = []
self.true_res = []
self.func_name = "Functor"
self.theory = None
[docs] def chisq(self):
Calculates chi^2
:param params: list of parameter values
:return: chi^2
total = 0
for item in self.true_res:
total += item * item
if len(self.true_res) == 0:
return None
return total / (len(self.true_res) - len(self.paramlist))
def __call__(self, params):
Compute residuals
:param params: value of parameters to fit
#import thread
self.model.set_params(self.paramlist, params)
#print "params", params
self.true_res, theory = self.data.residuals(self.model.eval)
self.theory = copy.deepcopy(theory)
# check parameters range
if self.check_param_range():
# if the param value is outside of the bound
# just silent return res = inf
return self.res
self.res = self.true_res
if self.fitresult is not None:
self.fitresult.residuals = self.true_res
self.fitresult.iterations += 1
self.fitresult.theory = theory
#fitness = self.chisq(params=params)
fitness = self.chisq()
self.fitresult.pvec = params
if self.msg_q is not None:
if self.handler is not None:
if self.curr_thread != None:
#msg = "Fitting: Terminated... Note: Forcing to stop "
#msg += "fitting may cause a 'Functor error message' "
#msg += "being recorded in the log file....."
return self.res
[docs] def check_param_range(self):
Check the lower and upper bound of the parameter value
and set res to the inf if the value is outside of the
:limitation: the initial values must be within range.
is_outofbound = False
# loop through the fit parameters
model = self.model.model
for p in self.paramlist:
value = model.getParam(p)
low,high = model.details[p][1:3]
if low is not None and numpy.isfinite(low):
if p.value == 0:
# This value works on Scipy
# Do not change numbers below
# For leastsq, it needs a bit step back from the boundary
val = low - value * _SMALLVALUE
if value < val:
self.res *= 1e+6
is_outofbound = True
if high is not None and numpy.isfinite(high):
# This value works on Scipy
# Do not change numbers below
if value == 0:
# For leastsq, it needs a bit step back from the boundary
val = high + value * _SMALLVALUE
if value > val:
self.res *= 1e+6
is_outofbound = True
return is_outofbound
[docs]class ScipyFit(FitEngine):
ScipyFit performs the Fit.This class can be used as follow:
#Do the fit SCIPY
create an engine: engine = ScipyFit()
Use data must be of type plottable
Use a sas model
Add data with a dictionnary of FitArrangeDict where Uid is a key and data
is saved in FitArrange object.
Set model parameter "M1"= model.name add {model.parameter.name:value}.
:note: Set_param() if used must always preceded set_model()
for the fit to be performed.In case of Scipyfit set_param is called in
fit () automatically.
engine.set_param( model,"M1", {'A':2,'B':4})
Add model with a dictionnary of FitArrangeDict{} where Uid is a key and model
is save in FitArrange object.
engine.fit return chisqr,[model.parameter 1,2,..],[[err1....][..err2...]]
chisqr1, out1, cov1=engine.fit({model.parameter.name:value},qmin,qmax)
def __init__(self):
Creates a dictionary (self.fit_arrange_dict={})of FitArrange elements
with Uid as keys
self.curr_thread = None
#def fit(self, *args, **kw):
# return profile(self._fit, *args, **kw)
[docs] def fit(self, msg_q=None,
q=None, handler=None, curr_thread=None,
ftol=1.49012e-8, reset_flag=False):
fitproblem = []
for fproblem in self.fit_arrange_dict.itervalues():
if fproblem.get_to_fit() == 1:
if len(fitproblem) > 1 :
msg = "Scipy can't fit more than a single fit problem at a time."
raise RuntimeError, msg
elif len(fitproblem) == 0 :
raise RuntimeError, "No Assembly scheduled for Scipy fitting."
model = fitproblem[0].get_model()
pars = fitproblem[0].pars
if reset_flag:
# reset the initial value; useful for batch
for name in fitproblem[0].pars:
ind = fitproblem[0].pars.index(name)
model.model.setParam(name, fitproblem[0].vals[ind])
listdata = []
listdata = fitproblem[0].get_data()
# Concatenate dList set (contains one or more data)before fitting
data = listdata
self.curr_thread = curr_thread
ftol = ftol
# Check the initial value if it is within range
_check_param_range(model.model, pars)
result = FResult(model=model.model, data=data, param_list=pars)
result.fitter_id = self.fitter_id
if handler is not None:
functor = SasAssembly(paramlist=pars,
# This import must be here; otherwise it will be confused when more
# than one thread exist.
from scipy import optimize
out, cov_x, _, mesg, success = optimize.leastsq(functor,
if hasattr(sys, 'last_type') and sys.last_type == KeyboardInterrupt:
if handler is not None:
msg = "Fitting: Terminated!!!"
raise KeyboardInterrupt, msg
chisqr = functor.chisq()
if cov_x is not None and numpy.isfinite(cov_x).all():
stderr = numpy.sqrt(numpy.diag(cov_x))
stderr = []
result.index = data.idx
result.fitness = chisqr
result.stderr = stderr
result.pvec = out
result.success = success
result.theory = functor.theory
if handler is not None:
if q is not None:
return q
if success < 1 or success > 5:
result.fitness = None
return [result]
def _check_param_range(model, pars):
Check parameter range and set the initial value inside
if it is out of range.
: model: park model object
# loop through parameterset
for p in pars:
value = model.getParam(p)
low,high = model.details.setdefault(p,["",None,None])[1:3]
# if the range was defined, check the range
if low is not None and value <= low:
value = low + _get_zero_shift(low)
if high is not None and value > high:
value = high - _get_zero_shift(high)
# Check one more time if the new value goes below
# the low bound, If so, re-evaluate the value
# with the mean of the range.
if low is not None and value < low:
value = 0.5 * (low+high)
model.setParam(p, value)
def _get_zero_shift(limit):
Get 10% shift of the param value = 0 based on the range value
: param range: min or max value of the bounds
return 0.1 * (limit if limit != 0.0 else 1.0)
#def profile(fn, *args, **kw):
# import cProfile, pstats, os
# global call_result
# def call():
# global call_result
# call_result = fn(*args, **kw)
# cProfile.runctx('call()', dict(call=call), {}, 'profile.out')
# stats = pstats.Stats('profile.out')
# stats.sort_stats('time')
# stats.sort_stats('calls')
# stats.print_stats()
# os.unlink('profile.out')
# return call_result