"""
ScipyFitting module contains FitArrange , ScipyFit,
Parameter classes.All listed classes work together to perform a
simple fit with scipy optimizer.
"""
import sys
import copy
import numpy
from sas.fit.AbstractFitEngine import FitEngine
from sas.fit.AbstractFitEngine import FResult
_SMALLVALUE = 1.0e-10
[docs]class SasAssembly:
"""
Sas Assembly class a class wrapper to be call in optimizer.leastsq method
"""
def __init__(self, paramlist, model=None, data=None, fitresult=None,
handler=None, curr_thread=None, msg_q=None):
"""
:param Model: the model wrapper fro sas -model
:param Data: the data wrapper for sas data
"""
self.model = model
self.data = data
self.paramlist = paramlist
self.msg_q = msg_q
self.curr_thread = curr_thread
self.handler = handler
self.fitresult = fitresult
self.res = []
self.true_res = []
self.func_name = "Functor"
self.theory = None
[docs] def chisq(self):
"""
Calculates chi^2
:param params: list of parameter values
:return: chi^2
"""
total = 0
for item in self.true_res:
total += item * item
if len(self.true_res) == 0:
return None
return total / (len(self.true_res) - len(self.paramlist))
def __call__(self, params):
"""
Compute residuals
:param params: value of parameters to fit
"""
#import thread
self.model.set_params(self.paramlist, params)
#print "params", params
self.true_res, theory = self.data.residuals(self.model.eval)
self.theory = copy.deepcopy(theory)
# check parameters range
if self.check_param_range():
# if the param value is outside of the bound
# just silent return res = inf
return self.res
self.res = self.true_res
if self.fitresult is not None:
self.fitresult.set_model(model=self.model)
self.fitresult.residuals = self.true_res
self.fitresult.iterations += 1
self.fitresult.theory = theory
#fitness = self.chisq(params=params)
fitness = self.chisq()
self.fitresult.pvec = params
self.fitresult.set_fitness(fitness=fitness)
if self.msg_q is not None:
self.msg_q.put(self.fitresult)
if self.handler is not None:
self.handler.set_result(result=self.fitresult)
self.handler.update_fit()
if self.curr_thread != None:
try:
self.curr_thread.isquit()
except:
#msg = "Fitting: Terminated... Note: Forcing to stop "
#msg += "fitting may cause a 'Functor error message' "
#msg += "being recorded in the log file....."
#self.handler.stop(msg)
raise
return self.res
[docs] def check_param_range(self):
"""
Check the lower and upper bound of the parameter value
and set res to the inf if the value is outside of the
range
:limitation: the initial values must be within range.
"""
#time.sleep(0.01)
is_outofbound = False
# loop through the fit parameters
model = self.model.model
for p in self.paramlist:
value = model.getParam(p)
low,high = model.details[p][1:3]
if low is not None and numpy.isfinite(low):
if p.value == 0:
# This value works on Scipy
# Do not change numbers below
value = _SMALLVALUE
# For leastsq, it needs a bit step back from the boundary
val = low - value * _SMALLVALUE
if value < val:
self.res *= 1e+6
is_outofbound = True
break
if high is not None and numpy.isfinite(high):
# This value works on Scipy
# Do not change numbers below
if value == 0:
value = _SMALLVALUE
# For leastsq, it needs a bit step back from the boundary
val = high + value * _SMALLVALUE
if value > val:
self.res *= 1e+6
is_outofbound = True
break
return is_outofbound
[docs]class ScipyFit(FitEngine):
"""
ScipyFit performs the Fit.This class can be used as follow:
#Do the fit SCIPY
create an engine: engine = ScipyFit()
Use data must be of type plottable
Use a sas model
Add data with a dictionnary of FitArrangeDict where Uid is a key and data
is saved in FitArrange object.
engine.set_data(data,Uid)
Set model parameter "M1"= model.name add {model.parameter.name:value}.
:note: Set_param() if used must always preceded set_model()
for the fit to be performed.In case of Scipyfit set_param is called in
fit () automatically.
engine.set_param( model,"M1", {'A':2,'B':4})
Add model with a dictionnary of FitArrangeDict{} where Uid is a key and model
is save in FitArrange object.
engine.set_model(model,Uid)
engine.fit return chisqr,[model.parameter 1,2,..],[[err1....][..err2...]]
chisqr1, out1, cov1=engine.fit({model.parameter.name:value},qmin,qmax)
"""
def __init__(self):
"""
Creates a dictionary (self.fit_arrange_dict={})of FitArrange elements
with Uid as keys
"""
FitEngine.__init__(self)
self.curr_thread = None
#def fit(self, *args, **kw):
# return profile(self._fit, *args, **kw)
[docs] def fit(self, msg_q=None,
q=None, handler=None, curr_thread=None,
ftol=1.49012e-8, reset_flag=False):
"""
"""
fitproblem = []
for fproblem in self.fit_arrange_dict.itervalues():
if fproblem.get_to_fit() == 1:
fitproblem.append(fproblem)
if len(fitproblem) > 1 :
msg = "Scipy can't fit more than a single fit problem at a time."
raise RuntimeError, msg
elif len(fitproblem) == 0 :
raise RuntimeError, "No Assembly scheduled for Scipy fitting."
model = fitproblem[0].get_model()
pars = fitproblem[0].pars
if reset_flag:
# reset the initial value; useful for batch
for name in fitproblem[0].pars:
ind = fitproblem[0].pars.index(name)
model.model.setParam(name, fitproblem[0].vals[ind])
listdata = []
listdata = fitproblem[0].get_data()
# Concatenate dList set (contains one or more data)before fitting
data = listdata
self.curr_thread = curr_thread
ftol = ftol
# Check the initial value if it is within range
_check_param_range(model.model, pars)
result = FResult(model=model.model, data=data, param_list=pars)
result.fitter_id = self.fitter_id
if handler is not None:
handler.set_result(result=result)
functor = SasAssembly(paramlist=pars,
model=model,
data=data,
handler=handler,
fitresult=result,
curr_thread=curr_thread,
msg_q=msg_q)
try:
# This import must be here; otherwise it will be confused when more
# than one thread exist.
from scipy import optimize
out, cov_x, _, mesg, success = optimize.leastsq(functor,
model.get_params(pars),
ftol=ftol,
full_output=1)
except:
if hasattr(sys, 'last_type') and sys.last_type == KeyboardInterrupt:
if handler is not None:
msg = "Fitting: Terminated!!!"
handler.stop(msg)
raise KeyboardInterrupt, msg
else:
raise
chisqr = functor.chisq()
if cov_x is not None and numpy.isfinite(cov_x).all():
stderr = numpy.sqrt(numpy.diag(cov_x))
else:
stderr = []
result.index = data.idx
result.fitness = chisqr
result.stderr = stderr
result.pvec = out
result.success = success
result.theory = functor.theory
if handler is not None:
handler.set_result(result=result)
handler.update_fit(last=True)
if q is not None:
q.put(result)
return q
if success < 1 or success > 5:
result.fitness = None
return [result]
def _check_param_range(model, pars):
"""
Check parameter range and set the initial value inside
if it is out of range.
: model: park model object
"""
# loop through parameterset
for p in pars:
value = model.getParam(p)
low,high = model.details.setdefault(p,["",None,None])[1:3]
# if the range was defined, check the range
if low is not None and value <= low:
value = low + _get_zero_shift(low)
if high is not None and value > high:
value = high - _get_zero_shift(high)
# Check one more time if the new value goes below
# the low bound, If so, re-evaluate the value
# with the mean of the range.
if low is not None and value < low:
value = 0.5 * (low+high)
model.setParam(p, value)
def _get_zero_shift(limit):
"""
Get 10% shift of the param value = 0 based on the range value
: param range: min or max value of the bounds
"""
return 0.1 * (limit if limit != 0.0 else 1.0)
#def profile(fn, *args, **kw):
# import cProfile, pstats, os
# global call_result
# def call():
# global call_result
# call_result = fn(*args, **kw)
# cProfile.runctx('call()', dict(call=call), {}, 'profile.out')
# stats = pstats.Stats('profile.out')
# stats.sort_stats('time')
# stats.sort_stats('calls')
# stats.print_stats()
# os.unlink('profile.out')
# return call_result