"""
Custom Models
-------------
This is a placeholder for the custom models namespace. When models are
loaded from a file by :func:`sasmodels.generate.load_kernel_module` they are
loaded as if they exist in *sasmodels.custom*. This package needs to exist
for this to occur without error.
"""
from __future__ import division, print_function

import os
import sys
from os.path import basename, splitext, join as joinpath, exists, dirname

# pylint: disable=unused-import
try:
    from typing import Dict, List, Set, Tuple
    from types import ModuleType
except ImportError:
    pass
# pylint: enable=unused-import
try:
    # Python 3.5 and up
    from importlib.util import spec_from_file_location, module_from_spec  # type: ignore

    def load_module_from_path(fullname, path):
        # type: (str, str) -> ModuleType
        """
        Load the python file at *path* as the module named *fullname*.

        A leading ``~`` in *path* is expanded before loading. The module
        source is executed on every call and the new module is returned.

        Raises *ImportError* if no loader can be found for *path*, for
        example because the file suffix is not a python source suffix.
        """
        path = os.path.expanduser(path)
        spec = spec_from_file_location(fullname, path)
        # spec_from_file_location returns None rather than raising when it
        # cannot pick a loader; fail here with a message naming the file
        # instead of an AttributeError inside module_from_spec.
        if spec is None or spec.loader is None:
            raise ImportError("cannot load module %r from %r"
                              % (fullname, path))
        module = module_from_spec(spec)
        spec.loader.exec_module(module)
        return module
except ImportError:
    # CRUFT: python 2
    import imp

    def load_module_from_path(fullname, path):
        # type: (str, str) -> ModuleType
        """
        Load the python file at *path* as the module named *fullname*.

        A leading ``~`` in *path* is expanded before loading. Any module
        already registered under *fullname* is dropped from *sys.modules*
        first so that the source is executed again.
        """
        # Expand once so that the pyc check and the load see the same file.
        path = os.path.expanduser(path)
        # Clear out old definitions, if any
        sys.modules.pop(fullname, None)
        if (path.endswith(".py") and os.path.exists(path)
                and os.path.exists(path + "c")):
            # remove automatic pyc file before loading a py file
            os.unlink(path + "c")
        return imp.load_source(fullname, path)
# Loaded modules keyed by expanded file path. Each entry pairs the module
# with the newest modification time among its dependencies at load time.
_MODULE_CACHE = {} # type: Dict[str, Tuple[ModuleType, float]]
# Files each module depends on, keyed by the module's expanded file path.
# Values are sets of paths: the module file itself plus any C sources and
# the dependencies of modules loaded while this one was loading.
_MODULE_DEPENDS = {} # type: Dict[str, Set[str]]
# Paths of the modules currently being loaded, innermost last, so that a
# nested load can add its dependencies to the module that triggered it.
_MODULE_DEPENDS_STACK = [] # type: List[str]
def load_custom_kernel_module(path):
    # type: (str) -> ModuleType
    """
    Load SAS kernel from *path* as *sasmodels.custom.modelname*.

    The module is cached by its expanded path and is only reloaded when
    :func:`need_reload` reports that one of its dependencies has changed.
    Dependencies are the module file itself, any existing files named in
    the module's *source* list, and the dependencies of any custom modules
    loaded while this module was being loaded.

    Returns the cached module. Exceptions raised while loading the module
    are propagated to the caller.
    """
    # Pull off the last .ext if it exists; there may be others
    name = basename(splitext(path)[0])
    path = os.path.expanduser(path)

    # Reload module if necessary.
    if need_reload(path):
        # Assume the module file is the only dependency
        _MODULE_DEPENDS[path] = {path}

        # Load the module while pushing it onto the dependency stack. If
        # this triggers any submodules, then they will add their
        # dependencies to this module as the "working_on" parent. Pop the
        # stack when the module is loaded, even if loading fails, so that
        # a broken model does not leave a stale parent on the stack.
        _MODULE_DEPENDS_STACK.append(path)
        try:
            module = load_module_from_path('sasmodels.custom.' + name, path)
        finally:
            _MODULE_DEPENDS_STACK.pop()

        # Include external C code in the dependencies. We are looking
        # for module.source and assuming that it is a list of C source files
        # relative to the module itself. Any files that do not exist,
        # such as those in the standard libraries, will be ignored.
        # TODO: look in builtin module path for standard c sources
        # TODO: share code with generate.model_sources
        c_sources = getattr(module, 'source', None)
        if isinstance(c_sources, (list, tuple)):
            _MODULE_DEPENDS[path].update(_find_sources(path, c_sources))

        # Cache the module, and tag it with the newest timestamp
        timestamp = max(os.path.getmtime(f) for f in _MODULE_DEPENDS[path])
        _MODULE_CACHE[path] = module, timestamp

    # Add path and all its dependencies to the parent module, if there is one.
    if _MODULE_DEPENDS_STACK:
        working_on = _MODULE_DEPENDS_STACK[-1]
        _MODULE_DEPENDS[working_on].update(_MODULE_DEPENDS[path])

    return _MODULE_CACHE[path][0]
def need_reload(path):
    # type: (str) -> bool
    """
    Return True if *path* must be (re)loaded.

    This is the case when any recorded dependency of *path* has a
    modification time newer than the timestamp stored with the cached
    module. A path that has never been loaded is checked against its own
    file with a cached time of -1.
    """
    # TODO: fails if a dependency has a modification time in the future
    # The cached time is the newest dependency timestamp, so a future
    # timestamp is recorded as the load time. A second dependency updated
    # to the current time then still looks older than the cached build and
    # no reload is triggered. Treating future times as 0, both here and
    # where the timestamp is recorded, would force a reload once the future
    # time is reached but otherwise behave correctly. Probably not worth
    # the extra code...
    loaded_at = _MODULE_CACHE.get(path, (None, -1))[1]
    for dependency in _MODULE_DEPENDS.get(path, [path]):
        if os.path.getmtime(dependency) > loaded_at:
            return True
    return False
def _find_sources(path, source_list):
# type: (str, List[str]) -> List[str]
"""
Return a list of the sources relative to base file; ignore any that
are not found.
"""
root = dirname(path)
found = []
for source_name in source_list:
source_path = joinpath(root, source_name)
if exists(source_path):
found.append(source_path)
return found