1251 lines
55 KiB
Python
1251 lines
55 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright (C) 2012-2017 Team tiramisu (see AUTHORS for all contributors)
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Lesser General Public License as published by the
|
|
# Free Software Foundation, either version 3 of the License, or (at your
|
|
# option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
|
|
# details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
# The original `Config` design model is unproudly borrowed from
|
|
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
|
|
# the whole pypy projet is under MIT licence
|
|
# ____________________________________________________________
|
|
"options handler global entry point"
|
|
import weakref
|
|
import sys
|
|
from time import time
|
|
from copy import copy
|
|
|
|
|
|
from .error import PropertiesOptionError, ConfigError, ConflictError
|
|
from .option import OptionDescription, Option, SymLinkOption, \
|
|
DynSymLinkOption, SynDynOptionDescription
|
|
from .option.baseoption import valid_name
|
|
from .setting import groups, Settings, default_encoding, undefined
|
|
from .storage import get_storages, set_storage, get_default_values_storages
|
|
from .value import Values # , Multi
|
|
from .i18n import _
|
|
|
|
|
|
if sys.version_info[0] >= 3: # pragma: no cover
|
|
xrange = range
|
|
|
|
|
|
class SubConfig(object):
|
|
"""Sub configuration management entry.
|
|
Tree if OptionDescription's responsability. SubConfig are generated
|
|
on-demand. A Config is also a SubConfig.
|
|
Root Config is call context below
|
|
"""
|
|
__slots__ = ('_impl_context',
|
|
'_impl_descr',
|
|
'_impl_path',
|
|
'_impl_setting_properties',
|
|
'_impl_length')
|
|
|
|
def __init__(self,
|
|
descr,
|
|
context,
|
|
setting_properties,
|
|
validate,
|
|
force_permissive,
|
|
subpath=None):
|
|
""" Configuration option management master class
|
|
|
|
:param descr: describes the configuration schema
|
|
:type descr: an instance of ``option.OptionDescription``
|
|
:param context: the current root config
|
|
:type context: `Config`
|
|
:type subpath: `str` with the path name
|
|
"""
|
|
# main option description
|
|
error = False
|
|
if descr is not None and not isinstance(descr, OptionDescription) and \
|
|
not isinstance(descr, SynDynOptionDescription): # pragma: optional cover
|
|
error = True
|
|
if error:
|
|
raise TypeError(_('descr must be an optiondescription, not {0}'
|
|
).format(type(descr)))
|
|
self._impl_descr = descr
|
|
# sub option descriptions
|
|
if not isinstance(context, weakref.ReferenceType): # pragma: optional cover
|
|
raise ValueError('context must be a Weakref')
|
|
self._impl_context = context
|
|
self._impl_path = subpath
|
|
if setting_properties is not undefined:
|
|
self._impl_setting_properties = setting_properties
|
|
if setting_properties is not None and descr.impl_get_group_type() == groups.master:
|
|
self._impl_length = descr.get_length(context().cfgimpl_get_values(),
|
|
setting_properties=setting_properties,
|
|
validate=validate,
|
|
force_permissive=force_permissive)
|
|
|
|
def cfgimpl_get_length(self):
|
|
return getattr(self, '_impl_length')
|
|
|
|
def reset_one_option_cache(self,
|
|
values,
|
|
settings,
|
|
resetted_opts,
|
|
opt,
|
|
path):
|
|
tresetted_opts = copy(resetted_opts)
|
|
opt.reset_cache(opt,
|
|
path,
|
|
values,
|
|
'values',
|
|
tresetted_opts)
|
|
tresetted_opts = copy(resetted_opts)
|
|
opt.reset_cache(opt,
|
|
path,
|
|
settings,
|
|
'settings',
|
|
tresetted_opts)
|
|
resetted_opts |= tresetted_opts
|
|
for woption in opt._get_dependencies(self):
|
|
option = woption()
|
|
if option in resetted_opts:
|
|
continue
|
|
option_path = opt.impl_getpath(self)
|
|
self.reset_one_option_cache(values,
|
|
settings,
|
|
resetted_opts,
|
|
option,
|
|
option_path)
|
|
del option
|
|
|
|
def cfgimpl_reset_cache(self,
|
|
only_expired=False,
|
|
opt=None,
|
|
path=None,
|
|
resetted_opts=None):
|
|
"""reset all settings in cache
|
|
|
|
:param only_expired: if True reset only expired cached values
|
|
:type only_expired: boolean
|
|
"""
|
|
|
|
def reset_expired_cache():
|
|
# reset cache for expired cache value ony
|
|
datetime = int(time())
|
|
values._p_.reset_expired_cache(datetime)
|
|
settings._p_.reset_expired_cache(datetime)
|
|
|
|
def reset_all_cache():
|
|
values._p_.reset_all_cache()
|
|
settings._p_.reset_all_cache()
|
|
|
|
if resetted_opts is None:
|
|
resetted_opts = set()
|
|
|
|
context = self._cfgimpl_get_context()
|
|
values = context.cfgimpl_get_values()
|
|
settings = context.cfgimpl_get_settings()
|
|
|
|
if not None in (opt, path):
|
|
if opt not in resetted_opts:
|
|
self.reset_one_option_cache(values,
|
|
settings,
|
|
resetted_opts,
|
|
opt,
|
|
path)
|
|
|
|
elif only_expired:
|
|
reset_expired_cache()
|
|
else:
|
|
reset_all_cache()
|
|
|
|
def cfgimpl_get_home_by_path(self,
|
|
path,
|
|
setting_properties,
|
|
validate_properties=True,
|
|
force_permissive=False):
|
|
""":returns: tuple (config, name)"""
|
|
path = path.split('.')
|
|
for step in path[:-1]:
|
|
self = self.getattr(step,
|
|
force_permissive=force_permissive,
|
|
validate_properties=validate_properties,
|
|
setting_properties=setting_properties)
|
|
if isinstance(self, Exception):
|
|
return self, None
|
|
return self, path[-1]
|
|
|
|
# ______________________________________________________________________
|
|
def __iter__(self, force_permissive=False):
|
|
"""Pythonesque way of parsing group's ordered options.
|
|
iteration only on Options (not OptionDescriptions)"""
|
|
setting_properties = self.cfgimpl_get_context().cfgimpl_get_settings().get_context_properties()
|
|
for child in self.cfgimpl_get_description().impl_getchildren(context=self._cfgimpl_get_context()):
|
|
if not child.impl_is_optiondescription():
|
|
try:
|
|
name = child.impl_getname()
|
|
yield name, self.getattr(name,
|
|
force_permissive=force_permissive,
|
|
setting_properties=setting_properties)
|
|
except GeneratorExit: # pragma: optional cover
|
|
if sys.version_info[0] < 3:
|
|
raise StopIteration
|
|
else:
|
|
raise GeneratorExit()
|
|
except PropertiesOptionError: # pragma: optional cover
|
|
pass # option with properties
|
|
|
|
def iter_all(self, force_permissive=False):
|
|
"""A way of parsing options **and** groups.
|
|
iteration on Options and OptionDescriptions."""
|
|
for child in self.cfgimpl_get_description().impl_getchildren():
|
|
try:
|
|
yield child.impl_getname(), self.getattr(child.impl_getname(),
|
|
force_permissive=force_permissive)
|
|
except GeneratorExit: # pragma: optional cover
|
|
if sys.version_info[0] < 3:
|
|
raise StopIteration
|
|
else:
|
|
raise GeneratorExit()
|
|
except PropertiesOptionError: # pragma: optional cover
|
|
pass # option with properties
|
|
|
|
def iter_groups(self,
|
|
group_type=None,
|
|
force_permissive=False):
|
|
"""iteration on groups objects only.
|
|
All groups are returned if `group_type` is `None`, otherwise the groups
|
|
can be filtered by categories (families, or whatever).
|
|
|
|
:param group_type: if defined, is an instance of `groups.GroupType`
|
|
or `groups.MasterGroupType` that lives in
|
|
`setting.groups`
|
|
"""
|
|
if group_type is not None and not isinstance(group_type,
|
|
groups.GroupType): # pragma: optional cover
|
|
raise TypeError(_("unknown group_type: {0}").format(group_type))
|
|
context = self._cfgimpl_get_context()
|
|
setting_properties = context.cfgimpl_get_settings().get_context_properties()
|
|
for child in self.cfgimpl_get_description().impl_getchildren(context=context):
|
|
if child.impl_is_optiondescription():
|
|
try:
|
|
if group_type is None or (group_type is not None and
|
|
child.impl_get_group_type()
|
|
== group_type):
|
|
name = child.impl_getname()
|
|
yield name, self.getattr(name,
|
|
force_permissive=force_permissive,
|
|
setting_properties=setting_properties)
|
|
except GeneratorExit: # pragma: optional cover
|
|
if sys.version_info[0] < 3:
|
|
raise StopIteration
|
|
else:
|
|
raise GeneratorExit()
|
|
except PropertiesOptionError: # pragma: optional cover
|
|
pass
|
|
# ______________________________________________________________________
|
|
|
|
def __str__(self):
|
|
"Config's string representation"
|
|
lines = []
|
|
for name, grp in self.iter_groups():
|
|
lines.append("[{0}]".format(name))
|
|
for name, value in self:
|
|
try:
|
|
lines.append("{0} = {1}".format(name, value))
|
|
except UnicodeEncodeError: # pragma: optional cover
|
|
lines.append("{0} = {1}".format(name,
|
|
value.encode(default_encoding)))
|
|
return '\n'.join(lines)
|
|
|
|
__repr__ = __str__
|
|
|
|
def _cfgimpl_get_context(self):
|
|
"""context could be None, we need to test it
|
|
context is None only if all reference to `Config` object is deleted
|
|
(for example we delete a `Config` and we manipulate a reference to
|
|
old `SubConfig`, `Values`, `Multi` or `Settings`)
|
|
"""
|
|
context = self._impl_context()
|
|
if context is None: # pragma: optional cover
|
|
raise ConfigError(_('the context does not exist anymore'))
|
|
return context
|
|
|
|
def cfgimpl_get_context(self):
|
|
return self._cfgimpl_get_context()
|
|
|
|
def cfgimpl_get_description(self):
|
|
if self._impl_descr is None: # pragma: optional cover
|
|
raise ConfigError(_('no option description found for this config'
|
|
' (may be GroupConfig)'))
|
|
else:
|
|
return self._impl_descr
|
|
|
|
def cfgimpl_get_settings(self):
|
|
return self._cfgimpl_get_context()._impl_settings
|
|
|
|
def cfgimpl_get_values(self):
|
|
return self._cfgimpl_get_context()._impl_values
|
|
|
|
# ____________________________________________________________
|
|
# attribute methods
|
|
def __setattr__(self, name, value):
|
|
"attribute notation mechanism for the setting of the value of an option"
|
|
self.setattr(name,
|
|
value)
|
|
|
|
def setattr(self,
|
|
name,
|
|
value,
|
|
force_permissive=False,
|
|
index=None,
|
|
setting_properties=None,
|
|
_commit=True):
|
|
|
|
if name.startswith('_impl_'):
|
|
return object.__setattr__(self,
|
|
name,
|
|
value)
|
|
context = self._cfgimpl_get_context()
|
|
if '.' in name: # pragma: optional cover
|
|
self, name = self.cfgimpl_get_home_by_path(name,
|
|
force_permissive=force_permissive,
|
|
setting_properties=setting_properties)
|
|
child = self.cfgimpl_get_description().__getattr__(name,
|
|
context=context)
|
|
if isinstance(child, (OptionDescription, SynDynOptionDescription)):
|
|
raise TypeError(_("can't assign to an OptionDescription")) # pragma: optional cover
|
|
elif child.impl_is_symlinkoption() and \
|
|
not isinstance(child, DynSymLinkOption): # pragma: no dynoptiondescription cover
|
|
raise TypeError(_("can't assign to a SymlinkOption"))
|
|
else:
|
|
self.cfgimpl_get_description().impl_validate_value(child,
|
|
value,
|
|
self)
|
|
subpath = self._get_subpath(name)
|
|
return self.cfgimpl_get_values().setvalue(child,
|
|
value,
|
|
subpath,
|
|
force_permissive,
|
|
index,
|
|
setting_properties,
|
|
_commit)
|
|
|
|
def delattr(self,
|
|
name,
|
|
index=None,
|
|
force_permissive=False,
|
|
setting_properties=None,
|
|
validate=True):
|
|
|
|
context = self._cfgimpl_get_context()
|
|
if '.' in name: # pragma: optional cover
|
|
self, name = self.cfgimpl_get_home_by_path(name,
|
|
force_permissive=force_permissive,
|
|
setting_properties=setting_properties)
|
|
child = self.cfgimpl_get_description().__getattr__(name,
|
|
context=context)
|
|
if isinstance(child, (OptionDescription, SynDynOptionDescription)):
|
|
raise TypeError(_("can't delete an OptionDescription")) # pragma: optional cover
|
|
elif child.impl_is_symlinkoption() and \
|
|
not isinstance(child, DynSymLinkOption): # pragma: no dynoptiondescription cover
|
|
raise TypeError(_("can't delete a SymlinkOption"))
|
|
subpath = self._get_subpath(name)
|
|
values = self.cfgimpl_get_values()
|
|
if index is not None:
|
|
if child.impl_is_master_slaves('master'):
|
|
values.reset_master(self,
|
|
child,
|
|
subpath,
|
|
index,
|
|
force_permissive,
|
|
setting_properties)
|
|
elif child.impl_is_master_slaves('slave'):
|
|
values.reset_slave(child,
|
|
subpath,
|
|
index,
|
|
setting_properties,
|
|
force_permissive=force_permissive,
|
|
validate=validate)
|
|
else:
|
|
raise ValueError(_("can delete value with index only with a master or a slave"))
|
|
else:
|
|
values.reset(child,
|
|
subpath,
|
|
setting_properties,
|
|
force_permissive=force_permissive,
|
|
validate=validate)
|
|
|
|
def _get_subpath(self, name):
|
|
if self._impl_path is None:
|
|
subpath = name
|
|
else:
|
|
subpath = self._impl_path + '.' + name
|
|
return subpath
|
|
|
|
def getattr(self,
|
|
name,
|
|
setting_properties,
|
|
force_permissive=False,
|
|
validate=True,
|
|
validate_properties=True,
|
|
index=None,
|
|
returns_option=False):
|
|
"""
|
|
attribute notation mechanism for accessing the value of an option
|
|
:param name: attribute name
|
|
:return: option's value if name is an option name, OptionDescription
|
|
otherwise
|
|
"""
|
|
if '.' in name:
|
|
self, name = self.cfgimpl_get_home_by_path(name,
|
|
force_permissive=force_permissive,
|
|
validate_properties=validate_properties,
|
|
setting_properties=setting_properties)
|
|
|
|
context = self._cfgimpl_get_context()
|
|
option = self.cfgimpl_get_description().__getattr__(name,
|
|
context=context)
|
|
if option.impl_is_symlinkoption() and isinstance(option, DynSymLinkOption):
|
|
# FIXME peuvent-il vraiment etre le 2 ?
|
|
# si non supprimer tout ces tests inutiles
|
|
raise Exception('oui ca existe ...')
|
|
if option.impl_is_symlinkoption() and not isinstance(option, DynSymLinkOption):
|
|
if returns_option is True:
|
|
return option
|
|
path = context.cfgimpl_get_description().impl_get_path_by_opt(option.impl_getopt())
|
|
return context.getattr(path,
|
|
validate=validate,
|
|
validate_properties=validate_properties,
|
|
force_permissive=force_permissive,
|
|
setting_properties=setting_properties,
|
|
index=index)
|
|
|
|
subpath = self._get_subpath(name)
|
|
if setting_properties:
|
|
self.cfgimpl_get_settings().validate_properties(option,
|
|
subpath,
|
|
setting_properties,
|
|
index=index,
|
|
force_permissive=force_permissive)
|
|
if option.impl_is_optiondescription():
|
|
if returns_option is True:
|
|
return option
|
|
return SubConfig(option,
|
|
self._impl_context,
|
|
setting_properties,
|
|
validate,
|
|
force_permissive,
|
|
subpath)
|
|
|
|
if option.impl_is_master_slaves('slave'):
|
|
if index is None:
|
|
raise IndexError(_('index is mandatory for the slave "{}"'
|
|
'').format(subpath))
|
|
length = self.cfgimpl_get_length()
|
|
if index >= length:
|
|
raise IndexError(_('index ({}) is higher than the master length ({}) '
|
|
'for "{}"').format(index,
|
|
length,
|
|
subpath))
|
|
elif index:
|
|
raise IndexError(_('index is forbidden for the not slave "{}"'
|
|
'').format(subpath))
|
|
if validate:
|
|
self.cfgimpl_get_description().impl_validate(context,
|
|
force_permissive,
|
|
setting_properties)
|
|
|
|
if returns_option is True:
|
|
return option
|
|
return self.cfgimpl_get_values().get_cached_value(option,
|
|
path=subpath,
|
|
validate=validate,
|
|
setting_properties=setting_properties,
|
|
force_permissive=force_permissive,
|
|
index=index)
|
|
|
|
def find(self,
|
|
setting_properties,
|
|
bytype=None,
|
|
byname=None,
|
|
byvalue=undefined,
|
|
type_='option',
|
|
check_properties=True,
|
|
force_permissive=False):
|
|
"""
|
|
finds a list of options recursively in the config
|
|
|
|
:param bytype: Option class (BoolOption, StrOption, ...)
|
|
:param byname: filter by Option.impl_getname()
|
|
:param byvalue: filter by the option's value
|
|
:returns: list of matching Option objects
|
|
"""
|
|
return self._cfgimpl_get_context()._find(bytype,
|
|
byname,
|
|
byvalue,
|
|
setting_properties=setting_properties,
|
|
first=False,
|
|
type_=type_,
|
|
_subpath=self.cfgimpl_get_path(False),
|
|
check_properties=check_properties,
|
|
force_permissive=force_permissive)
|
|
|
|
def find_first(self,
|
|
setting_properties,
|
|
bytype=None,
|
|
byname=None,
|
|
byvalue=undefined,
|
|
type_='option',
|
|
raise_if_not_found=True,
|
|
check_properties=True,
|
|
force_permissive=False):
|
|
"""
|
|
finds an option recursively in the config
|
|
|
|
:param bytype: Option class (BoolOption, StrOption, ...)
|
|
:param byname: filter by Option.impl_getname()
|
|
:param byvalue: filter by the option's value
|
|
:returns: list of matching Option objects
|
|
"""
|
|
return self._cfgimpl_get_context()._find(bytype,
|
|
byname,
|
|
byvalue,
|
|
setting_properties=setting_properties,
|
|
first=True,
|
|
type_=type_,
|
|
_subpath=self.cfgimpl_get_path(False),
|
|
raise_if_not_found=raise_if_not_found,
|
|
check_properties=check_properties,
|
|
force_permissive=force_permissive)
|
|
|
|
def _find(self,
|
|
bytype,
|
|
byname,
|
|
byvalue,
|
|
first,
|
|
type_='option',
|
|
_subpath=None,
|
|
check_properties=True,
|
|
raise_if_not_found=True,
|
|
force_permissive=False,
|
|
only_path=undefined,
|
|
only_option=undefined,
|
|
setting_properties=None):
|
|
"""
|
|
convenience method for finding an option that lives only in the subtree
|
|
|
|
:param first: return only one option if True, a list otherwise
|
|
:return: find list or an exception if nothing has been found
|
|
"""
|
|
def _filter_by_value():
|
|
if byvalue is undefined:
|
|
return True
|
|
try:
|
|
value = self.getattr(path,
|
|
force_permissive=force_permissive,
|
|
setting_properties=setting_properties)
|
|
except PropertiesOptionError:
|
|
return False
|
|
if isinstance(value, list):
|
|
return byvalue in value
|
|
else:
|
|
return value == byvalue
|
|
|
|
if type_ not in ('option', 'path', 'value'): # pragma: optional cover
|
|
raise ValueError(_('unknown type_ type {0}'
|
|
'for _find').format(type_))
|
|
find_results = []
|
|
# if value and/or check_properties are set, need all avalaible option
|
|
# If first one has no good value or not good property check second one
|
|
# and so on
|
|
only_first = first is True and byvalue is undefined and \
|
|
check_properties is None
|
|
if only_path is not undefined:
|
|
options = [(only_path, only_option)]
|
|
else:
|
|
options = self.cfgimpl_get_description().impl_get_options_paths(bytype,
|
|
byname,
|
|
_subpath,
|
|
only_first,
|
|
self._cfgimpl_get_context(),
|
|
setting_properties)
|
|
for path, option in options:
|
|
if not _filter_by_value():
|
|
continue
|
|
#remove option with propertyerror, ...
|
|
if check_properties:
|
|
try:
|
|
self.unwrap_from_path(path,
|
|
setting_properties=setting_properties,
|
|
force_permissive=force_permissive)
|
|
except PropertiesOptionError:
|
|
continue
|
|
if type_ == 'value':
|
|
retval = self.getattr(path,
|
|
force_permissive=force_permissive,
|
|
setting_properties=setting_properties)
|
|
elif type_ == 'path':
|
|
retval = path
|
|
elif type_ == 'option':
|
|
retval = option
|
|
if first:
|
|
return retval
|
|
else:
|
|
find_results.append(retval)
|
|
return self._find_return_results(find_results,
|
|
raise_if_not_found)
|
|
|
|
def _find_return_results(self,
|
|
find_results,
|
|
raise_if_not_found):
|
|
if find_results == []: # pragma: optional cover
|
|
if raise_if_not_found:
|
|
raise AttributeError(_("no option found in config"
|
|
" with these criteria"))
|
|
else:
|
|
return find_results
|
|
|
|
def make_dict(self,
|
|
setting_properties,
|
|
flatten=False,
|
|
_currpath=None,
|
|
withoption=None,
|
|
withvalue=undefined,
|
|
force_permissive=False,
|
|
fullpath=False):
|
|
"""exports the whole config into a `dict`, for example:
|
|
|
|
>>> print(cfg.make_dict())
|
|
{'od2.var4': None, 'od2.var5': None, 'od2.var6': None}
|
|
|
|
|
|
|
|
:param flatten: returns a dict(name=value) instead of a dict(path=value)
|
|
::
|
|
|
|
>>> print(cfg.make_dict(flatten=True))
|
|
{'var5': None, 'var4': None, 'var6': None}
|
|
|
|
:param withoption: returns the options that are present in the very same
|
|
`OptionDescription` than the `withoption` itself::
|
|
|
|
>>> print(cfg.make_dict(withoption='var1'))
|
|
{'od2.var4': None, 'od2.var5': None,
|
|
'od2.var6': None,
|
|
'od2.var1': u'value',
|
|
'od1.var1': None,
|
|
'od1.var3': None,
|
|
'od1.var2': None}
|
|
|
|
:param withvalue: returns the options that have the value `withvalue`
|
|
::
|
|
|
|
>>> print(c.make_dict(withoption='var1',
|
|
withvalue=u'value'))
|
|
{'od2.var4': None,
|
|
'od2.var5': None,
|
|
'od2.var6': None,
|
|
'od2.var1': u'value'}
|
|
|
|
:returns: dict of Option's name (or path) and values
|
|
"""
|
|
pathsvalues = []
|
|
if _currpath is None:
|
|
_currpath = []
|
|
if withoption is None and withvalue is not undefined: # pragma: optional cover
|
|
raise ValueError(_("make_dict can't filtering with value without "
|
|
"option"))
|
|
if withoption is not None:
|
|
context = self._cfgimpl_get_context()
|
|
for path in context._find(bytype=None,
|
|
byname=withoption,
|
|
byvalue=withvalue,
|
|
first=False,
|
|
type_='path',
|
|
_subpath=self.cfgimpl_get_path(False),
|
|
force_permissive=force_permissive,
|
|
setting_properties=setting_properties):
|
|
path = '.'.join(path.split('.')[:-1])
|
|
opt = context.unwrap_from_path(path,
|
|
orce_permissive=True)
|
|
mypath = self.cfgimpl_get_path()
|
|
if mypath is not None:
|
|
if mypath == path:
|
|
withoption = None
|
|
withvalue = undefined
|
|
break
|
|
else:
|
|
tmypath = mypath + '.'
|
|
if not path.startswith(tmypath): # pragma: optional cover
|
|
raise AttributeError(_('unexpected path {0}, '
|
|
'should start with {1}'
|
|
'').format(path, mypath))
|
|
path = path[len(tmypath):]
|
|
self._make_sub_dict(opt,
|
|
path,
|
|
pathsvalues,
|
|
_currpath,
|
|
flatten,
|
|
force_permissive=force_permissive,
|
|
setting_properties=setting_properties,
|
|
fullpath=fullpath)
|
|
#withoption can be set to None below !
|
|
if withoption is None:
|
|
for opt in self.cfgimpl_get_description().impl_getchildren(setting_properties):
|
|
path = opt.impl_getname()
|
|
self._make_sub_dict(opt,
|
|
path,
|
|
pathsvalues,
|
|
_currpath,
|
|
flatten,
|
|
force_permissive=force_permissive,
|
|
setting_properties=setting_properties,
|
|
fullpath=fullpath)
|
|
if _currpath == []:
|
|
options = dict(pathsvalues)
|
|
return options
|
|
return pathsvalues
|
|
|
|
def _make_sub_dict(self,
|
|
option,
|
|
path,
|
|
pathsvalues,
|
|
_currpath,
|
|
flatten,
|
|
setting_properties,
|
|
force_permissive=False,
|
|
fullpath=False):
|
|
try:
|
|
if not option.impl_is_optiondescription() and option.impl_is_master_slaves('slave'):
|
|
ret = []
|
|
length = self.cfgimpl_get_length()
|
|
if length:
|
|
for idx in range(length):
|
|
ret.append(self.getattr(path,
|
|
index=idx,
|
|
force_permissive=force_permissive,
|
|
setting_properties=setting_properties))
|
|
elif setting_properties:
|
|
self.cfgimpl_get_settings().validate_properties(option,
|
|
path,
|
|
setting_properties,
|
|
force_permissive=force_permissive)
|
|
else:
|
|
ret = self.getattr(path,
|
|
force_permissive=force_permissive,
|
|
setting_properties=setting_properties)
|
|
except PropertiesOptionError:
|
|
pass
|
|
else:
|
|
if option.impl_is_optiondescription():
|
|
pathsvalues += ret.make_dict(setting_properties,
|
|
flatten=flatten,
|
|
_currpath=_currpath + path.split('.'),
|
|
force_permissive=force_permissive,
|
|
fullpath=fullpath)
|
|
else:
|
|
if flatten:
|
|
name = option.impl_getname()
|
|
elif fullpath:
|
|
#FIXME
|
|
#root_path = self.cfgimpl_get_path()
|
|
#if root_path is None:
|
|
# name = opt.impl_getname()
|
|
#else:
|
|
# name = '.'.join([root_path, opt.impl_getname()])
|
|
name = self._get_subpath(name)
|
|
else:
|
|
name = '.'.join(_currpath + [option.impl_getname()])
|
|
pathsvalues.append((name, ret))
|
|
|
|
def cfgimpl_get_path(self,
|
|
dyn=True):
|
|
descr = self.cfgimpl_get_description()
|
|
if not dyn and descr.impl_is_dynoptiondescription():
|
|
context_descr = self._cfgimpl_get_context().cfgimpl_get_description()
|
|
return context_descr.impl_get_path_by_opt(descr.impl_getopt())
|
|
return self._impl_path
|
|
|
|
|
|
class _CommonConfig(SubConfig):
|
|
"abstract base class for the Config, GroupConfig and the MetaConfig"
|
|
__slots__ = ('_impl_values', '_impl_settings', '_impl_meta', '_impl_test')
|
|
|
|
def _impl_build_all_caches(self,
|
|
force_store_values):
|
|
descr = self.cfgimpl_get_description()
|
|
if not descr.impl_already_build_caches():
|
|
descr._build_cache_option()
|
|
descr._build_cache(self)
|
|
descr.impl_build_force_store_values(self,
|
|
force_store_values)
|
|
|
|
def read_only(self):
|
|
"read only is a global config's setting, see `settings.py`"
|
|
self.cfgimpl_get_settings().read_only()
|
|
|
|
def read_write(self):
|
|
"read write is a global config's setting, see `settings.py`"
|
|
self.cfgimpl_get_settings().read_write()
|
|
|
|
def getowner(self,
|
|
opt,
|
|
index=None,
|
|
force_permissive=False):
|
|
"""convenience method to retrieve an option's owner
|
|
from the config itself
|
|
"""
|
|
if not isinstance(opt, Option) and \
|
|
not isinstance(opt, SymLinkOption) and \
|
|
not isinstance(opt, DynSymLinkOption): # pragma: optional cover
|
|
raise TypeError(_('opt in getowner must be an option not {0}'
|
|
'').format(type(opt)))
|
|
return self.cfgimpl_get_values().getowner(opt,
|
|
index=index,
|
|
force_permissive=force_permissive)
|
|
|
|
def unwrap_from_path(self,
|
|
path,
|
|
setting_properties=None,
|
|
force_permissive=False,
|
|
index=None,
|
|
validate=True,
|
|
validate_properties=True):
|
|
"""convenience method to extract and Option() object from the Config()
|
|
and it is **fast**: finds the option directly in the appropriate
|
|
namespace
|
|
|
|
:returns: Option()
|
|
"""
|
|
if '.' in path:
|
|
self, path = self.cfgimpl_get_home_by_path(path,
|
|
force_permissive=force_permissive,
|
|
validate_properties=validate_properties,
|
|
setting_properties=setting_properties)
|
|
if isinstance(self, Exception):
|
|
return self
|
|
option = self.cfgimpl_get_description().__getattr__(path,
|
|
context=self._cfgimpl_get_context(),
|
|
setting_properties=setting_properties)
|
|
if not validate_properties:
|
|
return option
|
|
else:
|
|
if index is None and option.impl_is_master_slaves('slave'):
|
|
subpath = self._get_subpath(path)
|
|
self.cfgimpl_get_settings().validate_properties(option,
|
|
subpath,
|
|
setting_properties,
|
|
force_permissive=force_permissive)
|
|
return option
|
|
else:
|
|
return self.getattr(path,
|
|
validate=validate,
|
|
force_permissive=force_permissive,
|
|
index=index,
|
|
setting_properties=setting_properties,
|
|
validate_properties=validate_properties,
|
|
returns_option=True)
|
|
|
|
def cfgimpl_get_path(self, dyn=True):
|
|
return None
|
|
|
|
def cfgimpl_get_meta(self):
|
|
if self._impl_meta is not None:
|
|
return self._impl_meta()
|
|
|
|
# information
|
|
def impl_set_information(self, key, value):
|
|
"""updates the information's attribute
|
|
|
|
:param key: information's key (ex: "help", "doc"
|
|
:param value: information's value (ex: "the help string")
|
|
"""
|
|
self._impl_values.set_information(key, value)
|
|
|
|
def impl_get_information(self, key, default=undefined):
|
|
"""retrieves one information's item
|
|
|
|
:param key: the item string (ex: "help")
|
|
"""
|
|
return self._impl_values.get_information(key, default)
|
|
|
|
def impl_del_information(self, key, raises=True):
|
|
self._impl_values.del_information(key, raises)
|
|
|
|
def __getstate__(self):
|
|
raise NotImplementedError()
|
|
|
|
def _gen_fake_values(self):
|
|
fake_config = Config(self._impl_descr,
|
|
persistent=False,
|
|
force_values=get_default_values_storages(),
|
|
force_settings=self.cfgimpl_get_settings())
|
|
fake_config.cfgimpl_get_values()._p_.importation(self.cfgimpl_get_values()._p_.exportation(fake=True))
|
|
return fake_config
|
|
|
|
def duplicate(self,
|
|
session_id=None,
|
|
force_values=None,
|
|
force_settings=None):
|
|
config = Config(self._impl_descr,
|
|
_duplicate=True,
|
|
session_id=session_id,
|
|
force_values=force_values,
|
|
force_settings=force_settings)
|
|
config.cfgimpl_get_values()._p_.importation(self.cfgimpl_get_values()._p_.exportation())
|
|
config.cfgimpl_get_settings()._p_.set_modified_properties(self.cfgimpl_get_settings(
|
|
)._p_.get_modified_properties())
|
|
config.cfgimpl_get_settings()._pp_.set_modified_permissives(self.cfgimpl_get_settings(
|
|
)._pp_.get_modified_permissives())
|
|
return config
|
|
|
|
|
|
# ____________________________________________________________
|
|
class Config(_CommonConfig):
|
|
"main configuration management entry"
|
|
__slots__ = ('__weakref__', '_impl_test', '_impl_name')
|
|
|
|
def __init__(self,
|
|
descr,
|
|
session_id=None,
|
|
persistent=False,
|
|
force_values=None,
|
|
force_settings=None,
|
|
_duplicate=False,
|
|
_force_store_values=True):
|
|
""" Configuration option management master class
|
|
|
|
:param descr: describes the configuration schema
|
|
:type descr: an instance of ``option.OptionDescription``
|
|
:param context: the current root config
|
|
:type context: `Config`
|
|
:param session_id: session ID is import with persistent Config to
|
|
retrieve good session
|
|
:type session_id: `str`
|
|
:param persistent: if persistent, don't delete storage when leaving
|
|
:type persistent: `boolean`
|
|
"""
|
|
if force_settings is not None and force_values is not None:
|
|
if isinstance(force_settings, tuple):
|
|
self._impl_settings = Settings(self,
|
|
force_settings[0],
|
|
force_settings[1])
|
|
else:
|
|
self._impl_settings = force_settings
|
|
self._impl_values = Values(self,
|
|
force_values)
|
|
else:
|
|
properties, permissives, values, session_id = get_storages(self,
|
|
session_id,
|
|
persistent)
|
|
if not valid_name(session_id): # pragma: optional cover
|
|
raise ValueError(_("invalid session ID: {0} for config").format(session_id))
|
|
self._impl_settings = Settings(self,
|
|
properties,
|
|
permissives)
|
|
self._impl_values = Values(self,
|
|
values)
|
|
super(Config, self).__init__(descr,
|
|
weakref.ref(self),
|
|
undefined,
|
|
True,
|
|
False)
|
|
self._impl_meta = None
|
|
#undocumented option used only in test script
|
|
self._impl_test = False
|
|
if _duplicate is False and (force_settings is None or force_values is None):
|
|
self._impl_build_all_caches(_force_store_values)
|
|
self._impl_name = session_id
|
|
|
|
def impl_getname(self):
|
|
return self._impl_name
|
|
|
|
def impl_getsessionid(self):
|
|
return self._impl_values._p_._storage.session_id
|
|
|
|
|
|
class GroupConfig(_CommonConfig):
|
|
__slots__ = ('__weakref__',
|
|
'_impl_children',
|
|
'_impl_name')
|
|
|
|
def __init__(self,
|
|
children,
|
|
session_id=None,
|
|
persistent=False,
|
|
_descr=None):
|
|
if not isinstance(children, list):
|
|
raise ValueError(_("groupconfig's children must be a list"))
|
|
names = []
|
|
for child in children:
|
|
if not isinstance(child,
|
|
_CommonConfig):
|
|
raise ValueError(_("groupconfig's children must be Config, MetaConfig or GroupConfig"))
|
|
name_ = child._impl_name
|
|
names.append(name_)
|
|
if len(names) != len(set(names)):
|
|
for idx in xrange(1, len(names) + 1):
|
|
name = names.pop(0)
|
|
if name in names:
|
|
raise ConflictError(_('config name must be uniq in '
|
|
'groupconfig for {0}').format(name))
|
|
self._impl_children = children
|
|
properties, permissives, values, session_id = get_storages(self,
|
|
session_id,
|
|
persistent)
|
|
self._impl_settings = Settings(self,
|
|
properties,
|
|
permissives)
|
|
self._impl_values = Values(self, values)
|
|
super(GroupConfig, self).__init__(_descr,
|
|
weakref.ref(self),
|
|
undefined,
|
|
True,
|
|
False)
|
|
self._impl_meta = None
|
|
#undocumented option used only in test script
|
|
self._impl_test = False
|
|
self._impl_name = session_id
|
|
|
|
def cfgimpl_get_children(self):
|
|
return self._impl_children
|
|
|
|
def cfgimpl_reset_cache(self,
|
|
only_expired=False,
|
|
opt=None,
|
|
path=None,
|
|
resetted_opts=set()):
|
|
if isinstance(self, MetaConfig):
|
|
super(GroupConfig, self).cfgimpl_reset_cache(only_expired=only_expired,
|
|
opt=opt,
|
|
path=path,
|
|
resetted_opts=copy(resetted_opts))
|
|
for child in self._impl_children:
|
|
child.cfgimpl_reset_cache(only_expired=only_expired,
|
|
opt=opt,
|
|
path=path,
|
|
resetted_opts=copy(resetted_opts))
|
|
|
|
def set_value(self, path, value, _commit=True):
|
|
"""Setattr not in current GroupConfig, but in each children
|
|
"""
|
|
ret = []
|
|
for child in self._impl_children:
|
|
if isinstance(child, MetaConfig):
|
|
ret.extend(child.set_value(path,
|
|
value,
|
|
only_config=True,
|
|
_commit=False))
|
|
elif isinstance(child, GroupConfig):
|
|
ret.extend(child.set_value(path,
|
|
value,
|
|
_commit=False))
|
|
else:
|
|
childret = child.setattr(path,
|
|
value,
|
|
not_raises=True,
|
|
_commit=False)
|
|
if childret is not None:
|
|
ret.append(childret)
|
|
if _commit:
|
|
self.cfgimpl_get_values()._p_.commit()
|
|
return ret
|
|
|
|
|
|
def find_firsts(self,
|
|
byname=None,
|
|
bypath=undefined,
|
|
byoption=undefined,
|
|
byvalue=undefined,
|
|
raise_if_not_found=True,
|
|
_sub=False,
|
|
check_properties=True):
|
|
"""Find first not in current GroupConfig, but in each children
|
|
"""
|
|
ret = []
|
|
|
|
#if MetaConfig, all children have same OptionDescription in context
|
|
#so search only one time the option for all children
|
|
if bypath is undefined and byname is not None and \
|
|
isinstance(self,
|
|
MetaConfig):
|
|
bypath = self._find(bytype=None,
|
|
byvalue=undefined,
|
|
byname=byname,
|
|
first=True,
|
|
type_='path',
|
|
check_properties=None,
|
|
raise_if_not_found=raise_if_not_found)
|
|
byname = None
|
|
byoption = self.cfgimpl_get_description().impl_get_opt_by_path(bypath)
|
|
|
|
for child in self._impl_children:
|
|
if isinstance(child, GroupConfig):
|
|
ret.extend(child.find_firsts(byname=byname,
|
|
bypath=bypath,
|
|
byoption=byoption,
|
|
byvalue=byvalue,
|
|
check_properties=check_properties,
|
|
raise_if_not_found=False,
|
|
_sub=True))
|
|
elif child._find(None,
|
|
byname,
|
|
byvalue,
|
|
first=True,
|
|
type_='path',
|
|
raise_if_not_found=False,
|
|
check_properties=check_properties,
|
|
only_path=bypath,
|
|
only_option=byoption):
|
|
ret.append(child)
|
|
if _sub:
|
|
return ret
|
|
else:
|
|
return GroupConfig(self._find_return_results(ret,
|
|
raise_if_not_found))
|
|
|
|
def __str__(self):
|
|
ret = ''
|
|
for child in self._impl_children:
|
|
ret += "({0})\n".format(child._impl_name)
|
|
if self._impl_descr is not None:
|
|
ret += super(GroupConfig, self).__str__()
|
|
return ret
|
|
|
|
__repr__ = __str__
|
|
|
|
def getconfig(self,
|
|
name):
|
|
for child in self._impl_children:
|
|
if name == child.impl_getname():
|
|
return child
|
|
raise ConfigError(_('unknown config {}').format(name))
|
|
|
|
|
|
class MetaConfig(GroupConfig):
|
|
__slots__ = tuple()
|
|
|
|
def __init__(self,
|
|
children,
|
|
session_id=None,
|
|
persistent=False,
|
|
optiondescription=None,
|
|
_force_store_values=True):
|
|
descr = None
|
|
if optiondescription is not None:
|
|
new_children = []
|
|
for child_session_id in children:
|
|
new_children.append(Config(optiondescription,
|
|
persistent=persistent,
|
|
session_id=child_session_id,
|
|
_force_store_values=_force_store_values))
|
|
children = new_children
|
|
for child in children:
|
|
if not isinstance(child, _CommonConfig):
|
|
raise TypeError(_("metaconfig's children "
|
|
"should be config, not {0}"
|
|
).format(type(child)))
|
|
if child.cfgimpl_get_meta() is not None:
|
|
raise ValueError(_("child has already a metaconfig's"))
|
|
if descr is None:
|
|
descr = child.cfgimpl_get_description()
|
|
elif not descr is child.cfgimpl_get_description():
|
|
raise ValueError(_('all config in metaconfig must '
|
|
'have the same optiondescription'))
|
|
child._impl_meta = weakref.ref(self)
|
|
|
|
super(MetaConfig, self).__init__(children,
|
|
session_id,
|
|
persistent,
|
|
descr)
|
|
|
|
def set_value(self,
|
|
path,
|
|
value,
|
|
force_default=False,
|
|
force_dont_change_value=False,
|
|
force_default_if_same=False,
|
|
only_config=False,
|
|
_commit=True):
|
|
"""only_config: could be set if you want modify value in all Config included in
|
|
this MetaConfig
|
|
"""
|
|
if only_config:
|
|
if force_default or force_default_if_same or force_dont_change_value:
|
|
raise ValueError(_('force_default, force_default_if_same or '
|
|
'force_dont_change_value cannot be set with'
|
|
' only_config'))
|
|
return super(MetaConfig, self).set_value(path,
|
|
value,
|
|
_commit=_commit)
|
|
ret = []
|
|
if force_default or force_default_if_same or force_dont_change_value:
|
|
if force_default and force_dont_change_value:
|
|
raise ValueError(_('force_default and force_dont_change_value'
|
|
' cannot be set together'))
|
|
opt = self.cfgimpl_get_description().impl_get_opt_by_path(path)
|
|
setting_properties = self.cfgimpl_get_settings()._getproperties(read_write=False)
|
|
for child in self._impl_children:
|
|
if force_default_if_same or force_default:
|
|
if force_default_if_same:
|
|
if not child.cfgimpl_get_values()._contains(path):
|
|
child_value = undefined
|
|
else:
|
|
child_value = child.getattr(path)
|
|
if force_default or value == child_value:
|
|
child.cfgimpl_get_values().reset(opt,
|
|
path,
|
|
setting_properties,
|
|
validate=False,
|
|
_commit=False)
|
|
continue
|
|
if force_dont_change_value:
|
|
child_value = child.getattr(path,
|
|
setting_properties=setting_properties)
|
|
if isinstance(child_value, Exception):
|
|
ret.append(child_value)
|
|
elif value != child_value:
|
|
childret = child.setattr(path,
|
|
child_value,
|
|
_commit=False,
|
|
not_raises=True)
|
|
if childret is not None: # pragma: no cover
|
|
ret.append(childret)
|
|
|
|
self.setattr(path,
|
|
value,
|
|
_commit=_commit)
|
|
return ret
|
|
|
|
def reset(self, path):
|
|
opt = self.cfgimpl_get_description().impl_get_opt_by_path(path)
|
|
setting_properties = self.cfgimpl_get_settings()._getproperties(read_write=False)
|
|
for child in self._impl_children:
|
|
child.cfgimpl_get_values().reset(opt,
|
|
path,
|
|
setting_properties,
|
|
validate=False,
|
|
_commit=False)
|
|
self.cfgimpl_get_values().reset(opt,
|
|
path,
|
|
setting_properties,
|
|
validate=False)
|
|
|
|
def new_config(self,
|
|
session_id,
|
|
persistent=False):
|
|
config = Config(self._impl_descr,
|
|
session_id=session_id,
|
|
persistent=persistent)
|
|
|
|
if config._impl_name in [child._impl_name for child in self._impl_children]: # pragma: no cover
|
|
raise ConflictError(_('config name must be uniq in '
|
|
'groupconfig for {0}').format(config._impl_name))
|
|
config._impl_meta = weakref.ref(self)
|
|
self._impl_children.append(config)
|
|
return config
|