"""
BumpsFitting module runs the bumps optimizer.
"""
import os
from datetime import timedelta, datetime
import numpy
from bumps import fitters
from bumps.mapper import SerialMapper, MPMapper
from bumps import parameter
from bumps.fitproblem import FitProblem
from sas.fit.AbstractFitEngine import FitEngine
from sas.fit.AbstractFitEngine import FResult
from sas.fit.expression import compile_constraints
[docs]class Progress(object):
def __init__(self, history, max_step, pars, dof):
remaining_time = int(history.time[0]*(float(max_step)/history.step[0]-1))
# Depending on the time remaining, either display the expected
# time of completion, or the amount of time remaining. Use precision
# appropriate for the duration.
if remaining_time >= 1800:
completion_time = datetime.now() + timedelta(seconds=remaining_time)
if remaining_time >= 36000:
time = completion_time.strftime('%Y-%m-%d %H:%M')
else:
time = completion_time.strftime('%H:%M')
else:
if remaining_time >= 3600:
time = '%dh %dm'%(remaining_time//3600, (remaining_time%3600)//60)
elif remaining_time >= 60:
time = '%dm %ds'%(remaining_time//60, remaining_time%60)
else:
time = '%ds'%remaining_time
chisq = "%.3g"%(2*history.value[0]/dof)
step = "%d of %d"%(history.step[0], max_step)
header = "=== Steps: %s chisq: %s ETA: %s\n"%(step, chisq, time)
parameters = ["%15s: %-10.3g%s"%(k,v,("\n" if i%3==2 else " | "))
for i,(k,v) in enumerate(zip(pars,history.point[0]))]
self.msg = "".join([header]+parameters)
def __str__(self):
return self.msg
[docs]class BumpsMonitor(object):
def __init__(self, handler, max_step, pars, dof):
self.handler = handler
self.max_step = max_step
self.pars = pars
self.dof = dof
[docs] def config_history(self, history):
history.requires(time=1, value=2, point=1, step=1)
def __call__(self, history):
if self.handler is None: return
self.handler.set_result(Progress(history, self.max_step, self.pars, self.dof))
self.handler.progress(history.step[0], self.max_step)
if len(history.step)>1 and history.step[1] > history.step[0]:
self.handler.improvement()
self.handler.update_fit()
[docs]class ConvergenceMonitor(object):
"""
ConvergenceMonitor contains population summary statistics to show progress
of the fit. This is a list [ (best, 0%, 25%, 50%, 75%, 100%) ] or
just a list [ (best, ) ] if population size is 1.
"""
def __init__(self):
self.convergence = []
[docs] def config_history(self, history):
history.requires(value=1, population_values=1)
def __call__(self, history):
best = history.value[0]
try:
p = history.population_values[0]
n,p = len(p), numpy.sort(p)
QI,Qmid, = int(0.2*n),int(0.5*n)
self.convergence.append((best, p[0],p[QI],p[Qmid],p[-1-QI],p[-1]))
except:
self.convergence.append((best, best,best,best,best,best))
# Note: currently using bumps parameters for each parameter object so that
# a SasFitness can be used directly in bumps with the usual semantics.
# The disadvantage of this technique is that we need to copy every parameter
# back into the model each time the function is evaluated. We could instead
# define reference parameters for each sas parameter, but then we would not
# be able to express constraints using python expressions in the usual way
# from bumps, and would instead need to use string expressions.
[docs]class SasFitness(object):
"""
Wrap SAS model as a bumps fitness object
"""
def __init__(self, model, data, fitted=[], constraints={},
initial_values=None, **kw):
self.name = model.name
self.model = model.model
self.data = data
self._define_pars()
self._init_pars(kw)
if initial_values is not None:
self._reset_pars(fitted, initial_values)
self.constraints = dict(constraints)
self.set_fitted(fitted)
self.update()
def _reset_pars(self, names, values):
for k,v in zip(names, values):
self._pars[k].value = v
def _define_pars(self):
self._pars = {}
for k in self.model.getParamList():
name = ".".join((self.name,k))
value = self.model.getParam(k)
bounds = self.model.details.get(k,["",None,None])[1:3]
self._pars[k] = parameter.Parameter(value=value, bounds=bounds,
fixed=True, name=name)
#print parameter.summarize(self._pars.values())
def _init_pars(self, kw):
for k,v in kw.items():
# dispersion parameters initialized with _field instead of .field
if k.endswith('_width'): k = k[:-6]+'.width'
elif k.endswith('_npts'): k = k[:-5]+'.npts'
elif k.endswith('_nsigmas'): k = k[:-7]+'.nsigmas'
elif k.endswith('_type'): k = k[:-5]+'.type'
if k not in self._pars:
formatted_pars = ", ".join(sorted(self._pars.keys()))
raise KeyError("invalid parameter %r for %s--use one of: %s"
%(k, self.model, formatted_pars))
if '.' in k and not k.endswith('.width'):
self.model.setParam(k, v)
elif isinstance(v, parameter.BaseParameter):
self._pars[k] = v
elif isinstance(v, (tuple,list)):
low, high = v
self._pars[k].value = (low+high)/2
self._pars[k].range(low,high)
else:
self._pars[k].value = v
[docs] def set_fitted(self, param_list):
"""
Flag a set of parameters as fitted parameters.
"""
for k,p in self._pars.items():
p.fixed = (k not in param_list or k in self.constraints)
self.fitted_par_names = [k for k in param_list if k not in self.constraints]
self.computed_par_names = [k for k in param_list if k in self.constraints]
self.fitted_pars = [self._pars[k] for k in self.fitted_par_names]
self.computed_pars = [self._pars[k] for k in self.computed_par_names]
# ===== Fitness interface ====
[docs] def parameters(self):
return self._pars
[docs] def update(self):
for k,v in self._pars.items():
#print "updating",k,v,v.value
self.model.setParam(k,v.value)
self._dirty = True
def _recalculate(self):
if self._dirty:
self._residuals, self._theory = self.data.residuals(self.model.evalDistribution)
self._dirty = False
[docs] def numpoints(self):
return numpy.sum(self.data.idx) # number of fitted points
[docs] def nllf(self):
return 0.5*numpy.sum(self.residuals()**2)
[docs] def theory(self):
self._recalculate()
return self._theory
[docs] def residuals(self):
self._recalculate()
return self._residuals
# Not implementing the data methods for now:
#
# resynth_data/restore_data/save/plot
[docs]class ParameterExpressions(object):
def __init__(self, models):
self.models = models
self._setup()
def _setup(self):
exprs = {}
for M in self.models:
exprs.update((".".join((M.name, k)), v) for k, v in M.constraints.items())
if exprs:
symtab = dict((".".join((M.name, k)), p)
for M in self.models
for k,p in M.parameters().items())
self.update = compile_constraints(symtab, exprs)
else:
self.update = lambda: 0
def __call__(self):
self.update()
def __getstate__(self):
return self.models
def __setstate__(self, state):
self.models = state
self._setup()
[docs]class BumpsFit(FitEngine):
"""
Fit a model using bumps.
"""
def __init__(self):
"""
Creates a dictionary (self.fit_arrange_dict={})of FitArrange elements
with Uid as keys
"""
FitEngine.__init__(self)
self.curr_thread = None
[docs] def fit(self, msg_q=None,
q=None, handler=None, curr_thread=None,
ftol=1.49012e-8, reset_flag=False):
# Build collection of bumps fitness calculators
models = [SasFitness(model=M.get_model(),
data=M.get_data(),
constraints=M.constraints,
fitted=M.pars,
initial_values=M.vals if reset_flag else None)
for M in self.fit_arrange_dict.values()
if M.get_to_fit()]
if len(models) == 0:
raise RuntimeError("Nothing to fit")
problem = FitProblem(models)
# TODO: need better handling of parameter expressions and bounds constraints
# so that they are applied during polydispersity calculations. This
# will remove the immediate need for the setp_hook in bumps, though
# bumps may still need something similar, such as a sane class structure
# which allows a subclass to override setp.
problem.setp_hook = ParameterExpressions(models)
# Run the fit
result = run_bumps(problem, handler, curr_thread)
if handler is not None:
handler.update_fit(last=True)
# TODO: shouldn't reference internal parameters of fit problem
varying = problem._parameters
# collect the results
all_results = []
for M in problem.models:
fitness = M.fitness
fitted_index = [varying.index(p) for p in fitness.fitted_pars]
R = FResult(model=fitness.model, data=fitness.data,
param_list=fitness.fitted_par_names+fitness.computed_par_names)
R.theory = fitness.theory()
R.residuals = fitness.residuals()
R.index = fitness.data.idx
R.fitter_id = self.fitter_id
# TODO: should scale stderr by sqrt(chisq/DOF) if dy is unknown
R.stderr = numpy.hstack((result['stderr'][fitted_index],
numpy.NaN*numpy.ones(len(fitness.computed_pars))))
R.pvec = numpy.hstack((result['value'][fitted_index],
[p.value for p in fitness.computed_pars]))
R.success = result['success']
R.fitness = numpy.sum(R.residuals**2)/(fitness.numpoints() - len(fitted_index))
R.convergence = result['convergence']
if result['uncertainty'] is not None:
R.uncertainty_state = result['uncertainty']
all_results.append(R)
if q is not None:
q.put(all_results)
return q
else:
return all_results
[docs]def run_bumps(problem, handler, curr_thread):
def abort_test():
if curr_thread is None: return False
try: curr_thread.isquit()
except KeyboardInterrupt:
if handler is not None:
handler.stop("Fitting: Terminated!!!")
return True
return False
fitopts = fitters.FIT_OPTIONS[fitters.FIT_DEFAULT]
fitclass = fitopts.fitclass
options = fitopts.options.copy()
max_step = fitopts.options.get('steps', 0) + fitopts.options.get('burn', 0)
pars = [p.name for p in problem._parameters]
options['monitors'] = [
BumpsMonitor(handler, max_step, pars, problem.dof),
ConvergenceMonitor(),
]
fitdriver = fitters.FitDriver(fitclass, problem=problem,
abort_test=abort_test, **options)
omp_threads = int(os.environ.get('OMP_NUM_THREADS','0'))
mapper = MPMapper if omp_threads == 1 else SerialMapper
fitdriver.mapper = mapper.start_mapper(problem, None)
#import time; T0 = time.time()
try:
best, fbest = fitdriver.fit()
except:
import traceback; traceback.print_exc()
raise
finally:
mapper.stop_mapper(fitdriver.mapper)
convergence_list = options['monitors'][-1].convergence
convergence = (2*numpy.asarray(convergence_list)/problem.dof
if convergence_list else numpy.empty((0,1),'d'))
return {
'value': best,
'stderr': fitdriver.stderr(),
'success': True, # better success reporting in bumps
'convergence': convergence,
'uncertainty': getattr(fitdriver.fitter, 'state', None),
}