improve GroupConfig/MetaConfig

add set_value in GroupConfig
This commit is contained in:
Emmanuel Garette 2014-12-01 21:49:50 +01:00
parent c6c331eb54
commit 2ccf92f879
6 changed files with 580 additions and 197 deletions

View file

@ -128,8 +128,8 @@ def test_deref_groupconfig():
i1 = IntOption('i1', '')
od1 = OptionDescription('od1', '', [i1])
od2 = OptionDescription('od2', '', [od1])
conf1 = Config(od2)
conf2 = Config(od2)
conf1 = Config(od2, 'conf1')
conf2 = Config(od2, 'conf2')
meta = GroupConfig([conf1, conf2])
w = weakref.ref(conf1)
del(conf1)
@ -142,8 +142,8 @@ def test_deref_metaconfig():
i1 = IntOption('i1', '')
od1 = OptionDescription('od1', '', [i1])
od2 = OptionDescription('od2', '', [od1])
conf1 = Config(od2)
conf2 = Config(od2)
conf1 = Config(od2, 'conf1')
conf2 = Config(od2, 'conf2')
meta = MetaConfig([conf1, conf2])
w = weakref.ref(conf1)
del(conf1)

View file

@ -2,10 +2,10 @@ import autopath
from py.test import raises
from tiramisu.setting import owners
from tiramisu.setting import groups, owners
from tiramisu.config import Config, GroupConfig, MetaConfig
from tiramisu.option import IntOption, OptionDescription
from tiramisu.error import ConfigError, PropertiesOptionError
from tiramisu.option import IntOption, StrOption, OptionDescription
from tiramisu.error import ConfigError, ConflictError
owners.addowner('meta')
@ -19,8 +19,8 @@ def make_description():
i6 = IntOption('i6', '', properties=('disabled',))
od1 = OptionDescription('od1', '', [i1, i2, i3, i4, i5, i6])
od2 = OptionDescription('od2', '', [od1])
conf1 = Config(od2)
conf2 = Config(od2)
conf1 = Config(od2, name='conf1')
conf2 = Config(od2, name='conf2')
conf1.read_write()
conf2.read_write()
meta = MetaConfig([conf1, conf2])
@ -94,7 +94,7 @@ def test_contexts():
conf1, conf2 = meta.cfgimpl_get_children()
assert conf1.od1.i2 == conf2.od1.i2 == 1
assert conf1.getowner(conf1.unwrap_from_path('od1.i2')) is conf2.getowner(conf2.unwrap_from_path('od1.i2')) is owners.default
meta.setattrs('od1.i2', 6)
meta.set_value('od1.i2', 6, only_config=True)
assert meta.od1.i2 == 1
assert conf1.od1.i2 == conf2.od1.i2 == 6
assert conf1.getowner(conf1.unwrap_from_path('od1.i2')) is conf2.getowner(conf2.unwrap_from_path('od1.i2')) is owners.user
@ -109,6 +109,11 @@ def test_find():
'od1.i2': 1, 'od1.i5': [2], 'od1.i6': None}
def test_group_error():
raises(ValueError, "GroupConfig('str')")
raises(ValueError, "GroupConfig(['str'])")
def test_meta_meta():
meta1 = make_description()
meta2 = MetaConfig([meta1])
@ -148,22 +153,17 @@ def test_meta_meta_set():
meta2 = MetaConfig([meta1])
meta2.cfgimpl_get_settings().setowner(owners.meta)
conf1, conf2 = meta1.cfgimpl_get_children()
meta2.setattrs('od1.i1', 7)
meta2.set_value('od1.i1', 7, only_config=True)
#PropertiesOptionError
meta2.setattrs('od1.i6', 7)
meta2.set_value('od1.i6', 7, only_config=True)
assert conf1.od1.i1 == conf2.od1.i1 == 7
assert conf1.getowner(conf1.unwrap_from_path('od1.i1')) is conf2.getowner(conf2.unwrap_from_path('od1.i1')) is owners.user
assert [conf1, conf2] == meta2.find_firsts(byname='i1', byvalue=7, type_='config')
assert ['od1.i1', 'od1.i1'] == meta2.find_firsts(byname='i1', byvalue=7, type_='path')
assert [conf1, conf2] == meta2.find_firsts(byname='i1', byvalue=7).cfgimpl_get_children()
conf1.od1.i1 = 8
assert [conf1, conf2] == meta2.find_firsts(byname='i1', type_='config')
assert ['od1.i1', 'od1.i1'] == meta2.find_firsts(byname='i1', type_='path')
assert [conf2] == meta2.find_firsts(byname='i1', byvalue=7, type_='config')
assert ['od1.i1'] == meta2.find_firsts(byname='i1', byvalue=7, type_='path')
assert [conf1] == meta2.find_firsts(byname='i1', byvalue=8, type_='config')
assert ['od1.i1'] == meta2.find_firsts(byname='i1', byvalue=8, type_='path')
assert [conf1, conf2] == meta2.find_firsts(byname='i5', byvalue=2, type_='config')
assert ['od1.i5', 'od1.i5'] == meta2.find_firsts(byname='i5', byvalue=2, type_='path')
assert [conf1, conf2] == meta2.find_firsts(byname='i1').cfgimpl_get_children()
assert [conf2] == meta2.find_firsts(byname='i1', byvalue=7).cfgimpl_get_children()
assert [conf1] == meta2.find_firsts(byname='i1', byvalue=8).cfgimpl_get_children()
assert [conf1, conf2] == meta2.find_firsts(byname='i5', byvalue=2).cfgimpl_get_children()
raises(AttributeError, "meta2.find_firsts(byname='i1', byvalue=10)")
raises(AttributeError, "meta2.find_firsts(byname='not', byvalue=10)")
raises(AttributeError, "meta2.find_firsts(byname='i6')")
@ -173,25 +173,29 @@ def test_not_meta():
i1 = IntOption('i1', '')
od1 = OptionDescription('od1', '', [i1])
od2 = OptionDescription('od2', '', [od1])
conf1 = Config(od2)
conf2 = Config(od2)
conf1 = Config(od2, name='conf1')
conf2 = Config(od2, name='conf2')
conf3 = Config(od2)
conf4 = Config(od2, name='conf2')
raises(ValueError, "GroupConfig(conf1)")
meta = GroupConfig([conf1, conf2])
raises(ConfigError, 'meta.od1.i1')
conf1, conf2 = meta.cfgimpl_get_children()
meta.setattrs('od1.i1', 7)
assert conf1.od1.i1 == conf2.od1.i1 == 7
assert conf1.getowner(conf1.unwrap_from_path('od1.i1')) is conf2.getowner(conf2.unwrap_from_path('od1.i1')) is owners.user
#same name
raises(ConflictError, "GroupConfig([conf2, conf4])")
grp = GroupConfig([conf1, conf2])
raises(ConfigError, 'grp.od1.i1')
conf1, conf2 = grp.cfgimpl_get_children()
grp.set_value('od1.i1', 7)
assert grp.conf1.od1.i1 == conf2.od1.i1 == 7
assert grp.conf1.getowner(grp.conf1.unwrap_from_path('od1.i1')) is grp.conf2.getowner(grp.conf2.unwrap_from_path('od1.i1')) is owners.user
def test_group_find_firsts():
i1 = IntOption('i1', '')
od1 = OptionDescription('od1', '', [i1])
od2 = OptionDescription('od2', '', [od1])
conf1 = Config(od2)
conf2 = Config(od2)
meta = GroupConfig([conf1, conf2])
conf1, conf2 = meta.find_firsts(byname='i1')
conf1 = Config(od2, name='conf1')
conf2 = Config(od2, name='conf2')
grp = GroupConfig([conf1, conf2])
assert [conf1, conf2] == grp.find_firsts(byname='i1').cfgimpl_get_children()
def test_meta_path():
@ -207,12 +211,251 @@ def test_meta_unconsistent():
i4 = IntOption('i4', '', default=2)
od1 = OptionDescription('od1', '', [i1, i2, i3, i4])
od2 = OptionDescription('od2', '', [od1])
conf1 = Config(od2)
conf2 = Config(od2)
conf3 = Config(od2)
conf4 = Config(od1)
conf1 = Config(od2, name='conf1')
conf2 = Config(od2, name='conf2')
conf3 = Config(od2, name='conf3')
conf4 = Config(od1, name='conf4')
meta = MetaConfig([conf1, conf2])
meta.cfgimpl_get_settings().setowner(owners.meta)
raises(TypeError, 'MetaConfig("string")')
#same descr but conf1 already in meta
raises(ValueError, "MetaConfig([conf1, conf3])")
#not same descr
raises(ValueError, "MetaConfig([conf3, conf4])")
def test_meta_master_slaves():
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
interface1.impl_set_group_type(groups.master)
conf1 = Config(interface1, name='conf1')
conf2 = Config(interface1, name='conf2')
meta = MetaConfig([conf1, conf2])
meta.conf1.read_only()
meta.conf2.read_only()
assert [conf1, conf2] == meta.find_firsts(byname='netmask_admin_eth0').cfgimpl_get_children()
meta.conf1.read_write()
meta.conf2.read_only()
assert [conf2] == meta.find_firsts(byname='netmask_admin_eth0').cfgimpl_get_children()
meta.conf2.read_write()
raises(AttributeError, "meta.find_firsts(byname='netmask_admin_eth0')")
assert [conf1, conf2] == meta.find_firsts(byname='netmask_admin_eth0',
check_properties=None).cfgimpl_get_children()
meta.conf1.read_only()
meta.conf2.read_only()
meta.read_write()
assert [conf1, conf2] == meta.find_firsts(byname='netmask_admin_eth0').cfgimpl_get_children()
def test_meta_master_slaves_value():
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
interface1.impl_set_group_type(groups.master)
conf1 = Config(interface1, name='conf1')
conf2 = Config(interface1, name='conf2')
meta = MetaConfig([conf1, conf2])
meta.conf1.ip_admin_eth0 = ['192.168.1.1']
assert meta.conf1.netmask_admin_eth0 == [None]
del(meta.conf1.ip_admin_eth0)
assert meta.conf1.netmask_admin_eth0 == []
meta.ip_admin_eth0 = ['192.168.1.1']
assert meta.conf1.netmask_admin_eth0 == [None]
meta.netmask_admin_eth0 = ['255.255.255.0']
assert meta.conf1.netmask_admin_eth0 == ['255.255.255.0']
meta.netmask_admin_eth0 = ['255.255.0.0']
assert meta.conf1.netmask_admin_eth0 == ['255.255.0.0']
meta.conf1.ip_admin_eth0 = ['192.168.1.1']
assert meta.conf1.netmask_admin_eth0 == [None]
def test_meta_master_slaves_owners():
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
interface1.impl_set_group_type(groups.master)
conf1 = Config(interface1, name='conf1')
conf2 = Config(interface1, name='conf2')
meta = MetaConfig([conf1, conf2])
meta.cfgimpl_get_settings().setowner(owners.meta)
assert meta.conf1.getowner(ip_admin_eth0) == owners.default
assert meta.conf1.getowner(netmask_admin_eth0) == owners.default
meta.conf1.ip_admin_eth0 = ['192.168.1.1']
assert meta.conf1.getowner(ip_admin_eth0) == owners.user
assert meta.conf1.getowner(netmask_admin_eth0) == owners.default
del(meta.conf1.ip_admin_eth0)
assert meta.conf1.getowner(ip_admin_eth0) == owners.default
assert meta.conf1.getowner(netmask_admin_eth0) == owners.default
meta.ip_admin_eth0 = ['192.168.1.1']
assert meta.conf1.getowner(ip_admin_eth0) == owners.meta
assert meta.conf1.getowner(netmask_admin_eth0) == owners.default
meta.netmask_admin_eth0 = ['255.255.255.0']
assert meta.conf1.getowner(ip_admin_eth0) == owners.meta
assert meta.conf1.getowner(netmask_admin_eth0) == owners.meta
meta.netmask_admin_eth0 = ['255.255.0.0']
assert meta.conf1.getowner(ip_admin_eth0) == owners.meta
assert meta.conf1.getowner(netmask_admin_eth0) == owners.meta
meta.conf1.ip_admin_eth0 = ['192.168.1.1']
assert meta.conf1.getowner(ip_admin_eth0) == owners.user
assert meta.conf1.getowner(netmask_admin_eth0) == owners.default
def test_meta_force_default():
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
interface1.impl_set_group_type(groups.master)
conf1 = Config(interface1, name='conf1')
conf1.read_write()
conf2 = Config(interface1, name='conf2')
conf2.read_write()
meta = MetaConfig([conf1, conf2])
meta.read_write()
meta.cfgimpl_get_settings().setowner(owners.meta)
assert meta.ip_admin_eth0 == []
assert meta.conf1.ip_admin_eth0 == []
assert meta.conf2.ip_admin_eth0 == []
meta.set_value('ip_admin_eth0', ['192.168.1.1'])
assert meta.ip_admin_eth0 == ['192.168.1.1']
assert meta.conf1.ip_admin_eth0 == ['192.168.1.1']
assert meta.conf2.ip_admin_eth0 == ['192.168.1.1']
meta.conf1.ip_admin_eth0 = ['192.168.1.2']
assert meta.ip_admin_eth0 == ['192.168.1.1']
assert meta.conf1.ip_admin_eth0 == ['192.168.1.2']
assert meta.conf2.ip_admin_eth0 == ['192.168.1.1']
meta.set_value('ip_admin_eth0', ['192.168.1.3'])
assert meta.ip_admin_eth0 == ['192.168.1.3']
assert meta.conf1.ip_admin_eth0 == ['192.168.1.2']
assert meta.conf2.ip_admin_eth0 == ['192.168.1.3']
meta.set_value('ip_admin_eth0', ['192.168.1.4'], force_default=True)
assert meta.ip_admin_eth0 == ['192.168.1.4']
assert meta.conf1.ip_admin_eth0 == ['192.168.1.4']
assert meta.conf2.ip_admin_eth0 == ['192.168.1.4']
def test_meta_force_dont_change_value():
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
interface1.impl_set_group_type(groups.master)
conf1 = Config(interface1, name='conf1')
conf1.read_write()
conf2 = Config(interface1, name='conf2')
conf2.read_write()
meta = MetaConfig([conf1, conf2])
meta.read_write()
meta.cfgimpl_get_settings().setowner(owners.meta)
assert meta.ip_admin_eth0 == []
assert meta.conf1.ip_admin_eth0 == []
assert meta.conf2.ip_admin_eth0 == []
meta.conf1.ip_admin_eth0 = ['192.168.1.4']
assert meta.conf1.ip_admin_eth0 == ['192.168.1.4']
assert meta.conf2.ip_admin_eth0 == []
assert conf1.getowner(ip_admin_eth0) is owners.user
assert conf2.getowner(ip_admin_eth0) is owners.default
meta.set_value('ip_admin_eth0', ['192.168.1.4'], force_dont_change_value=True)
assert meta.ip_admin_eth0 == ['192.168.1.4']
assert meta.conf1.ip_admin_eth0 == ['192.168.1.4']
assert meta.conf2.ip_admin_eth0 == []
assert conf1.getowner(ip_admin_eth0) is owners.user
assert conf2.getowner(ip_admin_eth0) is owners.user
def test_meta_force_default_if_same():
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
interface1.impl_set_group_type(groups.master)
conf1 = Config(interface1, name='conf1')
conf1.read_write()
conf2 = Config(interface1, name='conf2')
conf2.read_write()
meta = MetaConfig([conf1, conf2])
meta.read_write()
meta.cfgimpl_get_settings().setowner(owners.meta)
#
assert meta.ip_admin_eth0 == []
assert meta.conf1.ip_admin_eth0 == []
assert meta.conf2.ip_admin_eth0 == []
#
meta.conf1.ip_admin_eth0 = ['192.168.1.4']
assert meta.conf1.ip_admin_eth0 == ['192.168.1.4']
assert meta.conf2.ip_admin_eth0 == []
assert conf1.getowner(ip_admin_eth0) is owners.user
assert conf2.getowner(ip_admin_eth0) is owners.default
meta.set_value('ip_admin_eth0', ['192.168.1.4'], force_default_if_same=True)
assert meta.ip_admin_eth0 == ['192.168.1.4']
assert meta.conf1.ip_admin_eth0 == ['192.168.1.4']
assert meta.conf2.ip_admin_eth0 == ['192.168.1.4']
assert conf1.getowner(ip_admin_eth0) is owners.meta
assert conf2.getowner(ip_admin_eth0) is owners.meta
#
meta.conf1.ip_admin_eth0 = ['192.168.1.3']
assert meta.conf1.ip_admin_eth0 == ['192.168.1.3']
assert meta.conf2.ip_admin_eth0 == ['192.168.1.4']
assert conf1.getowner(ip_admin_eth0) is owners.user
assert conf2.getowner(ip_admin_eth0) is owners.meta
meta.set_value('ip_admin_eth0', ['192.168.1.5'], force_default_if_same=True)
assert meta.ip_admin_eth0 == ['192.168.1.5']
assert meta.conf1.ip_admin_eth0 == ['192.168.1.3']
assert meta.conf2.ip_admin_eth0 == ['192.168.1.5']
assert conf1.getowner(ip_admin_eth0) is owners.user
assert conf2.getowner(ip_admin_eth0) is owners.meta
def test_meta_force_default_if_same_and_dont_change():
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
interface1.impl_set_group_type(groups.master)
conf1 = Config(interface1, name='conf1')
conf1.read_write()
conf2 = Config(interface1, name='conf2')
conf2.read_write()
meta = MetaConfig([conf1, conf2])
meta.read_write()
meta.cfgimpl_get_settings().setowner(owners.meta)
#
assert meta.ip_admin_eth0 == []
assert meta.conf1.ip_admin_eth0 == []
assert meta.conf2.ip_admin_eth0 == []
#
meta.conf1.ip_admin_eth0 = ['192.168.1.4']
assert meta.conf1.ip_admin_eth0 == ['192.168.1.4']
assert meta.conf2.ip_admin_eth0 == []
assert conf1.getowner(ip_admin_eth0) is owners.user
assert conf2.getowner(ip_admin_eth0) is owners.default
meta.set_value('ip_admin_eth0', ['192.168.1.4'], force_default_if_same=True, force_dont_change_value=True)
assert meta.ip_admin_eth0 == ['192.168.1.4']
assert meta.conf1.ip_admin_eth0 == ['192.168.1.4']
assert meta.conf2.ip_admin_eth0 == []
assert conf1.getowner(ip_admin_eth0) is owners.meta
assert conf2.getowner(ip_admin_eth0) is owners.user
#
meta.conf1.ip_admin_eth0 = ['192.168.1.3']
assert meta.conf1.ip_admin_eth0 == ['192.168.1.3']
assert meta.conf2.ip_admin_eth0 == []
assert conf1.getowner(ip_admin_eth0) is owners.user
assert conf2.getowner(ip_admin_eth0) is owners.user
meta.set_value('ip_admin_eth0', ['192.168.1.5'], force_default_if_same=True, force_dont_change_value=True)
assert meta.ip_admin_eth0 == ['192.168.1.5']
assert meta.conf1.ip_admin_eth0 == ['192.168.1.3']
assert meta.conf2.ip_admin_eth0 == []
assert conf1.getowner(ip_admin_eth0) is owners.user
assert conf2.getowner(ip_admin_eth0) is owners.user
def test_meta_force_default_and_dont_change():
ip_admin_eth0 = StrOption('ip_admin_eth0', "ip", multi=True)
netmask_admin_eth0 = StrOption('netmask_admin_eth0', "mask", multi=True, properties=('hidden',))
interface1 = OptionDescription('ip_admin_eth0', '', [ip_admin_eth0, netmask_admin_eth0])
interface1.impl_set_group_type(groups.master)
conf1 = Config(interface1, name='conf1')
conf1.read_write()
conf2 = Config(interface1, name='conf2')
conf2.read_write()
meta = MetaConfig([conf1, conf2])
meta.read_write()
meta.cfgimpl_get_settings().setowner(owners.meta)
raises(ValueError, "meta.set_value('ip_admin_eth0', ['192.168.1.4'], force_default=True, force_dont_change_value=True)")

View file

@ -20,9 +20,10 @@
# ____________________________________________________________
"options handler global entry point"
import weakref
from tiramisu.error import PropertiesOptionError, ConfigError
from tiramisu.error import PropertiesOptionError, ConfigError, ConflictError
from tiramisu.option import OptionDescription, Option, SymLinkOption, \
DynSymLinkOption
from tiramisu.option.baseoption import valid_name
from tiramisu.setting import groups, Settings, default_encoding, undefined
from tiramisu.storage import get_storages, get_storage, set_storage, \
_impl_getstate_setting
@ -598,9 +599,10 @@ class _CommonConfig(SubConfig):
# ____________________________________________________________
class Config(_CommonConfig):
"main configuration management entry"
__slots__ = ('__weakref__', '_impl_test')
__slots__ = ('__weakref__', '_impl_test', '_impl_name')
def __init__(self, descr, session_id=None, persistent=False):
def __init__(self, descr, session_id=None, persistent=False,
name=undefined):
""" Configuration option management master class
:param descr: describes the configuration schema
@ -621,6 +623,13 @@ class Config(_CommonConfig):
self._impl_meta = None
#undocumented option used only in test script
self._impl_test = False
if name is undefined:
name = 'config'
if session_id is not None:
name += session_id
if name is not None and not valid_name(name): # pragma: optional cover
raise ValueError(_("invalid name: {0} for config").format(name))
self._impl_name = name
def cfgimpl_reset_cache(self,
only_expired=False,
@ -630,14 +639,36 @@ class Config(_CommonConfig):
if 'settings' in only:
self.cfgimpl_get_settings().reset_cache(only_expired=only_expired)
def impl_getname(self):
return self._impl_name
def impl_setname(self, name):
self._impl_name = name
class GroupConfig(_CommonConfig):
__slots__ = ('_impl_children', '__weakref__')
__slots__ = ('__weakref__', '_impl_children', '_impl_name')
def __init__(self, children, session_id=None, persistent=False,
_descr=None):
_descr=None, name=undefined):
if not isinstance(children, list):
raise ValueError(_("metaconfig's children must be a list"))
raise ValueError(_("groupconfig's children must be a list"))
names = []
for child in children:
if not isinstance(child, _CommonConfig):
raise ValueError(_("groupconfig's children must be Config, MetaConfig or GroupConfig"))
name = child._impl_name
if name is None:
raise ValueError(_('name must be set to config before creating groupconfig'))
#if name in names:
# raise ValueError(_('config name must be uniq in groupconfig'))
names.append(name)
if len(names) != len(set(names)):
for idx in xrange(1, len(names) + 1):
name = names.pop(0)
if name in names:
raise ConflictError(_('config name must be uniq in '
'groupconfig for {0}').format(name))
self._impl_children = children
settings, values = get_storages(self, session_id, persistent)
self._impl_settings = Settings(self, settings)
@ -646,6 +677,9 @@ class GroupConfig(_CommonConfig):
self._impl_meta = None
#undocumented option used only in test script
self._impl_test = False
if name is undefined:
name = session_id
self._impl_name = name
def cfgimpl_get_children(self):
return self._impl_children
@ -653,7 +687,7 @@ class GroupConfig(_CommonConfig):
#def cfgimpl_get_context(self):
# "a meta config is a config which has a setting, that is itself"
# return self
#
def cfgimpl_reset_cache(self,
only_expired=False,
only=('values', 'settings')):
@ -664,74 +698,87 @@ class GroupConfig(_CommonConfig):
for child in self._impl_children:
child.cfgimpl_reset_cache(only_expired=only_expired, only=only)
def setattrs(self, path, value):
def set_value(self, path, value):
"""Setattr not in current GroupConfig, but in each children
"""
for child in self._impl_children:
try:
if not isinstance(child, GroupConfig):
setattr(child, path, value)
if isinstance(child, MetaConfig):
child.set_value(path, value, only_config=True)
elif isinstance(child, GroupConfig):
child.set_value(path, value)
else:
child.setattrs(path, value)
setattr(child, path, value)
except PropertiesOptionError:
pass
def find_firsts(self, byname=None, bypath=undefined, byoption=undefined,
byvalue=undefined, type_='option', display_error=True):
byvalue=undefined, display_error=True, _sub=False,
check_properties=True):
"""Find first not in current GroupConfig, but in each children
"""
ret = []
#if MetaConfig, all children have same OptionDescription in context
#so search only one time the option for all children
try:
if bypath is undefined and byname is not None and \
isinstance(self, MetaConfig):
bypath = self._find(bytype=None, byvalue=undefined, byname=byname,
first=True, type_='path',
check_properties=False,
display_error=display_error)
byname = None
byoption = self.cfgimpl_get_description(
).impl_get_opt_by_path(bypath)
except AttributeError:
return self._find_return_results([], True)
if bypath is undefined and byname is not None and \
isinstance(self, MetaConfig):
bypath = self._find(bytype=None, byvalue=undefined, byname=byname,
first=True, type_='path',
check_properties=None,
display_error=display_error)
byname = None
byoption = self.cfgimpl_get_description(
).impl_get_opt_by_path(bypath)
for child in self._impl_children:
try:
if isinstance(child, GroupConfig):
ret.extend(child.find_firsts(byname=byname,
bypath=bypath,
ret.extend(child.find_firsts(byname=byname, bypath=bypath,
byoption=byoption,
byvalue=byvalue,
type_=type_,
display_error=False))
check_properties=check_properties,
display_error=False,
_sub=True))
else:
if type_ == 'config':
f_type = 'path'
else:
f_type = type_
f_ret = child._find(None, byname, byvalue, first=True,
type_=f_type, display_error=False,
only_path=bypath,
only_option=byoption)
if type_ == 'config':
ret.append(child)
else:
ret.append(f_ret)
child._find(None, byname, byvalue, first=True,
type_='path', display_error=False,
check_properties=check_properties,
only_path=bypath, only_option=byoption)
ret.append(child)
except AttributeError:
pass
return self._find_return_results(ret, display_error)
if _sub:
return ret
else:
return GroupConfig(self._find_return_results(ret, display_error))
def __repr__(self):
return object.__repr__(self)
def __str__(self):
return object.__str__(self)
ret = ''
for child in self._impl_children:
ret += '({0})\n'.format(child._impl_name)
try:
ret += super(GroupConfig, self).__str__()
except ConfigError:
pass
return ret
def getattr(self, name, force_permissive=False, validate=True):
for child in self._impl_children:
if name == child._impl_name:
return child
return super(GroupConfig, self).getattr(name, force_permissive,
validate)
class MetaConfig(GroupConfig):
__slots__ = tuple()
def __init__(self, children, session_id=None, persistent=False):
def __init__(self, children, session_id=None, persistent=False,
name=undefined):
descr = None
for child in children:
if not isinstance(child, _CommonConfig):
@ -747,4 +794,37 @@ class MetaConfig(GroupConfig):
'have the same optiondescription'))
child._impl_meta = weakref.ref(self)
super(MetaConfig, self).__init__(children, session_id, persistent, descr)
super(MetaConfig, self).__init__(children, session_id, persistent,
descr, name)
def set_value(self, path, value, force_default=False,
force_dont_change_value=False, force_default_if_same=False,
only_config=False):
if only_config:
if force_default or force_default_if_same or force_dont_change_value:
raise ValueError(_('force_default, force_default_if_same or '
'force_dont_change_value cannot be set with'
' only_config'))
return super(MetaConfig, self).set_value(path, value)
if force_default or force_default_if_same or force_dont_change_value:
if force_default and force_dont_change_value:
raise ValueError(_('force_default and force_dont_change_value'
' cannot be set together'))
opt = self.cfgimpl_get_description().impl_get_opt_by_path(path)
for child in self._impl_children:
if force_default_if_same or force_default:
if force_default_if_same:
if not child.cfgimpl_get_values()._contains(path):
child_value = undefined
else:
child_value = child.getattr(path)
if force_default or value == child_value:
child.cfgimpl_get_values().reset(opt, path=path,
validate=False)
continue
if force_dont_change_value:
child_value = child.getattr(path)
if value != child_value:
setattr(child, path, child_value)
setattr(self, path, value)

View file

@ -100,13 +100,11 @@ class MasterSlaves(object):
else: # pragma: no dynoptiondescription cover
return opt == self.master or opt in self.slaves
def reset(self, opt, values):
#FIXME pas de opt ???
def reset(self, opt, values, validate):
for slave in self.getslaves(opt):
values.reset(slave)
values.reset(slave, validate=validate)
def pop(self, opt, values, index):
#FIXME pas test de meta ...
for slave in self.getslaves(opt):
if not values.is_default_owner(slave, validate_properties=False,
validate_meta=False):
@ -157,52 +155,6 @@ class MasterSlaves(object):
def _getslave(self, values, opt, path, validate, force_permissive,
force_properties, validate_properties):
value = values._get_validated_value(opt, path, validate,
force_permissive,
force_properties,
validate_properties,
None) # not undefined
return self.get_slave_value(values, opt, value, validate,
validate_properties, force_permissive)
def setitem(self, values, opt, value, path):
if self.is_master(opt):
masterlen = len(value)
for slave in self.getslaves(opt):
slave_path = slave.impl_getpath(values._getcontext())
slave_value = values._get_validated_value(slave,
slave_path,
False,
False,
None, False,
None) # not undefined
slavelen = len(slave_value)
self.validate_slave_length(masterlen, slavelen, slave.impl_getname(), opt)
else:
self.validate_slave_length(self.get_length(values, opt,
slave_path=path), len(value),
opt.impl_getname(), opt, setitem=True)
def get_length(self, values, opt, validate=True, slave_path=undefined,
slave_value=undefined, force_permissive=False):
"""get master len with slave option"""
masterp = self.getmaster(opt).impl_getpath(values._getcontext())
if slave_value is undefined:
slave_path = undefined
return len(self.getitem(values, self.getmaster(opt), masterp, validate,
force_permissive, None, True, slave_path,
slave_value))
def validate_slave_length(self, masterlen, valuelen, name, opt, setitem=False):
if valuelen > masterlen or (valuelen < masterlen and setitem): # pragma: optional cover
log.debug('validate_slave_length: masterlen: {0}, valuelen: {1}, '
'setitem: {2}'.format(masterlen, valuelen, setitem))
raise SlaveError(_("invalid len for the slave: {0}"
" which has {1} as master").format(
name, self.getmaster(opt).impl_getname()))
def get_slave_value(self, values, opt, value, validate=True,
validate_properties=True, force_permissive=False):
"""
if master has length 0:
return []
@ -224,20 +176,72 @@ class MasterSlaves(object):
list is smaller than master: return list + None
list is greater than master: raise SlaveError
"""
master = self.getmaster(opt)
masterp = master.impl_getpath(values._getcontext())
masterlen = self.get_length(values, opt, validate, undefined,
undefined, force_permissive,
master=master)
master_is_meta = values._is_meta(opt, masterp)
value = values._get_validated_value(opt, path, validate,
force_permissive,
force_properties,
validate_properties,
None, # not undefined
with_meta=master_is_meta)
#if slave, had values until master's one
path = opt.impl_getpath(values._getcontext())
masterlen = self.get_length(values, opt, validate, path, value,
force_permissive)
valuelen = len(value)
if validate:
self.validate_slave_length(masterlen, valuelen, opt.impl_getname(), opt)
self.validate_slave_length(masterlen, valuelen,
opt.impl_getname(), opt)
if valuelen < masterlen:
for num in range(0, masterlen - valuelen):
index = valuelen + num
value.append(values._get_validated_value(opt, path, True,
False, None,
validate_properties,
with_meta=master_is_meta,
index=index),
setitem=False,
force=True)
return value
def setitem(self, values, opt, value, path):
if self.is_master(opt):
masterlen = len(value)
for slave in self.getslaves(opt):
slave_path = slave.impl_getpath(values._getcontext())
slave_value = values._get_validated_value(slave,
slave_path,
False,
False,
None, False,
None) # not undefined
slavelen = len(slave_value)
self.validate_slave_length(masterlen, slavelen, slave.impl_getname(), opt)
else:
self.validate_slave_length(self.get_length(values, opt,
slave_path=path), len(value),
opt.impl_getname(), opt, setitem=True)
def get_length(self, values, opt, validate=True, slave_path=undefined,
slave_value=undefined, force_permissive=False, master=None,
masterp=None):
"""get master len with slave option"""
if master is None:
master = self.getmaster(opt)
if masterp is None:
masterp = master.impl_getpath(values._getcontext())
if slave_value is undefined:
slave_path = undefined
return len(self.getitem(values, master, masterp, validate,
force_permissive, None, True, slave_path,
slave_value))
def validate_slave_length(self, masterlen, valuelen, name, opt, setitem=False):
if valuelen > masterlen or (valuelen < masterlen and setitem): # pragma: optional cover
log.debug('validate_slave_length: masterlen: {0}, valuelen: {1}, '
'setitem: {2}'.format(masterlen, valuelen, setitem))
raise SlaveError(_("invalid len for the slave: {0}"
" which has {1} as master").format(
name, self.getmaster(opt).impl_getname()))

View file

@ -216,7 +216,6 @@ def populate_owners():
setattr(owners, name, owners.Owner(name))
setattr(owners, 'addowner', addowner)
# ____________________________________________________________
# populate groups and owners with default attributes
groups = GroupModule()
@ -334,8 +333,11 @@ class Settings(object):
path = opt.impl_getpath(self._getcontext())
return self._getitem(opt, path)
def _getitem(self, opt, path):
return Property(self, self._getproperties(opt, path), opt, path)
def _getitem(self, opt, path, self_properties=undefined):
return Property(self,
self._getproperties(opt, path,
self_properties=self_properties),
opt, path)
def __setitem__(self, opt, value): # pragma: optional cover
raise ValueError(_('you should only append/remove properties'))
@ -352,19 +354,22 @@ class Settings(object):
self._p_.delproperties(_path)
self._getcontext().cfgimpl_reset_cache()
def _getproperties(self, opt=None, path=None, _is_apply_req=True):
def _getproperties(self, opt=None, path=None, _is_apply_req=True,
self_properties=undefined):
"""
be careful, _is_apply_req doesn't copy properties
"""
if opt is None:
props = copy(self._p_.getproperties(path, default_properties))
else:
if self_properties is undefined:
self_properties = self._getproperties()
if path is None: # pragma: optional cover
raise ValueError(_('if opt is not None, path should not be'
' None in _getproperties'))
ntime = None
if 'cache' in self and self._p_.hascache(path):
if 'expire' in self:
if 'cache' in self_properties and self._p_.hascache(path):
if 'expire' in self_properties:
ntime = int(time())
is_cached, props = self._p_.getcache(path, ntime)
if is_cached:
@ -373,8 +378,8 @@ class Settings(object):
if _is_apply_req:
props = copy(props)
props |= self.apply_requires(opt, path)
if 'cache' in self:
if 'expire' in self:
if 'cache' in self_properties:
if 'expire' in self_properties:
if ntime is None:
ntime = int(time())
ntime = ntime + expires_time
@ -412,7 +417,8 @@ class Settings(object):
#____________________________________________________________
def validate_properties(self, opt_or_descr, is_descr, is_write, path,
value=None, force_permissive=False,
force_properties=None, force_permissives=None):
force_properties=None, force_permissives=None,
self_properties=undefined):
"""
validation upon the properties related to `opt_or_descr`
@ -432,8 +438,10 @@ class Settings(object):
(typically with the `frozen` property)
"""
# opt properties
properties = self._getproperties(opt_or_descr, path)
self_properties = self._getproperties()
if self_properties is undefined:
self_properties = self._getproperties()
properties = self._getproperties(opt_or_descr, path,
self_properties=self_properties)
# remove opt permissive
# permissive affect option's permission with or without permissive
# global property

View file

@ -54,7 +54,8 @@ class Values(object):
raise ConfigError(_('the context does not exist anymore'))
return context
def _getvalue(self, opt, path, is_default, index=undefined):
def _getvalue(self, opt, path, is_default, index=undefined,
with_meta=True, setting_properties=undefined):
"""actually retrieves the value
:param opt: the `option.Option()` object
@ -63,8 +64,10 @@ class Values(object):
if opt.impl_is_optiondescription(): # pragma: optional cover
raise ValueError(_('optiondescription has no value'))
setting = self._getcontext().cfgimpl_get_settings()
force_default = 'frozen' in setting._getitem(opt, path) and \
'force_default_on_freeze' in setting._getitem(opt, path)
force_default = 'frozen' in setting._getitem(opt, path,
self_properties=setting_properties) and \
'force_default_on_freeze' in setting._getitem(opt, path,
self_properties=setting_properties)
if not is_default and not force_default:
value = self._p_.getvalue(path)
if index is not undefined:
@ -96,19 +99,21 @@ class Values(object):
return value
except IndexError:
pass
meta = self._getcontext().cfgimpl_get_meta()
if meta is not None:
#FIXME : problème de longueur si meta + slave
#doit passer de meta à pas meta
#en plus il faut gérer la longueur avec les meta !
#FIXME SymLinkOption
value = meta.cfgimpl_get_values()._get_cached_item(opt, path)
if isinstance(value, Multi):
if index is not undefined:
value = value[index]
else:
value = list(value)
return value
if with_meta:
meta = self._getcontext().cfgimpl_get_meta()
if meta is not None:
#FIXME : possible problème de longueur si slave en SymLinkOption
try:
value = meta.cfgimpl_get_values(
)._get_cached_item(opt, path)
if isinstance(value, Multi):
if index is not undefined:
value = value[index]
else:
value = list(value)
return value
except PropertiesOptionError:
pass
# now try to get default value
value = opt.impl_getdefault()
if opt.impl_is_multi() and index is not undefined:
@ -146,19 +151,21 @@ class Values(object):
"""overrides the builtins `del()` instructions"""
self.reset(opt)
def reset(self, opt, path=None):
def reset(self, opt, path=None, validate=True):
if path is None:
path = opt.impl_getpath(self._getcontext())
context = self._getcontext()
context.cfgimpl_get_settings().validate_properties(opt, False, True,
path)
if self._p_.hasvalue(path):
setting = context.cfgimpl_get_settings()
opt.impl_validate(opt.impl_getdefault(),
context, 'validator' in setting)
if validate:
context.cfgimpl_get_settings().validate_properties(opt, False,
True, path)
if self._contains(path):
if validate:
setting = context.cfgimpl_get_settings()
opt.impl_validate(opt.impl_getdefault(),
context, 'validator' in setting)
context.cfgimpl_reset_cache()
if opt.impl_is_master_slaves('master'):
opt.impl_get_master_slaves().reset(opt, self)
opt.impl_get_master_slaves().reset(opt, self, validate)
self._p_.resetvalue(path)
def _isempty(self, opt, value):
@ -186,13 +193,16 @@ class Values(object):
def _get_cached_item(self, opt, path=None, validate=True,
force_permissive=False, force_properties=None,
validate_properties=True):
validate_properties=True,
setting_properties=undefined):
if path is None:
path = opt.impl_getpath(self._getcontext())
ntime = None
setting = self._getcontext().cfgimpl_get_settings()
if 'cache' in setting and self._p_.hascache(path):
if 'expire' in setting:
if setting_properties is undefined:
setting_properties = self._getcontext().cfgimpl_get_settings(
)._getproperties()
if 'cache' in setting_properties and self._p_.hascache(path):
if 'expire' in setting_properties:
ntime = int(time())
is_cached, value = self._p_.getcache(path, ntime)
if is_cached:
@ -201,10 +211,11 @@ class Values(object):
value = Multi(value, self.context, opt, path)
return value
val = self._getitem(opt, path, validate, force_permissive,
force_properties, validate_properties)
if 'cache' in setting and validate and validate_properties and \
force_permissive is False and force_properties is None:
if 'expire' in setting:
force_properties, validate_properties,
setting_properties)
if 'cache' in setting_properties and validate and validate_properties \
and force_permissive is False and force_properties is None:
if 'expire' in setting_properties:
if ntime is None:
ntime = int(time())
ntime = ntime + expires_time
@ -212,7 +223,7 @@ class Values(object):
return val
def _getitem(self, opt, path, validate, force_permissive, force_properties,
validate_properties):
validate_properties, setting_properties=undefined):
if opt.impl_is_master_slaves():
return opt.impl_get_master_slaves().getitem(self, opt, path,
validate,
@ -223,11 +234,13 @@ class Values(object):
return self._get_validated_value(opt, path, validate,
force_permissive,
force_properties,
validate_properties)
validate_properties,
setting_properties=setting_properties)
def _get_validated_value(self, opt, path, validate, force_permissive,
force_properties, validate_properties,
index=undefined, submulti_index=undefined):
index=undefined, submulti_index=undefined,
with_meta=True, setting_properties=undefined):
"""same has getitem but don't touch the cache
index is None for slave value, if value returned is not a list, just return []
"""
@ -235,13 +248,16 @@ class Values(object):
setting = context.cfgimpl_get_settings()
is_default = self._is_default_owner(opt, path,
validate_properties=False,
validate_meta=False)
validate_meta=False,
setting_properties=setting_properties)
try:
if index is None:
gv_index = undefined
else:
gv_index = index
value = self._getvalue(opt, path, is_default, index=gv_index)
value = self._getvalue(opt, path, is_default, index=gv_index,
with_meta=with_meta,
setting_properties=setting_properties)
config_error = None
except ConfigError as err:
# For calculating properties, we need value (ie for mandatory
@ -277,13 +293,17 @@ class Values(object):
force_submulti_index = None
else:
force_submulti_index = submulti_index
opt.impl_validate(value, context, 'validator' in setting,
if setting_properties is undefined:
setting_properties = setting._getproperties()
opt.impl_validate(value, context,
'validator' in setting_properties,
force_index=force_index,
force_submulti_index=force_submulti_index)
#FIXME pas de test avec les metas ...
#FIXME et les symlinkoption ...
if is_default and 'force_store_value' in setting._getitem(opt,
path):
path,
self_properties=setting_properties):
if isinstance(value, Multi):
item = list(value)
else:
@ -294,7 +314,8 @@ class Values(object):
setting.validate_properties(opt, False, False, value=value,
path=path,
force_permissive=force_permissive,
force_properties=force_properties)
force_properties=force_properties,
self_properties=setting_properties)
if config_error is not None:
raise config_error
return value
@ -308,24 +329,27 @@ class Values(object):
# user didn't change value, so not write
# valid opt
context = self._getcontext()
setting_properties = context.cfgimpl_get_settings()._getproperties()
opt.impl_validate(value, context,
'validator' in context.cfgimpl_get_settings())
'validator' in setting_properties)
if opt.impl_is_multi():
#value = Multi(value, self.context, opt, path)
if opt.impl_is_master_slaves():
opt.impl_get_master_slaves().setitem(self, opt, value, path)
self._setvalue(opt, path, value, force_permissive=force_permissive,
is_write=is_write)
is_write=is_write,
setting_properties=setting_properties)
def _setvalue(self, opt, path, value, force_permissive=False,
is_write=True, validate_properties=True):
is_write=True, validate_properties=True,
setting_properties=undefined):
context = self._getcontext()
context.cfgimpl_reset_cache()
if validate_properties:
setting = context.cfgimpl_get_settings()
setting.validate_properties(opt, False, is_write,
value=value, path=path,
force_permissive=force_permissive)
force_permissive=force_permissive,
self_properties=setting_properties)
owner = context.cfgimpl_get_settings().getowner()
if isinstance(value, Multi):
value = list(value)
@ -335,6 +359,18 @@ class Values(object):
value[idx] = list(val)
self._p_.setvalue(path, value, owner)
def _is_meta(self, opt, path):
context = self._getcontext()
setting = context.cfgimpl_get_settings()
settings = setting._getitem(opt, path)
if 'frozen' in settings and 'force_default_on_freeze' in settings:
return False
if self._p_.getowner(path, owners.default) is not owners.default:
return False
if context.cfgimpl_get_meta() is not None:
return True
return False
def getowner(self, opt, force_permissive=False):
"""
retrieves the option's owner
@ -351,17 +387,27 @@ class Values(object):
return self._getowner(opt, path, force_permissive=force_permissive)
def _getowner(self, opt, path, validate_properties=True,
force_permissive=False, validate_meta=True):
if not isinstance(opt, Option) and not isinstance(opt, DynSymLinkOption):
force_permissive=False, validate_meta=undefined,
setting_properties=undefined):
if not isinstance(opt, Option) and not isinstance(opt,
DynSymLinkOption):
raise ConfigError(_('owner only avalaible for an option'))
context = self._getcontext()
setting = context.cfgimpl_get_settings()
if 'frozen' in setting._getitem(opt, path) and \
'force_default_on_freeze' in setting._getitem(opt, path):
settings = setting._getitem(opt, path,
self_properties=setting_properties)
if 'frozen' in settings and 'force_default_on_freeze' in settings:
return owners.default
if validate_properties:
self._getitem(opt, path, True, force_permissive, None, True)
owner = self._p_.getowner(path, owners.default)
if validate_meta is undefined:
if opt.impl_is_master_slaves('slave'):
master = opt.impl_get_master_slaves().getmaster(opt)
masterp = master.impl_getpath(context)
validate_meta = self._is_meta(opt, masterp)
else:
validate_meta = True
if validate_meta:
meta = context.cfgimpl_get_meta()
if owner is owners.default and meta is not None:
@ -405,9 +451,11 @@ class Values(object):
validate_meta=validate_meta)
def _is_default_owner(self, opt, path, validate_properties=True,
validate_meta=True):
validate_meta=True, setting_properties=undefined):
return self._getowner(opt, path, validate_properties,
validate_meta=validate_meta) == owners.default
validate_meta=validate_meta,
setting_properties=setting_properties) == \
owners.default
def reset_cache(self, only_expired):
"""