"""
File handler to support different file extensions.
Uses reflectometry's registry utility.
The default readers are found in the 'readers' sub-module
and registered by default at initialization time.
To add a new default reader, one must register it in
the register_readers method found in readers/__init__.py.
A utility method (find_plugins) is available to inspect
a directory (for instance, a user plug-in directory) and
look for new readers/writers.
"""
#####################################################################
#This software was developed by the University of Tennessee as part of the
#Distributed Data Analysis of Neutron Scattering Experiments (DANSE)
#project funded by the US National Science Foundation.
#See the license text in license.txt
#copyright 2008, University of Tennessee
######################################################################
import os
import sys
import logging
import time
from zipfile import ZipFile
from sas.data_util.registry import ExtensionRegistry
# Default readers are defined in the readers sub-module
import readers
from readers import ascii_reader
from readers import cansas_reader
[docs]class Registry(ExtensionRegistry):
"""
Registry class for file format extensions.
Readers and writers are supported.
"""
def __init__(self):
super(Registry, self).__init__()
## Writers
self.writers = {}
## List of wildcards
self.wildcards = ['All (*.*)|*.*']
## Creation time, for testing
self._created = time.time()
# Register default readers
readers.read_associations(self)
#TODO: remove the following line when ready to switch to
#the new default readers
#readers.register_readers(self._identify_plugin)
# Look for plug-in readers
#self.find_plugins('plugins')
[docs] def load(self, path, format=None):
"""
Call the loader for the file type of path.
:param path: file path
:param format: explicit extension, to force the use
of a particular reader
Defaults to the ascii (multi-column) reader
if no reader was registered for the file's
extension.
"""
try:
return super(Registry, self).load(path, format=format)
except:
try:
# No reader was found. Default to the ascii reader.
ascii_loader = ascii_reader.Reader()
return ascii_loader.read(path)
except:
cansas_loader = cansas_reader.Reader()
return cansas_loader.read(path)
[docs] def find_plugins(self, dir):
"""
Find readers in a given directory. This method
can be used to inspect user plug-in directories to
find new readers/writers.
:param dir: directory to search into
:return: number of readers found
"""
readers_found = 0
temp_path = os.path.abspath(dir)
if not os.path.isdir(temp_path):
temp_path = os.path.join(os.getcwd(), dir)
if not os.path.isdir(temp_path):
temp_path = os.path.join(os.path.dirname(__file__), dir)
if not os.path.isdir(temp_path):
temp_path = os.path.join(os.path.dirname(sys.path[0]), dir)
dir = temp_path
# Check whether the directory exists
if not os.path.isdir(dir):
msg = "DataLoader couldn't locate DataLoader plugin folder."
msg += """ "%s" does not exist""" % dir
logging.warning(msg)
return readers_found
for item in os.listdir(dir):
full_path = os.path.join(dir, item)
if os.path.isfile(full_path):
# Process python files
if item.endswith('.py'):
toks = os.path.splitext(os.path.basename(item))
try:
sys.path.insert(0, os.path.abspath(dir))
module = __import__(toks[0], globals(), locals())
if self._identify_plugin(module):
readers_found += 1
except:
msg = "Loader: Error importing "
msg += "%s\n %s" % (item, sys.exc_value)
logging.error(msg)
# Process zip files
elif item.endswith('.zip'):
try:
# Find the modules in the zip file
zfile = ZipFile(item)
nlist = zfile.namelist()
sys.path.insert(0, item)
for mfile in nlist:
try:
# Change OS path to python path
fullname = mfile.replace('/', '.')
fullname = os.path.splitext(fullname)[0]
module = __import__(fullname, globals(),
locals(), [""])
if self._identify_plugin(module):
readers_found += 1
except:
msg = "Loader: Error importing"
msg += " %s\n %s" % (mfile, sys.exc_value)
logging.error(msg)
except:
msg = "Loader: Error importing "
msg += " %s\n %s" % (item, sys.exc_value)
logging.error(msg)
return readers_found
[docs] def associate_file_type(self, ext, module):
"""
Look into a module to find whether it contains a
Reader class. If so, APPEND it to readers and (potentially)
to the list of writers for the given extension
:param ext: file extension [string]
:param module: module object
"""
reader_found = False
if hasattr(module, "Reader"):
try:
# Find supported extensions
loader = module.Reader()
if ext not in self.loaders:
self.loaders[ext] = []
# Append the new reader to the list
self.loaders[ext].append(loader.read)
reader_found = True
# Keep track of wildcards
type_name = module.__name__
if hasattr(loader, 'type_name'):
type_name = loader.type_name
wcard = "%s files (*%s)|*%s" % (type_name, ext.lower(),
ext.lower())
if wcard not in self.wildcards:
self.wildcards.append(wcard)
# Check whether writing is supported
if hasattr(loader, 'write'):
if ext not in self.writers:
self.writers[ext] = []
# Append the new writer to the list
self.writers[ext].append(loader.write)
except:
msg = "Loader: Error accessing"
msg += " Reader in %s\n %s" % (module.__name__, sys.exc_value)
logging.error(msg)
return reader_found
[docs] def associate_file_reader(self, ext, loader):
"""
Append a reader object to readers
:param ext: file extension [string]
:param module: reader object
"""
reader_found = False
try:
# Find supported extensions
if ext not in self.loaders:
self.loaders[ext] = []
# Append the new reader to the list
self.loaders[ext].append(loader.read)
reader_found = True
# Keep track of wildcards
if hasattr(loader, 'type_name'):
type_name = loader.type_name
wcard = "%s files (*%s)|*%s" % (type_name, ext.lower(),
ext.lower())
if wcard not in self.wildcards:
self.wildcards.append(wcard)
except:
msg = "Loader: Error accessing Reader "
msg += "in %s\n %s" % (loader.__name__, sys.exc_value)
logging.error(msg)
return reader_found
def _identify_plugin(self, module):
"""
Look into a module to find whether it contains a
Reader class. If so, add it to readers and (potentially)
to the list of writers.
:param module: module object
"""
reader_found = False
if hasattr(module, "Reader"):
try:
# Find supported extensions
loader = module.Reader()
for ext in loader.ext:
if ext not in self.loaders:
self.loaders[ext] = []
# When finding a reader at run time,
# treat this reader as the new default
self.loaders[ext].insert(0, loader.read)
reader_found = True
# Keep track of wildcards
type_name = module.__name__
if hasattr(loader, 'type_name'):
type_name = loader.type_name
wcard = "%s files (*%s)|*%s" % (type_name, ext.lower(),
ext.lower())
if wcard not in self.wildcards:
self.wildcards.append(wcard)
# Check whether writing is supported
if hasattr(loader, 'write'):
for ext in loader.ext:
if ext not in self.writers:
self.writers[ext] = []
self.writers[ext].insert(0, loader.write)
except:
msg = "Loader: Error accessing Reader"
msg += " in %s\n %s" % (module.__name__, sys.exc_value)
logging.error(msg)
return reader_found
[docs] def lookup_writers(self, path):
"""
:return: the loader associated with the file type of path.
:Raises ValueError: if file type is not known.
"""
# Find matching extensions
extlist = [ext for ext in self.extensions() if path.endswith(ext)]
# Sort matching extensions by decreasing order of length
extlist.sort(lambda a, b: len(a) < len(b))
# Combine loaders for matching extensions into one big list
writers = []
for L in [self.writers[ext] for ext in extlist]:
writers.extend(L)
# Remove duplicates if they exist
if len(writers) != len(set(writers)):
result = []
for L in writers:
if L not in result:
result.append(L)
writers = L
# Raise an error if there are no matching extensions
if len(writers) == 0:
raise ValueError, "Unknown file type for " + path
# All done
return writers
[docs] def save(self, path, data, format=None):
"""
Call the writer for the file type of path.
Raises ValueError if no writer is available.
Raises KeyError if format is not available.
May raise a writer-defined exception if writer fails.
"""
if format is None:
writers = self.lookup_writers(path)
else:
writers = self.writers[format]
for fn in writers:
try:
return fn(path, data)
except:
pass # give other loaders a chance to succeed
# If we get here it is because all loaders failed
raise # reraises last exception
[docs]class Loader(object):
"""
Utility class to use the Registry as a singleton.
"""
## Registry instance
__registry = Registry()
[docs] def associate_file_type(self, ext, module):
"""
Look into a module to find whether it contains a
Reader class. If so, append it to readers and (potentially)
to the list of writers for the given extension
:param ext: file extension [string]
:param module: module object
"""
return self.__registry.associate_file_type(ext, module)
[docs] def associate_file_reader(self, ext, loader):
"""
Append a reader object to readers
:param ext: file extension [string]
:param module: reader object
"""
return self.__registry.associate_file_reader(ext, loader)
[docs] def load(self, file, format=None):
"""
Load a file
:param file: file name (path)
:param format: specified format to use (optional)
:return: DataInfo object
"""
return self.__registry.load(file, format)
[docs] def save(self, file, data, format):
"""
Save a DataInfo object to file
:param file: file name (path)
:param data: DataInfo object
:param format: format to write the data in
"""
return self.__registry.save(file, data, format)
def _get_registry_creation_time(self):
"""
Internal method used to test the uniqueness
of the registry object
"""
return self.__registry._created
[docs] def find_plugins(self, dir):
"""
Find plugins in a given directory
:param dir: directory to look into to find new readers/writers
"""
return self.__registry.find_plugins(dir)
[docs] def get_wildcards(self):
return self.__registry.wildcards