"""
configure -- configuration machinery on top of YAML
===================================================
"""
import sys
from os import path, mkdir
from inspect import getargspec
from types import FunctionType
from re import compile as re_compile
from collections import MutableMapping, Mapping
from datetime import timedelta
try:
from yaml import Loader as Loader
except ImportError:
from yaml import Loader
# Names exported by a star-import of this module.
# NOTE(review): ``configure_logging`` is exported here (and called by the
# ``!logging`` constructor) but is not defined in the visible part of this
# file -- confirm it is defined or imported elsewhere.
__all__ = (
"Configuration", "ConfigurationError", "configure_logging",
"format_config", "print_config", "import_string", "ImportStringError")
class ConfigurationError(ValueError):
    """ Error raised for invalid or not yet initialised configuration.

    Derives from :class:`ValueError`, so handlers written for the builtin
    exception catch it as well.
    """
class Configuration(MutableMapping):
    """ Configuration object

    You should never instantiate this object but use ``from_file``,
    ``from_string`` or ``from_dict`` classmethods instead. Implements
    :class:`collections.MutableMapping` protocol.

    Nested ``dict`` values are wrapped into child configuration objects when
    they are read, and every key is also reachable as an attribute.
    """

    # Registries mapping a YAML tag to its constructor callable; they are
    # filled in through the ``add_constructor`` / ``add_multi_constructor``
    # decorators below.
    _constructors = {}
    _multi_constructors = {}

    def __init__(self, struct=None, pwd=None, parent=None):
        """
        :param struct:
            underlying mapping; while it is ``None`` every mapping operation
            raises :class:`ConfigurationError`
        :param pwd:
            directory used as the base for relative file names, defaults
            to ``"."``
        :param parent:
            enclosing configuration, ``None`` for the root object
        """
        self._pwd = pwd or "."
        self._parent = parent
        self.__struct = struct

    def merge(self, config):
        """ Produce new configuration by merging ``config`` object into this
        one

        Neither ``self`` nor ``config`` is modified.

        :param config:
            mapping whose values take precedence over the ones of ``self``
        :raises ConfigurationError:
            if a nested mapping would have to be merged into a plain value
        """
        new = self.__class__({}, parent=self._parent, pwd=self._pwd)
        new._merge(self)
        new._merge(config)
        return new

    @property
    def _root(self):
        """ Topmost configuration reached by following ``_parent`` links"""
        c = self
        while c._parent is not None:
            c = c._parent
        return c

    # Iterable
    def __iter__(self):
        if self.__struct is None:
            raise ConfigurationError("unconfigured")
        return iter(self.__struct)

    # Container
    def __contains__(self, name):
        if self.__struct is None:
            raise ConfigurationError("unconfigured")
        return name in self.__struct

    # Sized
    def __len__(self):
        if self.__struct is None:
            raise ConfigurationError("unconfigured")
        return len(self.__struct)

    # Mapping
    def __getitem__(self, name):
        if self.__struct is None:
            raise ConfigurationError("unconfigured")
        data = self.__struct[name]
        if isinstance(data, dict):
            # wrap lazily: the child shares ``data`` with this object, so
            # writes through the child are visible here
            return self.__class__(data, parent=self, pwd=self._pwd)
        return data

    # MutableMapping
    def __setitem__(self, name, value):
        if self.__struct is None:
            raise ConfigurationError("unconfigured")
        self.__struct[name] = value
        if isinstance(value, Configuration):
            value._parent = self

    # MutableMapping
    def __delitem__(self, name):
        if self.__struct is None:
            raise ConfigurationError("unconfigured")
        self.__struct.__delitem__(name)

    def __getattr__(self, name):
        # Only reached when regular attribute lookup fails.  If ``__init__``
        # never ran (e.g. the instance was created through ``__new__``) the
        # name-mangled struct attribute itself is missing, and falling
        # through to ``self[name]`` would recurse without end.
        if name == '_Configuration__struct':
            raise AttributeError(name)
        return self[name]

    def _merge(self, config):
        """ Merge ``config`` into this object in place, recursing into
        nested mappings"""
        for k, v in config.items():
            if not isinstance(v, Mapping):
                self[k] = v
                continue
            if k not in self:
                # Start from an empty section instead of storing ``v``
                # itself: ``v`` shares its data with the configuration it
                # came from, and a later merge into this key would
                # otherwise modify that source configuration.
                if isinstance(v, Configuration):
                    self[k] = v.__class__({}, pwd=v._pwd)
                else:
                    self[k] = {}
            elif not isinstance(self[k], Mapping):
                raise ConfigurationError(
                    "unresolveable conflict during merge")
            self[k]._merge(v)

    def by_ref(self, path, value=None):
        """ Read or assign the value addressed by the dotted ``path``.

        A path that does not start with a dot is resolved from the root
        configuration, a single leading dot means this object and every
        additional leading dot moves one level up to the parent.  When a
        path segment yields something that is not a configuration, the rest
        of the path is followed with attribute access and ``value`` is
        ignored.

        :param path:
            dotted reference
        :param value:
            value to store; ``None`` means "read only"
        :return:
            the referenced value, or ``value`` after it has been stored
        """
        if path[:2] == "..":
            path = path[1:]
            return self._parent.by_ref(path, value)
        if path[:1] != ".":
            return self._root.by_ref("." + path, value)
        path = path[1:]
        if "." in path:
            n, path = path.split(".", 1)
            n = self[n]
            if isinstance(n, Configuration):
                return n.by_ref("." + path, value)
            else:
                return obj_by_ref(n, path)
        else:
            if value is None:
                return self[path]
            else:
                self[path] = value
                return value

    def __add__(self, config):
        return self.merge(config)

    def __repr__(self):
        return repr(self.__struct)

    __str__ = __repr__

    @classmethod
    def from_file(cls, filename, ctx=None, pwd=None, constructors=None,
                  multi_constructors=None, configure=True):
        """ Construct :class:`.Configuration` object by reading and parsing file
        ``filename``.

        :param filename:
            filename to parse config from
        :param ctx:
            mapping object used for value interpolation
        :param pwd:
            base directory for the configuration, defaults to the directory
            that contains ``filename``
        :param constructors:
            mapping of names to constructor for custom objects in YAML. Look at
            `_timedelta_constructor` and `_re_constructor` for examples.
        :param multi_constructors:
            mapping of tag prefixes to multi-constructors, merged over the
            ones registered on the class
        :param configure:
            passed through to :meth:`from_dict`
        """
        filename = path.abspath(filename)
        if pwd is None:
            pwd = path.dirname(filename)
        with open(filename, "r") as f:
            return cls.from_string(f.read(), ctx=ctx, pwd=pwd,
                                   constructors=constructors,
                                   multi_constructors=multi_constructors,
                                   configure=configure)

    @classmethod
    def from_string(cls, string, ctx=None, pwd=None, constructors=None,
                    multi_constructors=None, configure=True):
        """ Construct :class:`.Configuration` from ``string``.

        ``string`` is interpolated with the ``%`` operator before it is
        parsed; the key ``pwd`` is always available to the interpolation.

        :param string:
            string to parse config from
        :param ctx:
            mapping object used for value interpolation; it is not modified
        :param pwd:
            base directory for the configuration
        :param constructors:
            mapping of names to constructor for custom objects in YAML. Look at
            `_timedelta_constructor` and `_re_constructor` for examples.
        :param multi_constructors:
            mapping of tag prefixes to multi-constructors, merged over the
            ones registered on the class
        :param configure:
            passed through to :meth:`from_dict`
        """
        from copy import copy
        # work on a shallow copy so that the caller's mapping does not gain
        # a ``pwd`` key as a side effect
        ctx = copy(ctx) if ctx else {}
        ctx['pwd'] = pwd
        string = string % ctx
        cfg = cls.load(string, constructors=constructors,
                       multi_constructors=multi_constructors)
        return cls.from_dict(cfg, pwd=pwd, configure=configure)

    @classmethod
    def from_dict(cls, cfg, pwd=None, configure=True):
        """ Construct :class:`.Configuration` from dict ``cfg``.

        :param cfg:
            mapping object to use for config
        :param pwd:
            base directory for the configuration
        :param configure:
            when true, ``configure()`` is called on the new object
        """
        c = cls(cfg, pwd=pwd)
        if configure:
            # NOTE(review): ``configure`` is not defined in the visible part
            # of this class -- confirm where it is provided.
            c.configure()
        return c

    @classmethod
    def load(cls, stream, constructors=None, multi_constructors=None):
        """ Parse a single YAML document from ``stream``.

        The constructors registered on the class are used, overridden by
        the ones passed as arguments.
        """
        # NOTE: ``Loader`` is PyYAML's full loader, which can build
        # arbitrary Python objects; do not feed it untrusted input.
        loader = Loader(stream)
        cs = dict(cls._constructors)
        if constructors:
            cs.update(constructors)
        mcs = dict(cls._multi_constructors)
        if multi_constructors:
            mcs.update(multi_constructors)
        for name, constructor in cs.items():
            loader.add_constructor(name, constructor)
        for name, constructor in mcs.items():
            loader.add_multi_constructor(name, constructor)
        try:
            return loader.get_single_data()
        finally:
            loader.dispose()

    @classmethod
    def add_constructor(cls, name):
        """ Decorator registering a constructor for the YAML tag ``!name``.

        :raises ValueError:
            if a constructor for that tag is already registered
        """
        if '_constructors' not in cls.__dict__:
            # Give a subclass its own registry.  ``cls.__dict__`` is a
            # read-only proxy, so the attribute has to be set on the class.
            cls._constructors = dict(cls._constructors)
        cname = '!%s' % name

        def registration(func):
            if cname in cls._constructors:
                raise ValueError("constructor '%s' already exist" % cname)
            cls._constructors[cname] = func
            return func
        return registration

    @classmethod
    def add_multi_constructor(cls, name):
        """ Decorator registering a multi-constructor for YAML tags that
        start with ``!name:``.

        :raises ValueError:
            if a multi-constructor for that prefix is already registered
        """
        if '_multi_constructors' not in cls.__dict__:
            # see ``add_constructor`` for why this is not a ``__dict__`` write
            cls._multi_constructors = dict(cls._multi_constructors)
        cname = '!%s:' % name

        def registration(func):
            if cname in cls._multi_constructors:
                raise ValueError(
                    "multiconstructor '%s' already exist" % cname)
            cls._multi_constructors[cname] = func
            return func
        return registration
@Configuration.add_constructor('timedelta')
def _timedelta_contructor(loader, node):
    """ Build a :class:`datetime.timedelta` from a ``!timedelta`` scalar.

    The scalar is a run of digits followed by one unit letter (matched
    case-insensitively): ``d`` days, ``w`` weeks, ``h`` hours, ``m`` minutes
    or ``s`` seconds.

    :raises ConfigurationError:
        if the scalar does not have that shape
    """
    raw = loader.construct_scalar(node)

    def invalid():
        return ConfigurationError(
            "value '%s' cannot be interpreted as date range" % raw)

    if not (isinstance(raw, basestring) and raw):
        raise invalid()

    amount, unit = raw[:-1], raw[-1].lower()
    if not amount.isdigit():
        raise invalid()

    # unit letter -> (timedelta keyword, multiplier)
    units = {
        "d": ("days", 1),
        "w": ("days", 7),
        "h": ("seconds", 3600),
        "m": ("seconds", 60),
        "s": ("seconds", 1),
    }
    if unit not in units:
        raise invalid()
    keyword, factor = units[unit]
    return timedelta(**{keyword: int(amount) * factor})
@Configuration.add_constructor('bytesize')
def _bytesize_constructor(loader, node):
    """ Convert a ``!bytesize`` scalar into a number of bytes.

    Accepted forms are a bare run of digits (bytes) or digits followed by a
    unit, matched case-insensitively: ``b``, or one of ``k``, ``m``, ``g``,
    ``t``, ``p`` optionally followed by ``b``.  Units are powers of 1024.

    :raises ConfigurationError:
        if the scalar does not have one of those forms
    """
    item = loader.construct_scalar(node)
    if not isinstance(item, basestring) or not item:
        raise ConfigurationError(
            "value '%s' cannot be interpreted as byte size" % item)
    if item.isdigit():
        return int(item)  # bytes
    if item[-2:].lower() in ('kb', 'mb', 'gb', 'tb', 'pb'):
        num, typ = item[:-2], item[-2:-1].lower()
    # 'g' belongs in this list: every other unit is accepted both with and
    # without the trailing 'b'
    elif item[-1:].lower() in ('b', 'k', 'm', 'g', 't', 'p'):
        num, typ = item[:-1], item[-1].lower()
    else:
        raise ConfigurationError(
            "value '%s' cannot be interpreted as byte size" % item)
    if not num.isdigit():
        raise ConfigurationError(
            "value '%s' cannot be interpreted as byte size" % item)
    # unit letter -> power of 1024; ``typ`` was validated above
    powers = {'b': 0, 'k': 1, 'm': 2, 'g': 3, 't': 4, 'p': 5}
    return int(num) * 1024 ** powers[typ]
@Configuration.add_constructor('re')
def _re_constructor(loader, node):
    """ Compile a ``!re`` scalar into a regular expression object.

    :raises ConfigurationError:
        if the scalar is empty or not a string
    """
    pattern = loader.construct_scalar(node)
    if isinstance(pattern, basestring) and pattern:
        return re_compile(pattern)
    raise ConfigurationError(
        "value '%s' cannot be interpreted as regular expression" % pattern)
@Configuration.add_constructor('directory')
def _directory_constructor(loader, node):
    """ Return the path given by a ``!directory`` scalar, creating the
    directory when nothing exists at that path.

    :raises ConfigurationError:
        if the path exists but is not a directory
    """
    dirname = loader.construct_scalar(node)
    if path.exists(dirname):
        if not path.isdir(dirname):
            raise ConfigurationError("'%s' is not a directory" % dirname)
    else:
        # only the last path component is created
        mkdir(dirname)
    return dirname
class Directive(object):
    """ Base class for values that are computed from a configuration.

    Subclasses override ``__call__``, which receives the configuration the
    directive is evaluated against.
    """

    def __call__(self, ctx):
        raise NotImplementedError()
class Ref(Directive):
    """ Directive that resolves to the value found at a dotted reference.

    When the referenced value is a :class:`Factory`, the factory is called
    and its result is written back at the same reference.
    """

    def __init__(self, ref):
        self.ref = ref

    def __call__(self, ctx):
        target = ctx.by_ref(self.ref)
        if not isinstance(target, Factory):
            return target
        return ctx.by_ref(self.ref, target(ctx))

    def __str__(self):
        cls_name = self.__class__.__name__
        return '%s(%s)' % (cls_name, self.ref)

    __repr__ = __str__
@Configuration.add_multi_constructor('ref')
def _ref_constructor(loader, tag, node):
    """ Turn a ``!ref:<tag>`` node into a :class:`Ref` to ``tag``; the node
    content is not used."""
    reference = Ref(tag)
    return reference
class Factory(Directive):
    """ Directive that builds a value by calling a factory with arguments
    taken from a mapping.

    ``factory`` is a function, a class or an import string naming one of
    them.  The keys of ``config`` are matched against the factory's
    parameter names; values that are directives themselves are evaluated
    first.
    """

    def __init__(self, factory, config):
        self.factory = factory
        self.config = config

    def __call__(self, ctx):
        """ Call the factory and return its result.

        :param ctx:
            configuration used to evaluate nested directives
        :raises ConfigurationError:
            if the factory cannot be imported, is neither a function nor a
            class, lacks a required argument, or receives arguments it does
            not accept
        """
        # copy: arguments are popped as they get consumed
        config = dict(self.config)
        factory = self.factory
        if isinstance(factory, basestring):
            try:
                factory = import_string(factory)
            except ImportStringError as e:
                raise ConfigurationError("cannot import factory: %s" % e)
        if isinstance(factory, FunctionType):
            argspec = getargspec(factory)
        elif isinstance(factory, type):
            argspec = getargspec(factory.__init__)
            # drop ``self``
            argspec = argspec._replace(args=argspec.args[1:])
        else:
            # without this branch ``argspec`` would be unbound below
            raise ConfigurationError(
                "unsupported factory %r: expected a function or a class"
                % (factory,))

        def resolve(arg):
            # nested directives are evaluated against the same context
            return arg(ctx) if isinstance(arg, Directive) else arg

        # parameters before ``pos_cut`` have no default and are required
        pos_cut = len(argspec.args) - len(argspec.defaults or [])
        args = []
        for a in argspec.args[:pos_cut]:
            if a not in config:
                raise ConfigurationError(
                    "missing '%s' argument for %s" % (a, factory))
            args.append(resolve(config.pop(a)))
        kwargs = {}
        for a in argspec.args[pos_cut:]:
            if a in config:
                kwargs[a] = resolve(config.pop(a))
        if argspec.keywords:
            # the factory accepts **kwargs: hand over everything left;
            # iterate over a snapshot because entries are popped
            for k in list(config):
                kwargs[k] = resolve(config.pop(k))
        if config:
            raise ConfigurationError(
                "extra arguments '%s' found for %s" % (config, factory))
        return factory(*args, **kwargs)

    def __str__(self):
        return '%s(%s)' % (self.__class__.__name__, self.factory)

    __repr__ = __str__
@Configuration.add_multi_constructor('factory')
def _factory_constructor(loader, tag, node):
    """ Turn a ``!factory:<tag>`` node into a :class:`Factory` for ``tag``.

    The node's mapping supplies the factory arguments; a node with an empty
    value yields a factory without arguments.
    """
    if not node.value:
        return Factory(tag, {})
    arguments = loader.construct_mapping(node, deep=True)
    return Factory(tag, arguments)
class Obj(Directive):
    """ Directive that resolves to the object named by an import string."""

    def __init__(self, obj):
        self.obj = obj

    def __call__(self, ctx):
        try:
            imported = import_string(self.obj)
        except ImportStringError as e:
            raise ConfigurationError("cannot import obj: %s" % e)
        return imported
@Configuration.add_multi_constructor('obj')
def _obj_constructor(loader, tag, node):
    """ Turn a ``!obj:<tag>`` node into an :class:`Obj` for ``tag``; the
    node content is not used."""
    directive = Obj(tag)
    return directive
class Include(Directive):
    """ Directive that loads another configuration file.

    The file name is joined onto the ``_pwd`` of the configuration the
    directive is evaluated against.
    """

    def __init__(self, filename):
        self.filename = filename

    def __call__(self, ctx):
        target = path.join(ctx._pwd, self.filename)
        return Configuration.from_file(target)
@Configuration.add_multi_constructor('include')
def _include_constructor(loader, tag, node):
    """ Turn a ``!include:<tag>`` node into an :class:`Include` of the file
    ``tag``; the node content is not used."""
    directive = Include(tag)
    return directive
class Extends(Directive):
    """ Directive that merges a mapping on top of a configuration file.

    Evaluating it loads ``filename`` (joined onto the ``_pwd`` of the
    context) and merges ``config`` into the result.  Iteration, item
    access, membership tests and unknown attributes are delegated to
    ``config``.
    """

    def __init__(self, filename, config):
        self.filename = filename
        self.config = config

    def __call__(self, ctx):
        base_file = path.join(ctx._pwd, self.filename)
        base = Configuration.from_file(base_file)
        overrides = Configuration.from_dict(self.config)
        return base + overrides

    def __iter__(self):
        return iter(self.config)

    def __getitem__(self, name):
        return self.config[name]

    def __getattr__(self, name):
        # only reached for attributes this object does not have itself
        return getattr(self.config, name)

    def __contains__(self, name):
        return name in self.config
@Configuration.add_multi_constructor('extends')
def _extends_constructor(loader, tag, node):
    """ Turn a ``!extends:<tag>`` node into an :class:`Extends` of the file
    ``tag``, with the node's mapping as the overriding values."""
    overrides = loader.construct_mapping(node, deep=True)
    return Extends(tag, overrides)
@Configuration.add_constructor('logging')
def _logging_constructor(loader, node):
    """ Configure logging from the mapping of a ``!logging`` node.

    The key ``disable_existing_loggers`` (default ``False``) is taken out
    of the mapping and passed on as a keyword argument.  Nothing is
    returned, so the node itself evaluates to ``None``.
    """
    settings = loader.construct_mapping(node, deep=True)
    disable_existing = settings.pop('disable_existing_loggers', False)
    configure_logging(settings, disable_existing_loggers=disable_existing)
[docs]def import_string(import_name, silent=False):
"""Imports an object based on a string. This is useful if you want to
use import paths as endpoints or something similar. An import path can
be specified either in dotted notation (``xml.sax.saxutils.escape``)
or with a colon as object delimiter (``xml.sax.saxutils:escape``).
If `silent` is True the return value will be `None` if the import fails.
For better debugging we recommend the new :func:`import_module`
function to be used instead.
:param import_name: the dotted name for the object to import.
:param silent: if set to `True` import errors are ignored and
`None` is returned instead.
:return: imported object
:copyright: (c) 2011 by the Werkzeug Team
"""
# force the import name to automatically convert to strings
if isinstance(import_name, unicode):
import_name = str(import_name)
try:
if ':' in import_name:
module, obj = import_name.split(':', 1)
elif '.' in import_name:
module, obj = import_name.rsplit('.', 1)
else:
return __import__(import_name)
# __import__ is not able to handle unicode strings in the fromlist
# if the module is a package
if isinstance(obj, unicode):
obj = obj.encode('utf-8')
try:
return getattr(__import__(module, None, None, [obj]), obj)
except (ImportError, AttributeError):
# support importing modules not yet set up by the parent module
# (or package for that matter)
modname = module + '.' + obj
__import__(modname)
return sys.modules[modname]
except ImportError, e:
if not silent:
raise ImportStringError(import_name, e), None, sys.exc_info()[2]
class ImportStringError(ImportError):
    """Provides information about a failed :func:`import_string` attempt.

    The message lists which leading parts of the import path could be
    imported and which part could not.

    :copyright: (c) 2011 by the Werkzeug Team
    """

    #: String in dotted notation that failed to be imported.
    import_name = None
    #: Wrapped exception.
    exception = None

    def __init__(self, import_name, exception):
        """
        :param import_name: the import string that failed
        :param exception: the exception raised by the failed import
        """
        self.import_name = import_name
        self.exception = exception

        # Import the path piece by piece to find the first part that fails.
        name = ''
        tracked = []
        missing = None
        for part in import_name.replace(':', '.').split('.'):
            name += (name and '.') + part
            imported = import_string(name, silent=True)
            if not imported:
                missing = name
                break
            # built-in modules have no ``__file__``
            tracked.append((name, getattr(imported, '__file__', None)))

        track = ['- %r found in %r.' % (n, i) for n, i in tracked]
        if missing is not None:
            track.append('- %r not found.' % missing)

        # Always fill in the template, also when every part could be
        # imported on the second attempt.
        msg = (
            'import_string() failed for %r. Possible reasons are:\n\n'
            '- missing __init__.py in a package;\n'
            '- package or module path not included in sys.path;\n'
            '- duplicated package or module name taking precedence in '
            'sys.path;\n'
            '- missing module, class, function or variable;\n\n'
            'Debugged import:\n\n%s\n\n'
            'Original exception:\n\n%s: %s'
        ) % (import_name, '\n'.join(track),
             exception.__class__.__name__, str(exception))

        ImportError.__init__(self, msg)

    def __repr__(self):
        return '<%s(%r, %r)>' % (self.__class__.__name__, self.import_name,
                                 self.exception)
def format_config(config, _lvl=0):
    """ Render ``config`` as indented text.

    Items are emitted in sorted order: the key followed by a colon on one
    line, then either the nested configuration or the value, one level
    deeper.

    :param config: mapping to render
    :param _lvl: current nesting depth, used by the recursion
    :return: the rendered text
    """
    key_indent = " " * _lvl
    value_indent = " " * (_lvl + 1)
    chunks = []
    for key, value in sorted(config.items()):
        chunks.append("%s%s:\n" % (key_indent, key))
        if isinstance(value, Configuration):
            chunks.append(format_config(value, _lvl + 1))
        else:
            chunks.append("%s%s\n" % (value_indent, value))
    return "".join(chunks)
def print_config(config):
    """ Print the text produced by ``format_config`` for ``config``."""
    rendered = format_config(config)
    print(rendered)
def obj_by_ref(o, path):
    """ Follow the dotted attribute ``path`` starting from object ``o``.

    :param o: object to start from
    :param path: attribute names separated by dots
    :return: the attribute reached at the end of the path
    """
    current = o
    for attr_name in path.split("."):
        current = getattr(current, attr_name)
    return current