585 lines
29 KiB
Python
585 lines
29 KiB
Python
# coding: utf-8
from .autopath import do_autopath
# NOTE(review): called before the tiramisu imports below; presumably it
# adjusts the import path so that ``tiramisu`` can be found -- confirm in
# the autopath module.
do_autopath()

from tiramisu import setting, value
# Override the module-level ``expires_time`` of both modules for this test
# file.  The tests below wait with ``sleep(1)`` around cache reads, so the
# value is presumably a duration in seconds -- TODO confirm.
setting.expires_time = 1
value.expires_time = 1
from tiramisu.option import BoolOption, IPOption, IntOption, StrOption, OptionDescription
from tiramisu.config import Config
from tiramisu.error import ConfigError
from tiramisu.setting import groups

from time import sleep, time
from py.test import raises
|
|
|
|
|
|
def make_description():
    """Return a fresh ``od1`` description holding three integer options.

    ``u1`` and ``u3`` are declared with ``multi=True``; ``u2`` is a plain
    option without that flag.
    """
    options = [
        IntOption('u1', '', multi=True),
        IntOption('u2', ''),
        IntOption('u3', '', multi=True),
    ]
    return OptionDescription('od1', '', options)
|
|
|
|
|
|
def test_cache_config():
    """The description reports its caches as built only after Config(descr)."""
    descr = make_description()
    built_before = descr.impl_already_build_caches()
    assert built_before is False

    config = Config(descr)
    built_after = descr.impl_already_build_caches()
    assert built_after is True

    # Bare reference to the otherwise unused local; it has no runtime effect.
    config
|
|
|
|
|
|
def test_cache():
    """Reading an option puts its path in both the values and settings caches."""
    descr = make_description()
    cfg = Config(descr)
    # Checked in this order everywhere below: values first, then settings.
    stores = (cfg.cfgimpl_get_values(), cfg.cfgimpl_get_settings())

    cfg.u1  # read for its side effect on the caches
    for store in stores:
        assert 'u1' in store._p_.get_cached(cfg)

    cfg.u2  # a second read must add u2 without dropping u1
    for name in ('u1', 'u2'):
        for store in stores:
            assert name in store._p_.get_cached(cfg)
|
|
|
|
|
|
def test_get_cache():
    """Entries written straight into the caches are what later reads return.

    Both entries are stored with ``time() + 1`` as third ``setcache``
    argument (presumably the expiry timestamp -- confirm against the
    storage API).
    """
    descr = make_description()
    cfg = Config(descr)
    value_store = cfg.cfgimpl_get_values()
    setting_store = cfg.cfgimpl_get_settings()
    deadline = time() + 1

    # Inject a property set and check that the settings lookup serves it.
    injected_props = set(['inject'])
    setting_store._p_.setcache('u1', injected_props, deadline, None)
    assert 'inject' in setting_store[descr.u1]

    # Inject a value and check that the attribute read serves it.
    value_store._p_.setcache('u1', 100, deadline, None)
    assert cfg.u1 == [100]
|
|
|
|
|
|
def test_get_cache_no_expire():
    """Same as test_get_cache, but the entries are stored with ``None``.

    ``None`` is passed where test_get_cache passes a timestamp (presumably
    meaning "never expires" -- confirm against the storage API).
    """
    descr = make_description()
    cfg = Config(descr)
    value_store = cfg.cfgimpl_get_values()
    setting_store = cfg.cfgimpl_get_settings()

    # Inject a property set and check that the settings lookup serves it.
    injected_props = set(['inject2'])
    setting_store._p_.setcache('u1', injected_props, None, None)
    assert 'inject2' in setting_store[descr.u1]

    # Inject a value and check that the attribute read serves it.
    value_store._p_.setcache('u1', 200, None, None)
    assert cfg.u1 == [200]
|
|
|
|
|
|
def test_cache_reset():
    """Changing a value or a property evicts the affected cache entries.

    Setting or deleting ``u2``, or editing the properties of ``u2``, leaves
    ``u1`` cached while ``u2`` is absent from both caches.  Editing the
    global properties evicts ``u1`` as well.
    """
    descr = make_description()
    cfg = Config(descr)
    values = cfg.cfgimpl_get_values()
    settings = cfg.cfgimpl_get_settings()

    def assert_cached(name):
        # Values cache first, then settings cache, as in every check below.
        assert name in values._p_.get_cached(cfg)
        assert name in settings._p_.get_cached(cfg)

    def assert_not_cached(name):
        assert name not in values._p_.get_cached(cfg)
        assert name not in settings._p_.get_cached(cfg)

    # when change a value
    cfg.u1
    cfg.u2
    assert_cached('u1')
    assert_cached('u2')
    cfg.u2 = 1
    assert_cached('u1')
    assert_not_cached('u2')

    # when remove a value
    cfg.u1
    assert_cached('u1')
    del cfg.u2
    assert_cached('u1')
    assert_not_cached('u2')

    # when add/del property
    cfg.u1
    assert_cached('u1')
    cfg.cfgimpl_get_settings()[descr.u2].append('test')
    assert_cached('u1')
    assert_not_cached('u2')
    cfg.u1
    assert_cached('u1')
    cfg.cfgimpl_get_settings()[descr.u2].remove('test')
    assert_cached('u1')
    assert_not_cached('u2')

    # when enable/disabled property
    cfg.u1
    assert_cached('u1')
    cfg.cfgimpl_get_settings().append('test')
    assert_not_cached('u1')
    cfg.u1
    assert_cached('u1')
    cfg.cfgimpl_get_settings().remove('test')
    assert_not_cached('u1')
|
|
|
|
|
|
def test_cache_reset_multi():
    """Assigning, appending, popping or deleting ``u3`` evicts only ``u3``.

    After each modification of the multi option ``u3`` the entry for ``u1``
    is still present in both caches while the one for ``u3`` is gone.
    """
    descr = make_description()
    cfg = Config(descr)
    values = cfg.cfgimpl_get_values()
    settings = cfg.cfgimpl_get_settings()

    def assert_cached(name):
        # Values cache first, then settings cache, as in every check below.
        assert name in values._p_.get_cached(cfg)
        assert name in settings._p_.get_cached(cfg)

    def assert_not_cached(name):
        assert name not in values._p_.get_cached(cfg)
        assert name not in settings._p_.get_cached(cfg)

    cfg.u1
    cfg.u3
    assert_cached('u1')
    assert_cached('u3')

    # when change a value
    cfg.u3 = [1]
    assert_cached('u1')
    assert_not_cached('u3')

    # when append value
    cfg.u1
    cfg.u3
    assert_cached('u1')
    assert_cached('u3')
    cfg.u3.append(1)
    assert_cached('u1')
    assert_not_cached('u3')

    # when pop value
    cfg.u1
    cfg.u3
    assert_cached('u1')
    assert_cached('u3')
    cfg.u3.pop(1)
    assert_cached('u1')
    assert_not_cached('u3')

    # when remove a value
    cfg.u1
    assert_cached('u1')
    del cfg.u3
    assert_cached('u1')
    assert_not_cached('u3')
|
|
|
|
|
|
def test_reset_cache():
    """cfgimpl_reset_cache() without arguments empties both caches.

    Checked twice: right after a single read, and again after reads that
    are spaced out with ``sleep(1)``.
    """
    descr = make_description()
    cfg = Config(descr)
    values = cfg.cfgimpl_get_values()
    settings = cfg.cfgimpl_get_settings()

    def assert_cached(name):
        assert name in values._p_.get_cached(cfg)
        assert name in settings._p_.get_cached(cfg)

    def assert_not_cached(name):
        assert name not in values._p_.get_cached(cfg)
        assert name not in settings._p_.get_cached(cfg)

    cfg.u1
    assert_cached('u1')
    cfg.cfgimpl_reset_cache()
    assert_not_cached('u1')

    # Re-populate the caches with one-second pauses between the reads.
    cfg.u1
    sleep(1)
    cfg.u1
    sleep(1)
    cfg.u2
    assert_cached('u1')
    assert_cached('u2')

    cfg.cfgimpl_reset_cache()
    assert_not_cached('u1')
    assert_not_cached('u2')
|
|
|
|
|
|
def test_reset_cache_subconfig():
    """Resetting the cache through a sub-config evicts the entry below it."""
    inner = make_description()
    outer = OptionDescription('od2', '', [inner])
    cfg = Config(outer)
    value_store = cfg.cfgimpl_get_values()

    cfg.od1.u1  # read through the sub-config to populate the cache
    assert 'od1.u1' in value_store._p_.get_cached(cfg)

    cfg.od1.cfgimpl_reset_cache()
    assert 'od1.u1' not in value_store._p_.get_cached(cfg)
|
|
|
|
|
|
def test_reset_cache_only_expired():
    """cfgimpl_reset_cache(True) keeps recent entries and drops older ones.

    A reset right after reading ``u1`` keeps it cached.  After further
    reads separated by ``sleep(1)``, the reset drops ``u1`` (last read
    before the final pause) but keeps ``u2`` (read just before the reset).
    """
    descr = make_description()
    cfg = Config(descr)
    values = cfg.cfgimpl_get_values()
    settings = cfg.cfgimpl_get_settings()

    def assert_cached(name):
        assert name in values._p_.get_cached(cfg)
        assert name in settings._p_.get_cached(cfg)

    cfg.u1
    assert_cached('u1')
    cfg.cfgimpl_reset_cache(True)
    assert_cached('u1')

    sleep(1)
    cfg.u1
    sleep(1)
    cfg.u2
    assert_cached('u1')
    assert_cached('u2')

    cfg.cfgimpl_reset_cache(True)
    assert 'u1' not in values._p_.get_cached(cfg)
    assert 'u1' not in settings._p_.get_cached(cfg)
    assert_cached('u2')
|
|
|
|
|
|
def test_cache_not_expire():
    """Without the 'expire' property, cfgimpl_reset_cache(True) evicts nothing.

    Entries survive the reset both immediately and after a ``sleep(1)``.
    """
    descr = make_description()
    cfg = Config(descr)
    values = cfg.cfgimpl_get_values()
    settings = cfg.cfgimpl_get_settings()
    settings.remove('expire')

    def assert_cached(*names):
        # For each name: values cache first, then settings cache.
        for name in names:
            assert name in values._p_.get_cached(cfg)
            assert name in settings._p_.get_cached(cfg)

    cfg.u1
    assert_cached('u1')
    cfg.cfgimpl_reset_cache(True)
    assert_cached('u1')

    sleep(1)
    cfg.u2
    assert_cached('u1', 'u2')
    cfg.cfgimpl_reset_cache(True)
    assert_cached('u1', 'u2')
|
|
|
|
|
|
def test_cache_not_cache():
    """Without the 'cache' property a read leaves both caches untouched."""
    descr = make_description()
    cfg = Config(descr)
    value_store = cfg.cfgimpl_get_values()
    setting_store = cfg.cfgimpl_get_settings()
    setting_store.remove('cache')

    cfg.u1  # the read must not create any cache entry
    for store in (value_store, setting_store):
        assert 'u1' not in store._p_.get_cached(cfg)
|
|
|
|
|
|
def test_reset_cache_only():
    """The ``only`` argument restricts cfgimpl_reset_cache() to one cache."""
    descr = make_description()
    cfg = Config(descr)
    value_store = cfg.cfgimpl_get_values()
    setting_store = cfg.cfgimpl_get_settings()

    def in_values():
        return 'u1' in value_store._p_.get_cached(cfg)

    def in_settings():
        return 'u1' in setting_store._p_.get_cached(cfg)

    cfg.u1
    assert in_values()
    assert in_settings()

    # Reset the values cache only: the settings entry must survive.
    cfg.cfgimpl_reset_cache(only=('values',))
    assert not in_values()
    assert in_settings()

    cfg.u1
    assert in_values()
    assert in_settings()

    # Reset the settings cache only: the values entry must survive.
    cfg.cfgimpl_reset_cache(only=('settings',))
    assert in_values()
    assert not in_settings()
|
|
|
|
|
|
def test_force_cache():
    """force_cache() fills both caches and fails without the 'cache' property.

    Steps:
    * with 'expire' removed, force_cache() caches the value and the
      properties of all four options, including the disabled ``u4``;
    * after read_only(), the values cache no longer lists ``u4`` while the
      settings cache still does;
    * after removing the 'cache' property, force_cache() raises ConfigError.
    """
    u1 = IntOption('u1', '', multi=True)
    u2 = IntOption('u2', '')
    u3 = IntOption('u3', '', multi=True)
    u4 = IntOption('u4', '', properties=('disabled',))
    od = OptionDescription('od1', '', [u1, u2, u3, u4])
    c = Config(od)
    c.cfgimpl_get_settings().remove('expire')

    # The expected settings cache is the same before and after read_only().
    expected_settings = {'u1': {None: (set(['empty']), None)},
                         'u2': {None: (set([]), None)},
                         'u3': {None: (set(['empty']), None)},
                         'u4': {None: (set(['disabled']), None)}}

    c.cfgimpl_get_values().force_cache()
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {'u1': {None: ([], None)},
                                                        'u2': {None: (None, None)},
                                                        'u3': {None: ([], None)},
                                                        'u4': {None: (None, None)}}
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == expected_settings

    c.read_only()

    c.cfgimpl_get_values().force_cache()
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {'u1': {None: ([], None)},
                                                        'u2': {None: (None, None)},
                                                        'u3': {None: ([], None)}}
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == expected_settings

    c.cfgimpl_get_settings().remove('cache')
    # Context-manager form instead of passing source code as a string:
    # the string form was deprecated and later removed from pytest.
    with raises(ConfigError):
        c.cfgimpl_get_values().force_cache()
|
|
|
|
|
|
def test_cache_master_slave():
    """Cache layout of a master/slave group as the master grows.

    After each append on the master, the slave's cache entries are keyed by
    ``None`` plus one integer index per master value, while the master and
    the group itself only have the ``None`` key.
    """
    master_opt = StrOption('ip_admin_eth0', "ip réseau autorisé", multi=True)
    slave_opt = StrOption('netmask_admin_eth0', "masque du sous-réseau", multi=True)
    group = OptionDescription('ip_admin_eth0', '', [master_opt, slave_opt])
    group.impl_set_group_type(groups.master)
    root = OptionDescription('toto', '', [group])
    cfg = Config(root)
    cfg.read_write()

    # Cache keys used throughout the assertions.
    group_path = 'ip_admin_eth0'
    master_path = 'ip_admin_eth0.ip_admin_eth0'
    slave_path = 'ip_admin_eth0.netmask_admin_eth0'

    assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == {}
    assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == {}

    # one master value
    cfg.ip_admin_eth0.ip_admin_eth0.append('192.168.1.2')
    cfg.ip_admin_eth0.ip_admin_eth0
    cfg.ip_admin_eth0.netmask_admin_eth0

    value_cache = cfg.cfgimpl_get_values()._p_.get_cached(cfg)
    assert set(value_cache.keys()) == set([master_path, slave_path])
    assert set(value_cache[master_path].keys()) == set([None])
    assert value_cache[master_path][None][0] == ['192.168.1.2']
    assert set(value_cache[slave_path].keys()) == set([None, 0])
    assert value_cache[slave_path][None][0] == [None]
    assert value_cache[slave_path][0][0] is None

    setting_cache = cfg.cfgimpl_get_settings()._p_.get_cached(cfg)
    assert set(setting_cache.keys()) == set([group_path, master_path, slave_path])
    assert set(setting_cache[group_path].keys()) == set([None])
    assert set(setting_cache[master_path].keys()) == set([None])
    assert set(setting_cache[slave_path].keys()) == set([None, 0])

    # two master values
    cfg.ip_admin_eth0.ip_admin_eth0.append('192.168.1.1')
    cfg.ip_admin_eth0.ip_admin_eth0
    cfg.ip_admin_eth0.netmask_admin_eth0

    value_cache = cfg.cfgimpl_get_values()._p_.get_cached(cfg)
    assert set(value_cache.keys()) == set([master_path, slave_path])
    assert set(value_cache[master_path].keys()) == set([None])
    assert value_cache[master_path][None][0] == ['192.168.1.2', '192.168.1.1']
    assert set(value_cache[slave_path].keys()) == set([None, 0, 1])
    assert value_cache[slave_path][None][0] == [None, None]
    assert value_cache[slave_path][0][0] is None
    assert value_cache[slave_path][1][0] is None

    setting_cache = cfg.cfgimpl_get_settings()._p_.get_cached(cfg)
    assert set(setting_cache.keys()) == set([group_path, master_path, slave_path])
    assert set(setting_cache[group_path].keys()) == set([None])
    assert set(setting_cache[master_path].keys()) == set([None])
    assert set(setting_cache[slave_path].keys()) == set([None, 0, 1])
    # Not covered yet (noted in the original): delete, insert, ...
|
|
|
|
|
|
def return_value(value=None):
    """Identity callback: hand back *value* unchanged (``None`` by default)."""
    result = value
    return result
|
|
|
|
|
|
def test_cache_callback():
    """Cache eviction for options whose values come from callbacks.

    ``val2`` and ``val4`` are computed from ``val1``; ``val3`` and ``val5``
    are computed from the constant ``'yes'``.  After each assignment the
    caches must have lost exactly the listed entries, and the following
    force_cache() must restore the complete picture with the new values.
    """
    val1 = StrOption('val1', "", 'val')
    val2 = StrOption('val2', "", callback=return_value, callback_params={'': ((val1, False),)}, properties=('mandatory',))
    val3 = StrOption('val3', "", callback=return_value, callback_params={'': ('yes',)})
    val4 = StrOption('val4', "", callback=return_value, callback_params={'value': ((val1, False),)})
    val5 = StrOption('val5', "", callback=return_value, callback_params={'value': ('yes',)}, multi=True)
    root = OptionDescription('rootconfig', '', [val1, val2, val3, val4, val5])
    cfg = Config(root)
    cfg.cfgimpl_get_settings().remove('expire')
    cfg.read_write()

    # Expected properties per option; these never change during the test.
    properties = {'val1': set([]),
                  'val2': set(['mandatory']),
                  'val3': set([]),
                  'val4': set([]),
                  'val5': set(['empty'])}
    # Expected values per option; updated as the test assigns new values.
    current = {'val1': 'val',
               'val2': 'val',
               'val3': 'yes',
               'val4': 'val',
               'val5': ['yes']}

    def cache_layout(payloads, skip):
        # Each expected cache entry has the shape {None: (payload, None)}.
        return dict((name, {None: (payload, None)})
                    for name, payload in payloads.items()
                    if name not in skip)

    def check_caches(skip=()):
        # Settings cache first, then values cache; ``skip`` lists the
        # options expected to be missing from both caches.
        assert cfg.cfgimpl_get_settings()._p_.get_cached(cfg) == cache_layout(properties, skip)
        assert cfg.cfgimpl_get_values()._p_.get_cached(cfg) == cache_layout(current, skip)

    cfg.cfgimpl_get_values().force_cache()
    check_caches()

    # val1 feeds val2 and val4: all three entries are expected to vanish.
    cfg.val1 = 'new'
    check_caches(skip=('val1', 'val2', 'val4'))
    current.update(val1='new', val2='new', val4='new')
    cfg.cfgimpl_get_values().force_cache()
    check_caches()

    cfg.val3 = 'new2'
    check_caches(skip=('val3',))
    current['val3'] = 'new2'
    cfg.cfgimpl_get_values().force_cache()
    check_caches()

    cfg.val4 = 'new3'
    check_caches(skip=('val4',))
    current['val4'] = 'new3'
    cfg.cfgimpl_get_values().force_cache()
    check_caches()

    cfg.val5.append('new4')
    check_caches(skip=('val5',))
    current['val5'] = ['yes', 'new4']
    cfg.cfgimpl_get_values().force_cache()
    check_caches()
|
|
|
|
|
|
def test_cache_master_and_slaves_master():
    """Cache contents of a master/slave group across appends and a slave write.

    Appending to the master leaves only the group's settings entry cached;
    writing one slave index leaves the master entries cached and drops the
    slave ones.  Each force_cache() then rebuilds the full layout,
    including per-index slave entries.
    """
    master = StrOption('val1', "", multi=True)
    slave = StrOption('val2', "", multi=True)
    group = OptionDescription('val1', '', [master, slave])
    group.impl_set_group_type(groups.master)
    root = OptionDescription('rootconfig', '', [group])
    cfg = Config(root)
    cfg.cfgimpl_get_settings().remove('expire')
    cfg.read_write()

    def cached_settings():
        return cfg.cfgimpl_get_settings()._p_.get_cached(cfg)

    def cached_values():
        return cfg.cfgimpl_get_values()._p_.get_cached(cfg)

    # Expected settings entries that recur in several assertions.
    group_props = {None: (set([]), None)}
    master_props = {None: (set(['empty']), None)}

    cfg.cfgimpl_get_values().force_cache()
    assert cached_settings() == {'val1': group_props,
                                 'val1.val1': master_props,
                                 'val1.val2': {None: (set([]), None)}}
    assert cached_values() == {'val1.val1': {None: ([], None)},
                               'val1.val2': {None: ([], None)}}

    # First append on the master.
    cfg.val1.val1.append()
    assert cached_settings() == {'val1': group_props}
    assert cached_values() == {}

    cfg.cfgimpl_get_values().force_cache()
    assert cached_settings() == {'val1': group_props,
                                 'val1.val1': master_props,
                                 'val1.val2': {None: (set([]), None),
                                               0: (set([]), None)}}
    assert cached_values() == {'val1.val1': {None: ([None], None)},
                               'val1.val2': {None: ([None], None),
                                             0: (None, None)}}

    # Second append, then write to the slave at index 1.
    cfg.val1.val1.append()
    cfg.cfgimpl_get_values().force_cache()
    cfg.val1.val2[1] = 'oui'
    assert cached_settings() == {'val1': group_props,
                                 'val1.val1': master_props}
    assert cached_values() == {'val1.val1': {None: ([None, None], None)}}

    cfg.cfgimpl_get_values().force_cache()
    assert cached_settings() == {'val1': group_props,
                                 'val1.val1': master_props,
                                 'val1.val2': {None: (set([]), None),
                                               0: (set([]), None),
                                               1: (set([]), None)}}
    assert cached_values() == {'val1.val1': {None: ([None, None], None)},
                               'val1.val2': {None: ([None, 'oui'], None),
                                             0: (None, None),
                                             1: ('oui', None)}}
|
|
|
|
|
|
def test_cache_master_callback():
    """Same group layout, but the slave's value is a callback on the master.

    Appending to the master leaves only the group's settings entry cached;
    force_cache() then rebuilds the entries, with one per-index entry for
    the slave.
    """
    master = StrOption('val1', "", multi=True)
    slave = StrOption('val2', "", multi=True, callback=return_value, callback_params={'value': ((master, False),)})
    group = OptionDescription('val1', '', [master, slave])
    group.impl_set_group_type(groups.master)
    root = OptionDescription('rootconfig', '', [group])
    cfg = Config(root)
    cfg.cfgimpl_get_settings().remove('expire')
    cfg.read_write()

    def cached_settings():
        return cfg.cfgimpl_get_settings()._p_.get_cached(cfg)

    def cached_values():
        return cfg.cfgimpl_get_values()._p_.get_cached(cfg)

    # Expected settings entries that recur in several assertions.
    group_props = {None: (set([]), None)}
    master_props = {None: (set(['empty']), None)}

    cfg.cfgimpl_get_values().force_cache()
    assert cached_settings() == {'val1': group_props,
                                 'val1.val1': master_props,
                                 'val1.val2': {None: (set([]), None)}}
    assert cached_values() == {'val1.val1': {None: ([], None)},
                               'val1.val2': {None: ([], None)}}

    cfg.val1.val1.append()
    assert cached_settings() == {'val1': group_props}
    assert cached_values() == {}

    cfg.cfgimpl_get_values().force_cache()
    assert cached_settings() == {'val1': group_props,
                                 'val1.val1': master_props,
                                 'val1.val2': {None: (set([]), None),
                                               0: (set([]), None)}}
    assert cached_values() == {'val1.val1': {None: ([None], None)},
                               'val1.val2': {None: ([None], None),
                                             0: (None, None)}}
|
|
|
|
|
|
def test_cache_requires():
    """Cache behaviour for an option guarded by a ``requires`` rule.

    ``ip_address_service`` gets the 'disabled' action when
    ``activate_service`` is False.  The test checks that:
    * reading the IP option caches the properties of both options;
    * assigning the IP option evicts only its own entries;
    * assigning ``activate_service`` empties both caches, and the next
      force_cache() records 'disabled' on the IP option and no value for it.
    """
    a = BoolOption('activate_service', '', True)
    b = IPOption('ip_address_service', '',
                 requires=[{'option': a, 'expected': False, 'action': 'disabled'}])
    od = OptionDescription('service', '', [a, b])
    c = Config(od)
    c.cfgimpl_get_settings().remove('expire')
    c.read_write()
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {}

    # Identity check: None is a singleton, so ``is`` is the right comparison.
    assert c.ip_address_service is None
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {'activate_service': {None: (set([]), None)},
                                                          'ip_address_service': {None: (set([]), None)}}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {'ip_address_service': {None: (None, None)}}

    c.cfgimpl_get_values().force_cache()
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {'activate_service': {None: (set([]), None)},
                                                          'ip_address_service': {None: (set([]), None)}}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {'ip_address_service': {None: (None, None)},
                                                        'activate_service': {None: (True, None)}}

    # Writing the IP option evicts only its own entries.
    c.ip_address_service = '1.1.1.1'
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {'activate_service': {None: (set([]), None)}}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {'activate_service': {None: (True, None)}}

    c.cfgimpl_get_values().force_cache()
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {'activate_service': {None: (set([]), None)},
                                                          'ip_address_service': {None: (set([]), None)}}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {'ip_address_service': {None: ('1.1.1.1', None)},
                                                        'activate_service': {None: (True, None)}}

    # Writing the option referenced by the ``requires`` rule empties both caches.
    c.activate_service = False
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {}

    c.cfgimpl_get_values().force_cache()
    assert c.cfgimpl_get_settings()._p_.get_cached(c) == {'activate_service': {None: (set([]), None)},
                                                          'ip_address_service': {None: (set(['disabled']), None)}}
    assert c.cfgimpl_get_values()._p_.get_cached(c) == {'activate_service': {None: (False, None)}}
|