"""
BumpsFitting module runs the bumps optimizer.
"""
import logging
import os
from datetime import timedelta, datetime
import traceback
import uncertainties
import numpy as np
import bumps
from bumps import fitters
try:
from bumps.options import FIT_CONFIG
# Default bumps to use the Levenberg-Marquardt optimizer
FIT_CONFIG.selected_id = fitters.MPFit.id
def get_fitter():
return FIT_CONFIG.selected_fitter, FIT_CONFIG.selected_values
except ImportError:
# CRUFT: Bumps changed its handling of fit options around 0.7.5.6
# Default bumps to use the Levenberg-Marquardt optimizer
fitters.FIT_DEFAULT = 'lm'
[docs] def get_fitter():
fitopts = fitters.FIT_OPTIONS[fitters.FIT_DEFAULT]
return fitopts.fitclass, fitopts.options.copy()
from bumps.mapper import SerialMapper, MPMapper
from bumps import parameter
from bumps.fitproblem import FitProblem
from sas.sascalc.fit.AbstractFitEngine import FitEngine
from sas.sascalc.fit.AbstractFitEngine import FResult
from sas.sascalc.fit.expression import compile_constraints
[docs]class Progress(object):
[docs] def __init__(self, history, max_step, pars, dof):
remaining_time = int(history.time[0]*(float(max_step)/history.step[0]-1))
# Depending on the time remaining, either display the expected
# time of completion, or the amount of time remaining. Use precision
# appropriate for the duration.
if remaining_time >= 1800:
completion_time = datetime.now() + timedelta(seconds=remaining_time)
if remaining_time >= 36000:
time = completion_time.strftime('%Y-%m-%d %H:%M')
else:
time = completion_time.strftime('%H:%M')
else:
if remaining_time >= 3600:
time = '%dh %dm'%(remaining_time//3600, (remaining_time%3600)//60)
elif remaining_time >= 60:
time = '%dm %ds'%(remaining_time//60, remaining_time%60)
else:
time = '%ds'%remaining_time
chisq = "%.3g"%(2*history.value[0]/dof)
step = "%d of %d"%(history.step[0], max_step)
header = "=== Steps: %s chisq: %s ETA: %s\n"%(step, chisq, time)
parameters = ["%15s: %-10.3g%s"%(k,v,("\n" if i%3==2 else " | "))
for i, (k, v) in enumerate(zip(pars, history.point[0]))]
self.msg = "".join([header]+parameters)
[docs] def __str__(self):
return self.msg
[docs]class BumpsMonitor(object):
[docs] def __init__(self, handler, max_step, pars, dof):
self.handler = handler
self.max_step = max_step
self.pars = pars
self.dof = dof
[docs] def config_history(self, history):
history.requires(time=1, value=2, point=1, step=1)
[docs] def __call__(self, history):
if self.handler is None: return
self.handler.set_result(Progress(history, self.max_step, self.pars, self.dof))
self.handler.progress(history.step[0], self.max_step)
if len(history.step) > 1 and history.step[1] > history.step[0]:
self.handler.improvement()
self.handler.update_fit()
[docs]class ConvergenceMonitor(object):
"""
ConvergenceMonitor contains population summary statistics to show progress
of the fit. This is a list [ (best, 0%, 25%, 50%, 75%, 100%) ] or
just a list [ (best, ) ] if population size is 1.
"""
[docs] def __init__(self):
self.convergence = []
[docs] def config_history(self, history):
history.requires(value=1, population_values=1)
[docs] def __call__(self, history):
best = history.value[0]
try:
p = history.population_values[0]
n, p = len(p), np.sort(p)
QI, Qmid = int(0.2*n), int(0.5*n)
self.convergence.append((best, p[0], p[QI], p[Qmid], p[-1-QI], p[-1]))
except Exception:
self.convergence.append((best, best, best, best, best, best))
# Note: currently using bumps parameters for each parameter object so that
# a SasFitness can be used directly in bumps with the usual semantics.
# The disadvantage of this technique is that we need to copy every parameter
# back into the model each time the function is evaluated. We could instead
# define reference parameters for each sas parameter, but then we would not
# be able to express constraints using python expressions in the usual way
# from bumps, and would instead need to use string expressions.
[docs]class SasFitness(object):
"""
Wrap SAS model as a bumps fitness object
"""
[docs] def __init__(self, model, data, fitted=[], constraints={},
initial_values=None, **kw):
self.name = model.name
self.model = model.model
self.data = data
if self.data.smearer is not None:
self.data.smearer.model = self.model
self._define_pars()
self._init_pars(kw)
if initial_values is not None:
self._reset_pars(fitted, initial_values)
self.constraints = dict(constraints)
self.set_fitted(fitted)
self.update()
[docs] def _reset_pars(self, names, values):
for k, v in zip(names, values):
self._pars[k].value = v
[docs] def _define_pars(self):
self._pars = {}
for k in self.model.getParamList():
name = ".".join((self.name, k))
value = self.model.getParam(k)
bounds = self.model.details.get(k, ["", None, None])[1:3]
self._pars[k] = parameter.Parameter(value=value, bounds=bounds,
fixed=True, name=name)
#print parameter.summarize(self._pars.values())
[docs] def _init_pars(self, kw):
for k, v in kw.items():
# dispersion parameters initialized with _field instead of .field
if k.endswith('_width'):
k = k[:-6]+'.width'
elif k.endswith('_npts'):
k = k[:-5]+'.npts'
elif k.endswith('_nsigmas'):
k = k[:-7]+'.nsigmas'
elif k.endswith('_type'):
k = k[:-5]+'.type'
if k not in self._pars:
formatted_pars = ", ".join(sorted(self._pars.keys()))
raise KeyError("invalid parameter %r for %s--use one of: %s"
%(k, self.model, formatted_pars))
if '.' in k and not k.endswith('.width'):
self.model.setParam(k, v)
elif isinstance(v, parameter.BaseParameter):
self._pars[k] = v
elif isinstance(v, (tuple, list)):
low, high = v
self._pars[k].value = (low+high)/2
self._pars[k].range(low, high)
else:
self._pars[k].value = v
[docs] def set_fitted(self, param_list):
"""
Flag a set of parameters as fitted parameters.
"""
for k, p in self._pars.items():
p.fixed = (k not in param_list or k in self.constraints)
self.fitted_par_names = [k for k in param_list if k not in self.constraints]
self.computed_par_names = [k for k in param_list if k in self.constraints]
self.fitted_pars = [self._pars[k] for k in self.fitted_par_names]
self.computed_pars = [self._pars[k] for k in self.computed_par_names]
# ===== Fitness interface ====
[docs] def parameters(self):
return self._pars
[docs] def update(self):
for k, v in self._pars.items():
#print "updating",k,v,v.value
self.model.setParam(k, v.value)
self._dirty = True
[docs] def _recalculate(self):
if self._dirty:
self._residuals, self._theory \
= self.data.residuals(self.model.evalDistribution)
self._dirty = False
[docs] def numpoints(self):
return np.sum(self.data.idx) # number of fitted points
[docs] def nllf(self):
return 0.5*np.sum(self.residuals()**2)
[docs] def theory(self):
self._recalculate()
return self._theory
[docs] def residuals(self):
self._recalculate()
return self._residuals
# Not implementing the data methods for now:
#
# resynth_data/restore_data/save/plot
[docs]class ParameterExpressions(object):
[docs] def __init__(self, models):
self.models = models
self._setup()
[docs] def _setup(self):
exprs = {}
for M in self.models:
exprs.update((".".join((M.name, k)), v) for k, v in M.constraints.items())
if exprs:
symtab = dict((".".join((M.name, k)), p)
for M in self.models
for k, p in M.parameters().items())
self.update = compile_constraints(symtab, exprs)
else:
self.update = lambda: 0
[docs] def __call__(self):
self.update()
[docs] def __getstate__(self):
return self.models
[docs] def __setstate__(self, state):
self.models = state
self._setup()
[docs]class BumpsFit(FitEngine):
"""
Fit a model using bumps.
"""
[docs] def __init__(self):
"""
Creates a dictionary (self.fit_arrange_dict={})of FitArrange elements
with Uid as keys
"""
FitEngine.__init__(self)
self.curr_thread = None
[docs] def fit(self, msg_q=None,
q=None, handler=None, curr_thread=None,
ftol=1.49012e-8, reset_flag=False):
# Build collection of bumps fitness calculators
models = [SasFitness(model=M.get_model(),
data=M.get_data(),
constraints=M.constraints,
fitted=M.pars,
initial_values=M.vals if reset_flag else None)
for M in self.fit_arrange_dict.values()
if M.get_to_fit()]
if len(models) == 0:
raise RuntimeError("Nothing to fit")
problem = FitProblem(models)
# TODO: need better handling of parameter expressions and bounds constraints
# so that they are applied during polydispersity calculations. This
# will remove the immediate need for the setp_hook in bumps, though
# bumps may still need something similar, such as a sane class structure
# which allows a subclass to override setp.
problem.setp_hook = ParameterExpressions(models)
# Run the fit
result = run_bumps(problem, handler, curr_thread)
if handler is not None:
handler.update_fit(last=True)
# TODO: shouldn't reference internal parameters of fit problem
varying = problem._parameters
values, errs, cov = result['value'], result['stderr'], result[
'covariance']
assert values is not None and errs is not None
# Propagate uncertainty through the parameter expressions
# We are going to abuse bumps a little here and stuff uncertainty
# objects into the parameter values, then update all the
# derived parameters with uncertainty propagation. We need to
# avoid triggering a model recalc since the uncertainty objects
# will not be working with sasmodels
if len(varying) < 2:
# Use the standard error as the error in the parameter
for param, val, err in zip(varying, values, errs):
# Convert all varying parameters to uncertainties objects
param.value = uncertainties.ufloat(val, err)
else:
try:
uncertainties.correlated_values(values, cov)
except:
# No convergance
for param, val, err in zip(varying, values, errs):
# Convert all varying parameters to uncertainties objects
param.value = uncertainties.ufloat(val, err)
else:
# Use the covariance matrix to calculate error in the parameter
fitted = uncertainties.correlated_values(values, cov)
for param, val in zip(varying, fitted):
param.value = val
# Propagate correlated uncertainty through constraints.
problem.setp_hook()
# collect the results
all_results = []
for fitting_module in problem.models:
fitness = fitting_module.fitness
pars = fitness.fitted_pars + fitness.computed_pars
par_names = fitness.fitted_par_names + fitness.computed_par_names
fitting_result = FResult(model=fitness.model, data=fitness.data, param_list=par_names)
fitting_result.theory = fitness.theory()
fitting_result.residuals = fitness.residuals()
fitting_result.index = fitness.data.idx
fitting_result.fitter_id = self.fitter_id
# TODO: should scale stderr by sqrt(chisq/DOF) if dy is unknown
fitting_result.success = result['success']
fitting_result.convergence = result['convergence']
if result['uncertainty'] is not None:
fitting_result.uncertainty_state = result['uncertainty']
if fitting_result.success:
pvec = list()
stderr = list()
for p in pars:
# If p is already defined as an uncertainties object it is not constrained based on another
# parameter
if isinstance(p.value, uncertainties.core.Variable) or \
isinstance(p.value, uncertainties.core.AffineScalarFunc):
# value.n returns value p
pvec.append(p.value.n)
# value.n returns error in p
stderr.append(p.value.s)
# p constrained based on another parameter
else:
# Details of p
param_model, param_name = p.name.split(".")[0], p.name.split(".")[1]
# Constraints applied on p, list comprehension most efficient method, will always return a
# list with 1 entry
constraints = [model.constraints for model in models if model.name == param_model][0]
# Parameters p is constrained on.
reference_params = [v for v in varying if str(v.name) in str(constraints[param_name])]
err_exp = str(constraints[param_name])
# Convert string entries into variable names within the code.
for i, index in enumerate(reference_params):
err_exp = err_exp.replace(reference_params[index].name, f"reference_params[{index}].value")
try:
# Evaluate a string containing constraints as if it where a line of code
pvec.append(eval(err_exp).n)
stderr.append(eval(err_exp).s)
except NameError as e:
pvec.append(p.value)
stderr.append(0)
# Get model causing error
name_error = e.args[0].split()[1].strip("'")
# Safety net if following code does not work
error_param = name_error
# Get parameter causing error
constraints_sections = constraints[param_name].split(".")
for i in range(len(constraints_sections)):
if name_error in constraints_sections[i]:
error_param = f"{name_error}.{constraints_sections[i+1]}"
logging.error(f"Constraints ordered incorrectly. Attempting to constrain {p}, based on "
f"{error_param}, however {error_param} is not defined itself. This is "
f"because {error_param} is also constrained.\n"
f"The fitting will continue, but {name_error} will be incorrect.")
logging.error(e)
except Exception as e:
logging.error(e)
pvec.append(p.value)
stderr.append(0)
fitting_result.pvec = (np.array(pvec))
fitting_result.stderr = (np.array(stderr))
DOF = max(1, fitness.numpoints() - len(fitness.fitted_pars))
fitting_result.fitness = np.sum(fitting_result.residuals ** 2) / DOF
else:
fitting_result.pvec = np.asarray([p.value for p in pars])
fitting_result.stderr = np.NaN * np.ones(len(pars))
fitting_result.fitness = np.NaN
all_results.append(fitting_result)
all_results[0].mesg = result['errors']
if q is not None:
q.put(all_results)
return q
else:
return all_results
[docs]def run_bumps(problem, handler, curr_thread):
def abort_test():
if curr_thread is None: return False
try: curr_thread.isquit()
except KeyboardInterrupt:
if handler is not None:
handler.stop("Fitting: Terminated!!!")
return True
return False
errors = []
fitclass, options = get_fitter()
steps = options.get('steps', 0)
if steps == 0:
pop = options.get('pop', 0)*len(problem._parameters)
samples = options.get('samples', 0)
steps = (samples+pop-1)/pop if pop != 0 else samples
max_step = steps + options.get('burn', 0)
pars = [p.name for p in problem._parameters]
#x0 = np.asarray([p.value for p in problem._parameters])
options['monitors'] = [
BumpsMonitor(handler, max_step, pars, problem.dof),
ConvergenceMonitor(),
]
fitdriver = fitters.FitDriver(fitclass, problem=problem,
abort_test=abort_test, **options)
clipped = fitdriver.clip()
if clipped:
errors.append(f"The initial value for {clipped} was outside the fitting range and was coerced.")
omp_threads = int(os.environ.get('OMP_NUM_THREADS', '0'))
mapper = MPMapper if omp_threads == 1 else SerialMapper
fitdriver.mapper = mapper.start_mapper(problem, None)
#import time; T0 = time.time()
try:
best, fbest = fitdriver.fit()
except Exception as exc:
best, fbest = None, np.NaN
errors.extend([str(exc), traceback.format_exc()])
finally:
mapper.stop_mapper(fitdriver.mapper)
convergence_list = options['monitors'][-1].convergence
convergence = (2*np.asarray(convergence_list)/problem.dof
if convergence_list else np.empty((0, 1), 'd'))
success = best is not None
try:
stderr = fitdriver.stderr() if success else None
cov = (fitdriver.cov() if not hasattr(fitdriver.fitter, 'state') else
np.cov(fitdriver.fitter.state.draw().points.T))
except Exception as exc:
errors.append(str(exc))
errors.append(traceback.format_exc())
stderr = None
cov = None
return {
'value': best if success else None,
'stderr': stderr,
'success': success,
'convergence': convergence,
'uncertainty': getattr(fitdriver.fitter, 'state', None),
'errors': '\n'.join(errors),
'covariance': cov,
}