tiramisu/tiramisu/config.py
2012-11-15 10:55:14 +01:00

693 lines
29 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"pretty small and local configuration management tool"
# Copyright (C) 2012 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy project is under the MIT licence
# ____________________________________________________________
from copy import copy
from tiramisu.error import (PropertiesOptionError, ConfigError, NotFoundError,
AmbigousOptionError, ConflictConfigError, NoMatchingOptionFound,
MandatoryError, MethodCallError, NoValueReturned)
from tiramisu.option import (OptionDescription, Option, SymLinkOption,
group_types, Multi, apply_requires)
# ______________________________________________________________________
# generic owner: the owner recorded for the values that are set through a
# `Config` (it is the initial value of `Config._cfgimpl_owner`).  Values that
# still come from the schema defaults are owned by 'default' instead.
default_owner = 'user'
# ____________________________________________________________
class Config(object):
    """main configuration management entry

    A ``Config`` stores the values (and the owners of these values) of the
    options described by an ``OptionDescription`` schema.  Each nested
    ``OptionDescription`` of the schema is mirrored by a nested ``Config``
    whose ``_cfgimpl_parent`` is the enclosing ``Config``.

    The global settings (enabled properties, permissive list, mandatory and
    frozen flags) are read on the root ``Config`` of the tree.

    Every attribute whose name starts with ``_cfgimpl_`` is an implementation
    attribute stored in the instance ``__dict__``; any other attribute name
    is interpreted as an option name (or a dotted option path).
    """
    # properties attribute: the name of a property enables this property.
    # These class-level lists are only shared *defaults*: they are never
    # mutated in place, the setters rebind a per-instance list instead.
    _cfgimpl_properties = ['hidden', 'disabled']
    _cfgimpl_permissive = []
    # mandatory means: a mandatory option has to have a value that is not None
    _cfgimpl_mandatory = True
    _cfgimpl_frozen = True
    _cfgimpl_owner = default_owner
    _cfgimpl_toplevel = None

    def __init__(self, descr, parent=None):
        """ Configuration option management master class

        :param descr: describes the configuration schema
        :type descr: an instance of ``option.OptionDescription``
        :param parent: the enclosing ``Config``, None for the root ``Config``
        :type parent: ``Config``
        """
        self._cfgimpl_descr = descr
        self._cfgimpl_value_owners = {}
        self._cfgimpl_parent = parent
        # `Config()` indeed is in charge of the `Option()`'s values
        self._cfgimpl_values = {}
        self._cfgimpl_previous_values = {}
        # warnings are a great idea, let's make up a better use of it
        self._cfgimpl_warnings = []
        self._cfgimpl_toplevel = self._cfgimpl_get_toplevel()
        # `freeze()` allows us to carry out this calculation again if necessary
        self._cfgimpl_frozen = self._cfgimpl_toplevel._cfgimpl_frozen
        self._cfgimpl_build()

    def _validate_duplicates(self, children):
        """checks that no two children of a schema node share the same name

        :type children: list of `Option` or `OptionDescription`
        :raises ConflictConfigError: on the first name that is met twice
        """
        seen = set()
        for child in children:
            if child._name in seen:
                raise ConflictConfigError('duplicate option name: '
                                          '{0}'.format(child._name))
            seen.add(child._name)

    def _cfgimpl_build(self):
        """
        - builds the config object from the schema
        - settles various default values for options
        """
        self._validate_duplicates(self._cfgimpl_descr._children)
        for child in self._cfgimpl_descr._children:
            if isinstance(child, Option):
                if child.is_multi():
                    # the default is copied so that the schema's own default
                    # object is not the one handed to the `Multi`
                    childdef = Multi(copy(child.getdefault()), config=self,
                                     child=child)
                    self._cfgimpl_values[child._name] = childdef
                    self._cfgimpl_previous_values[child._name] = list(childdef)
                else:
                    childdef = child.getdefault()
                    self._cfgimpl_values[child._name] = childdef
                    self._cfgimpl_previous_values[child._name] = childdef
                self._cfgimpl_value_owners[child._name] = 'default'
            elif isinstance(child, OptionDescription):
                self._validate_duplicates(child._children)
                self._cfgimpl_values[child._name] = Config(child, parent=self)

    def cfgimpl_set_permissive(self, permissive):
        """sets the list of properties that are tolerated by the permissive
        accesses (see the ``permissive`` parameter of ``_getattr``)

        :param permissive: list of property names
        :raises TypeError: if ``permissive`` is not a list
        """
        if not isinstance(permissive, list):
            raise TypeError('permissive must be a list')
        self._cfgimpl_permissive = permissive

    def cfgimpl_update(self):
        """dynamically adds `Option()` or `OptionDescription()`
        """
        # FIXME this is an update for new options in the schema only
        # see the update_child() method of the descr object
        for child in self._cfgimpl_descr._children:
            if child._name in self._cfgimpl_values:
                continue  # already known: existing values are left untouched
            if isinstance(child, Option):
                if child.is_multi():
                    value = Multi(copy(child.getdefault()), config=self,
                                  child=child)
                    previous = list(value)
                else:
                    value = copy(child.getdefault())
                    previous = value
                self._cfgimpl_values[child._name] = value
                # same bookkeeping as in `_cfgimpl_build()`, otherwise
                # `get_previous_value()` fails on the new option
                self._cfgimpl_previous_values[child._name] = previous
                self._cfgimpl_value_owners[child._name] = 'default'
            elif isinstance(child, OptionDescription):
                self._cfgimpl_values[child._name] = Config(child, parent=self)

    def cfgimpl_set_owner(self, owner):
        ":param owner: sets the default value for owner at the Config level"
        self._cfgimpl_owner = owner
        # the owner is propagated to every nested `Config`
        for child in self._cfgimpl_descr._children:
            if isinstance(child, OptionDescription):
                self._cfgimpl_values[child._name].cfgimpl_set_owner(owner)
    # ____________________________________________________________
    # properties methods
    def _cfgimpl_has_properties(self):
        "has properties means the Config's properties attribute is not empty"
        return bool(self._cfgimpl_properties)

    def _cfgimpl_has_property(self, propname):
        """has property propname in the Config's properties attribute

        :param propname: string which is the name of the property"""
        return propname in self._cfgimpl_properties

    def cfgimpl_enable_property(self, propname):
        """puts property propname in the Config's properties attribute

        :raises MethodCallError: if called on a `Config` that is not the root
        """
        # identity test on purpose: `!=` would go through `Config.__ne__`
        if self._cfgimpl_parent is not None:
            raise MethodCallError("this method cfgimpl_enable_property() "
                                  "shall not be used with non-root Config() "
                                  "object")
        if propname not in self._cfgimpl_properties:
            # a new list is bound on the instance: the class-level default
            # (shared by every `Config`) must not be modified in place
            self._cfgimpl_properties = self._cfgimpl_properties + [propname]

    def cfgimpl_disable_property(self, propname):
        """deletes property propname in the Config's properties attribute

        :raises MethodCallError: if called on a `Config` that is not the root
        """
        if self._cfgimpl_parent is not None:
            raise MethodCallError("this method cfgimpl_disable_property() "
                                  "shall not be used with non-root Config() "
                                  "object")
        if self._cfgimpl_has_property(propname):
            # copy then remove: see `cfgimpl_enable_property()`
            remaining = list(self._cfgimpl_properties)
            remaining.remove(propname)
            self._cfgimpl_properties = remaining
    # ____________________________________________________________
    # attribute methods
    def __setattr__(self, name, value):
        "attribute notation mechanism for the setting of the value of an option"
        if name.startswith('_cfgimpl_'):
            # implementation attribute: stored as is on the instance
            self.__dict__[name] = value
            return
        if '.' in name:
            # dotted path: the assignment is delegated to the owning Config
            homeconfig, name = self._cfgimpl_get_home_by_path(name)
            return setattr(homeconfig, name, value)
        child = getattr(self._cfgimpl_descr, name)
        # exact type test: a symlink is not validated here
        if type(child) is not SymLinkOption:
            self._validate(name, child)
        self.setoption(name, value, self._cfgimpl_owner)

    def _validate(self, name, opt_or_descr, permissive=False):
        """validation for the setattr and the getattr

        :param name: name of the option, only used in the error message
        :param opt_or_descr: the `Option` or `OptionDescription` to check
        :param permissive: if True, the properties listed in the root's
                           ``_cfgimpl_permissive`` are tolerated
        :raises TypeError: if `opt_or_descr` is neither an `Option` nor an
                           `OptionDescription`
        :raises PropertiesOptionError: if the option carries at least one
                           property that is enabled on the root `Config`
        """
        apply_requires(opt_or_descr, self)
        if not isinstance(opt_or_descr, (Option, OptionDescription)):
            raise TypeError('Unexpected object: {0}'.format(repr(opt_or_descr)))
        toplevel = self._cfgimpl_toplevel
        # only the properties that are enabled on the root are enforced
        properties = [proper for proper in opt_or_descr.properties
                      if toplevel._cfgimpl_has_property(proper)]
        if permissive:
            for perm in toplevel._cfgimpl_permissive:
                if perm in properties:
                    properties.remove(perm)
        if properties != []:
            raise PropertiesOptionError("trying to access"
                    " to an option named: {0} with properties"
                    " {1}".format(name, str(properties)),
                    properties)

    def _is_empty(self, opt):
        """convenience method to know if an option is empty

        A multi option is empty if its value is an empty list or contains
        None, any other option is empty if its value is None.
        """
        value = self._cfgimpl_values[opt._name]
        if opt.is_multi():
            return bool(value == [] or None in value)
        return value is None

    def _test_mandatory(self, path, opt):
        """checks the mandatory constraint of an option

        :param path: name used in the error message
        :raises MandatoryError: if mandatory is enabled on the root `Config`,
                 the option is mandatory, currently empty and empty by default
        """
        mandatory = self._cfgimpl_get_toplevel()._cfgimpl_mandatory
        if opt.is_mandatory() and mandatory:
            if self._is_empty(opt) and opt.is_empty_by_default():
                raise MandatoryError("option: {0} is mandatory "
                                     "and shall have a value".format(path))

    def __getattr__(self, name):
        "only called when the regular attribute lookup fails, see `_getattr()`"
        return self._getattr(name)

    def _getattr(self, name, permissive=False):
        """
        attribute notation mechanism for accessing the value of an option

        :param name: attribute name
        :param permissive: permissive doesn't raise some property error
                          (see ``_cfgimpl_permissive``)
        :return: option's value if name is an option name, the nested
                 ``Config`` if name is the name of an ``OptionDescription``
        :raises AttributeError: if name is unknown
        :raises PropertiesOptionError: see ``_validate()``
        :raises MandatoryError: see ``_test_mandatory()``
        :raises ConfigError: if a callback returns an invalid value
        """
        # special attributes: if it were in __dict__ (or on the class) it
        # would have been found already.  This test has to come first: the
        # code below reads `self._cfgimpl_descr`, which would call
        # `__getattr__` again, forever, when that attribute is not set yet
        if name.startswith('_cfgimpl_'):
            raise AttributeError("%s object has no attribute %s" %
                                 (self.__class__, name))
        # attribute access by passing a path,
        # for instance getattr(self, "creole.general.family.adresse_ip_eth0")
        if '.' in name:
            homeconfig, name = self._cfgimpl_get_home_by_path(name)
            return homeconfig._getattr(name, permissive)
        opt_or_descr = getattr(self._cfgimpl_descr, name)
        # symlink options
        if type(opt_or_descr) is SymLinkOption:
            return getattr(self, opt_or_descr.path)
        if name not in self._cfgimpl_values:
            raise AttributeError("%s object has no attribute %s" %
                                 (self.__class__, name))
        self._validate(name, opt_or_descr, permissive)
        if isinstance(opt_or_descr, OptionDescription):
            # a group: the nested Config is returned as is
            return self._cfgimpl_values[name]
        # options with callbacks (fill or auto)
        if opt_or_descr.has_callback():
            value = self._cfgimpl_values[name]
            if (not opt_or_descr.is_frozen() or
                    not opt_or_descr.is_forced_on_freeze()) and \
                    not opt_or_descr.is_default_owner(self):
                # the stored value wins over the callback, except for a
                # multi that still contains a None item
                if not opt_or_descr.is_multi() or None not in value:
                    return value
            try:
                result = opt_or_descr.getcallback_value(
                        self._cfgimpl_get_toplevel())
            except NoValueReturned:
                pass  # the stored value is kept
            else:
                if opt_or_descr.is_multi():
                    if not isinstance(result, list):
                        result = [result]
                    _result = Multi(result, value.config, value.child)
                else:
                    # this result **shall not** be a list
                    if isinstance(result, list):
                        raise ConfigError('invalid calculated value returned'
                            ' for option {0} : shall not be a list'.format(name))
                    _result = result
                if _result is not None and not opt_or_descr.validate(_result):
                    raise ConfigError('invalid calculated value returned'
                        ' for option {0}'.format(name))
                self._cfgimpl_values[name] = _result
                self._cfgimpl_value_owners[name] = 'default'
        self._test_mandatory(name, opt_or_descr)
        # frozen and force default
        if not opt_or_descr.has_callback() and opt_or_descr.is_forced_on_freeze():
            return opt_or_descr.getdefault()
        return self._cfgimpl_values[name]

    def unwrap_from_name(self, name):
        """convenience method to extract and Option() object from the Config()
        **and it is slow**: it recursively searches into the namespaces

        The search is done on the paths that are accessible in the current
        context of properties; `name` matches any component of a path.

        :returns: Option()
        :raises NotFoundError: if no accessible path contains `name`
        """
        # only the matching path is unwrapped: unwrapping every path of the
        # config would fail on any option located in a hidden or disabled
        # group, even if this option is not the one that is looked for
        for path in self.getpaths():
            if name in path.split("."):
                return self.unwrap_from_path(path)
        raise NotFoundError("name: {0} not found".format(name))

    def unwrap_from_path(self, path):
        """convenience method to extract and Option() object from the Config()
        and it is **fast**: finds the option directly in the appropriate
        namespace

        :returns: Option()
        """
        if '.' in path:
            homeconfig, path = self._cfgimpl_get_home_by_path(path)
            return getattr(homeconfig._cfgimpl_descr, path)
        return getattr(self._cfgimpl_descr, path)

    def setoption(self, name, value, who=None):
        """effectively modifies the value of an Option()
        (typically called by the __setattr__)

        :param who: is an owner's name
                    who is **not necessarily** a owner, because it cannot be a list
        :type who: string
        :raises ConfigError: if the option is a multi and `value` is neither
                             a ``Multi`` nor a list
        """
        child = getattr(self._cfgimpl_descr, name)
        if type(child) is SymLinkOption:
            # the symlink is given the root Config
            homeconfig = self._cfgimpl_get_toplevel()
            child.setoption(homeconfig, value, who)
            return
        if who is None:
            who = self._cfgimpl_owner
        if child.is_multi() and type(value) is not Multi:
            if type(value) is not list:
                raise ConfigError("invalid value for option:"
                                  " {0} that is set to multi".format(name))
            value = Multi(value, self, child)
        child.setoption(self, value, who)
        child.setowner(self, who)

    def set(self, **kwargs):
        """
        "do what I mean"-interface to option setting. Searches all paths
        starting from that config for matches of the optional arguments
        and sets the found option if the match is not ambiguous.

        :param kwargs: dict of name strings to values.
        :raises AmbigousOptionError: if more than one path ends with a name
        :raises NoMatchingOptionFound: if no path ends with a name
        """
        all_paths = [p.split(".") for p in self.getpaths(allpaths=True)]
        for key, value in kwargs.items():
            key_p = key.split('.')
            candidates = [p for p in all_paths if p[-len(key_p):] == key_p]
            if len(candidates) == 1:
                name = '.'.join(candidates[0])
                homeconfig, name = self._cfgimpl_get_home_by_path(name)
                # the option is read first so that the properties errors
                # (hidden, disabled...) are raised before any modification;
                # a missing mandatory value is fine: it is about to be set
                try:
                    getattr(homeconfig, name)
                except MandatoryError:
                    pass
                homeconfig.setoption(name, value, self._cfgimpl_owner)
            elif len(candidates) > 1:
                raise AmbigousOptionError(
                    'more than one option that ends with %s' % (key, ))
            else:
                raise NoMatchingOptionFound(
                    'there is no option that matches %s'
                    ' or the option is hidden or disabled'% (key, ))

    def get(self, name):
        """
        same as a `find_first()` method in a config that has identical names
        that is : Returns the first item of an option named `name`

        much like the attribute access way, except that
        the search for the option is performed recursively in the whole
        configuration tree.
        **carefull**: very slow !

        :returns: option value.
        :raises NotFoundError: if no path ends with `name`
        """
        for path in self.getpaths(allpaths=True):
            if path.split('.')[-1] == name:
                # the errors of the access (properties...) are not filtered
                return getattr(self, path)
        raise NotFoundError("option {0} not found in config".format(name))

    def _cfgimpl_get_home_by_path(self, path):
        """walks the dotted `path` down to the Config that holds its last
        component

        :returns: tuple (config, name)"""
        steps = path.split('.')
        homeconfig = self
        for step in steps[:-1]:
            homeconfig = getattr(homeconfig, step)
        return homeconfig, steps[-1]

    def _cfgimpl_get_toplevel(self):
        ":returns: root config"
        config = self
        while config._cfgimpl_parent is not None:
            config = config._cfgimpl_parent
        return config

    def _cfgimpl_get_path(self):
        "the path in the attribute access meaning."
        subpath = []
        obj = self
        while obj._cfgimpl_parent is not None:
            subpath.insert(0, obj._cfgimpl_descr._name)
            obj = obj._cfgimpl_parent
        return ".".join(subpath)
    # ______________________________________________________________________
    def cfgimpl_previous_value(self, path):
        "returns the previous value stored for the option located at `path`"
        home, name = self._cfgimpl_get_home_by_path(path)
        return home._cfgimpl_previous_values[name]

    def get_previous_value(self, name):
        "for the time being, only the previous Option's value is accessible"
        return self._cfgimpl_previous_values[name]
    # ______________________________________________________________________
    def add_warning(self, warning):
        "Config implements its own warning pile. Could be useful"
        self._cfgimpl_get_toplevel()._cfgimpl_warnings.append(warning)

    def get_warnings(self):
        "Config implements its own warning pile"
        return self._cfgimpl_get_toplevel()._cfgimpl_warnings
    # ____________________________________________________________
    # Config()'s status
    def cfgimpl_freeze(self):
        "cannot modify the frozen `Option`'s"
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_frozen = True
        self._cfgimpl_frozen = True

    def cfgimpl_unfreeze(self):
        "can modify the Options that are frozen"
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_frozen = False
        self._cfgimpl_frozen = False

    def is_frozen(self):
        "freeze flag at Config level"
        rootconfig = self._cfgimpl_get_toplevel()
        return rootconfig._cfgimpl_frozen

    def cfgimpl_read_only(self):
        """convenience method: freezes the config, stops enforcing the
        'hidden' property, enforces the 'disabled' property and enforces
        the mandatory options"""
        self.cfgimpl_freeze()
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig.cfgimpl_disable_property('hidden')
        rootconfig.cfgimpl_enable_property('disabled')
        rootconfig._cfgimpl_mandatory = True

    def cfgimpl_read_write(self):
        """convenience method: freezes the config, enforces the 'hidden'
        and the 'disabled' properties and stops enforcing the mandatory
        options"""
        self.cfgimpl_freeze()
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig.cfgimpl_enable_property('hidden')
        rootconfig.cfgimpl_enable_property('disabled')
        rootconfig._cfgimpl_mandatory = False

    def cfgimpl_non_mandatory(self):
        """mandatory at the Config level means that the Config raises an error
        if a mandatory option is found

        :raises MethodCallError: if called on a `Config` that is not the root
        """
        if self._cfgimpl_parent is not None:
            raise MethodCallError("this method cfgimpl_non_mandatory() shall"
                                  " not be used with non-root Config() object")
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_mandatory = False

    def cfgimpl_mandatory(self):
        """mandatory at the Config level means that the Config raises an error
        if a mandatory option is found

        :raises MethodCallError: if called on a `Config` that is not the root
        """
        if self._cfgimpl_parent is not None:
            raise MethodCallError("this method cfgimpl_mandatory() shall"
                                  " not be used with non-root Config() object")
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_mandatory = True

    def is_mandatory(self):
        "all mandatory Options shall have a value"
        rootconfig = self._cfgimpl_get_toplevel()
        return rootconfig._cfgimpl_mandatory
    # ____________________________________________________________
    def getkey(self):
        "the comparison (and hash) key, delegated to the schema"
        return self._cfgimpl_descr.getkey(self)

    def __hash__(self):
        return hash(self.getkey())

    def __eq__(self, other):
        "Config comparison"
        other_getkey = getattr(other, 'getkey', None)
        if other_getkey is None:
            # not comparable (None for instance): let python fall back on
            # its default comparison instead of failing on `other.getkey`
            return NotImplemented
        return self.getkey() == other_getkey()

    def __ne__(self, other):
        "Config comparison"
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal
    # ______________________________________________________________________
    def __iter__(self):
        """iteration only on Options (not OptionDescriptions)

        yields the (name, value) of the options whose value can be read
        """
        for child in self._cfgimpl_descr._children:
            if not isinstance(child, Option):
                continue
            try:
                value = getattr(self, child._name)
            except Exception:
                continue  # option with properties
            # the yield is kept out of the `try`: what the consumer throws
            # into the generator (GeneratorExit on close()) must propagate
            yield child._name, value

    def iter_groups(self, group_type=None):
        """iteration on OptionDescriptions

        :param group_type: if not None, only the groups of this type are
                           yielded
        :raises TypeError: if `group_type` is not in `group_types`
        """
        if group_type is None:
            groups = group_types
        else:
            if group_type not in group_types:
                raise TypeError("Unknown group_type: {0}".format(group_type))
            groups = [group_type]
        for child in self._cfgimpl_descr._children:
            if not isinstance(child, OptionDescription):
                continue
            try:
                if child.get_group_type() not in groups:
                    continue
                value = getattr(self, child._name)
            except Exception:
                continue  # hidden, disabled option
            # kept out of the `try`, see `__iter__()`
            yield child._name, value
    # ______________________________________________________________________
    def __str__(self, indent=""):
        """Config's string representation

        the values that are owned by 'default' are not displayed
        """
        lines = []
        names = sorted(child._name for child in self._cfgimpl_descr._children)
        for name in names:
            if self._cfgimpl_value_owners.get(name, None) == 'default':
                continue
            value = getattr(self, name)
            if isinstance(value, Config):
                substr = value.__str__(indent + " ")
            else:
                substr = "%s %s = %s" % (indent, name, value)
            if substr:
                lines.append(substr)
        if indent and not lines:
            return '' # hide subgroups with all default values
        lines.insert(0, "%s[%s]" % (indent, self._cfgimpl_descr._name,))
        return '\n'.join(lines)

    def getpaths(self, include_groups=False, allpaths=False, mandatory=False):
        """returns a list of all paths in self, recursively, taking care of
        the context of properties (hidden/disabled)

        :param include_groups: if true, OptionDescription are included
        :param allpaths: all the options (event the properties protected ones)
        :param mandatory: includes the mandatory options
        :returns: list of all paths
        """
        paths = []
        for path in self._cfgimpl_descr.getpaths(include_groups=include_groups):
            try:
                # the value itself is not used: only the errors matter
                getattr(self, path)
            except MandatoryError:
                if mandatory or allpaths:
                    paths.append(path)
            except PropertiesOptionError:
                if allpaths:
                    paths.append(path) # option which have properties added
            else:
                paths.append(path)
        return paths

    def _find(self, bytype, byname, byvalue, byattrs, first):
        """searches the options that match every given filter (a filter that
        is None matches everything)

        :param first: return only one option if True, a list otherwise
        :returns: if `first`, the first matching Option or None;
                  the list of the matching Options otherwise
        """
        def _match_name(path):
            return byname is None or path.split('.')[-1] == byname

        def _match_value(path):
            if byvalue is None:
                return True
            try:
                if getattr(self, path) == byvalue:
                    return True
            except Exception: # a property restricts the acces to value
                pass
            return False

        def _match_type(option):
            return bytype is None or isinstance(option, bytype)

        def _match_attrs(option):
            if byattrs is None:
                return True
            for key, value in byattrs.items():
                if not hasattr(option, key) or getattr(option, key) != value:
                    return False
            return True

        find_results = []
        for path in self.getpaths(allpaths=True):
            option = self.unwrap_from_path(path)
            if not (_match_name(path) and _match_value(path) and
                    _match_type(option) and _match_attrs(option)):
                continue
            if first:
                return option
            find_results.append(option)
        if first:
            return None
        return find_results

    def find(self, bytype=None, byname=None, byvalue=None, byattrs=None):
        """
        finds a list of options recursively in the config

        :param bytype: Option class (BoolOption, StrOption, ...)
        :param byname: filter by Option._name
        :param byvalue: filter by the option's value
        :param byattrs: dict of option attributes (default, callback...)
        :returns: list of matching Option objects
        """
        return self._find(bytype, byname, byvalue, byattrs, first=False)

    def find_first(self, bytype=None, byname=None, byvalue=None, byattrs=None):
        """
        finds an option recursively in the config

        :param bytype: Option class (BoolOption, StrOption, ...)
        :param byname: filter by Option._name
        :param byvalue: filter by the option's value
        :param byattrs: dict of option attributes (default, callback...)
        :returns: the first matching Option object, None if nothing matches
        """
        return self._find(bytype, byname, byvalue, byattrs, first=True)
def make_dict(config, flatten=False):
    """export the whole config into a `dict`

    Only the paths returned by ``config.getpaths()`` are exported; a path
    whose value cannot be read is silently left out.

    :param flatten: if True, the keys are the option names (last component
                    of the paths) instead of the full paths; with identical
                    names, the last path met wins
    :returns: dict of Option's name (or path) and values"""
    pathsvalues = []
    for path in config.getpaths():
        pathname = path.split('.')[-1] if flatten else path
        try:
            value = getattr(config, path)
        except Exception:
            # this just a hidden or disabled option.  `Exception` and not a
            # bare except: KeyboardInterrupt and SystemExit must go through
            continue
        pathsvalues.append((pathname, value))
    return dict(pathsvalues)
def mandatory_warnings(config):
    """convenience function to trace Options that are mandatory and
    where no value has been set

    The mandatory flag of the root config is forced to True while the
    paths are walked and is given back its previous value afterwards, even
    if the generator is closed before the end or if an access fails with
    an unexpected error.

    :returns: generator of mandatory Option's path
    """
    toplevel = config._cfgimpl_get_toplevel()
    mandatory = toplevel._cfgimpl_mandatory
    toplevel._cfgimpl_mandatory = True
    try:
        for path in config._cfgimpl_descr.getpaths(include_groups=True):
            try:
                # the value is not used: only the raised error matters
                config._getattr(path, permissive=True)
            except MandatoryError:
                yield path
            except PropertiesOptionError:
                pass
    finally:
        toplevel._cfgimpl_mandatory = mandatory