"""Pickle round-trip tests for tiramisu options and configs.

The helpers below compare an object with its ``loads(dumps(obj))`` copy
slot by slot; the ``test_*`` functions build option trees / configs,
serialize them and run those comparisons.
"""
from .autopath import do_autopath
# NOTE(review): called before the tiramisu imports; presumably it adjusts
# sys.path so the package is importable -- keep this ordering.
do_autopath()

from tiramisu.option import BoolOption, UnicodeOption, SymLinkOption, \
    IntOption, IPOption, NetmaskOption, StrOption, OptionDescription, \
    DynOptionDescription, MasterSlaves
from tiramisu.config import Config, GroupConfig, MetaConfig
from tiramisu.setting import groups, owners
from tiramisu.storage import delete_session
from tiramisu.error import ConfigError
from pickle import dumps, loads
from py.test import raises
import sys


def return_value(value=None):
    """Return ``value`` unchanged (used as an option callback in the tests)."""
    return value


def _get_slots(opt):
    """Return the set of ``__slots__`` names declared along ``opt``'s MRO.

    ``object`` itself is skipped, as is any class without ``__slots__``.
    """
    slots = set()
    for subclass in opt.__class__.__mro__:
        if subclass is not object and '__slots__' in dir(subclass):
            slots.update(subclass.__slots__)
    return slots


def _no_state(opt):
    """Raise ``Exception`` if any slot whose name contains 'state' is set.

    A slot counts as "set" when ``getattr`` succeeds on it.
    """
    for attr in _get_slots(opt):
        if 'state' in attr:
            try:
                getattr(opt, attr)
            except Exception:
                # Expected path: the attribute is not readable on ``opt``.
                pass
            else:
                raise Exception('opt should have already attribute {0}'.format(attr))


def _diff_opt(opt1, opt2):
    """Compare two options slot by slot; raise/assert on any difference.

    Both objects must declare the same slots, each slot must be readable
    (or unreadable) on both sides, and the values must match.  Slots that
    hold references to other options are compared by option name rather
    than by object equality.

    Raises:
        Exception: one side declares a slot the other does not.
        ValueError: a slot is readable on one side only.
        AssertionError: a slot value differs.
    """
    attr1 = set(_get_slots(opt1))
    attr2 = set(_get_slots(opt2))
    diff1 = attr1 - attr2
    diff2 = attr2 - attr1
    if diff1:
        raise Exception('more attribute in opt1 {0}'.format(list(diff1)))
    if diff2:
        raise Exception('more attribute in opt2 {0}'.format(list(diff2)))
    for attr in attr1:
        # These two slots are never compared.
        if attr in ['_cache_paths', '_cache_consistencies']:
            continue
        err1 = False
        err2 = False
        val1 = None
        val2 = None
        try:
            val1 = getattr(opt1, attr)
            msg1 = "exists"
        except Exception:
            err1 = True
            msg1 = "not exists"

        try:
            val2 = getattr(opt2, attr)
            msg2 = "exists"
        except Exception:
            err2 = True
            msg2 = "not exists"

        if not err1 == err2:
            raise ValueError("{0} {1} before but {2} after for {3}".format(
                attr, msg1, msg2, opt1.impl_getname()))
        if val1 is None:
            assert val1 == val2
        elif attr == '_children':
            # First element is compared directly, second element is a
            # sequence of options compared by name.
            assert val1[0] == val2[0]
            for index, _opt in enumerate(val1[1]):
                assert _opt._name == val2[1][index]._name
        elif attr == '_requires':
            if val1 == val2 == []:
                pass
            else:
                for idx, req in enumerate(val1[0][0][0]):
                    other = val2[0][0][0][idx]
                    assert req[0]._name == other[0]._name
                    assert req[1] == other[1]
                # NOTE(review): placed after the loop when the flattened
                # source was re-indented; the check does not depend on idx.
                assert val1[0][0][1:] == val2[0][0][1:]
        elif attr == '_opt':
            assert val1._name == val2._name
        elif attr == '_consistencies':
            # dict is only a cache
            if isinstance(val1, list):
                for index, consistency in enumerate(val1):
                    assert consistency[0] == val2[index][0]
                    for idx, opt in enumerate(consistency[1]):
                        assert opt._name == val2[index][1][idx]._name
        elif attr == '_val_call':
            for idx, v in enumerate(val1):
                if v is None:
                    assert val2[idx] is None
                else:
                    assert v[0] == val2[idx][0]
                    if len(v) == 2:
                        if v[1] is not None:
                            for key, values in v[1].items():
                                for i, value in enumerate(values):
                                    other = val2[idx][1][key][i]
                                    if isinstance(value, tuple) and value[0] is not None:
                                        # (option, flag) pair: compare the
                                        # option by name, the flag by value.
                                        assert value[0].impl_getname() == other[0].impl_getname()
                                        assert value[1] == other[1]
                                    else:
                                        assert value == other
                        else:
                            assert v[1] == val2[idx][1]
        elif attr == '_master_slaves':
            assert val1._p_._sm_getmaster().impl_getname() == \
                val2._p_._sm_getmaster().impl_getname()
            sval1 = [opt.impl_getname() for opt in val1._p_._sm_getslaves()]
            sval2 = [opt.impl_getname() for opt in val2._p_._sm_getslaves()]
            assert sval1 == sval2
        elif attr == '_subdyn':
            try:
                assert val1.impl_getname() == val2.impl_getname()
            except AttributeError:
                # Value has no impl_getname(): fall back to plain equality.
                assert val1 == val2
        elif attr == '_dependencies':
            assert len(val1) == len(val2)
            for idx, val in enumerate(val1):
                if isinstance(val, MasterSlaves):
                    assert val._p_.master.impl_getname() == \
                        val2[idx]._p_.master.impl_getname()
                else:
                    assert val.impl_getname() == val2[idx].impl_getname()
        else:
            assert val1 == val2, "error for {}".format(attr)


def _diff_opts(opt1, opt2):
    """Run ``_diff_opt`` on ``opt1``/``opt2`` and, recursively, their children.

    Recursion happens only when ``opt1`` is an ``OptionDescription`` or a
    ``DynOptionDescription``; both sides must then have the same child names.
    """
    _diff_opt(opt1, opt2)
    if isinstance(opt1, (OptionDescription, DynOptionDescription)):
        children1 = {opt.impl_getname() for opt in opt1._impl_getchildren(dyn=False)}
        children2 = {opt.impl_getname() for opt in opt2._impl_getchildren(dyn=False)}
        diff1 = children1 - children2
        diff2 = children2 - children1
        if diff1:
            raise Exception('more attribute in opt1 {0}'.format(list(diff1)))
        if diff2:
            raise Exception('more attribute in opt2 {0}'.format(list(diff2)))
        for child in children1:
            _diff_opts(opt1._getattr(child, dyn=False),
                       opt2._getattr(child, dyn=False))


def _diff_conf(cfg1, cfg2):
    """Compare two configs slot by slot; raise/assert on any difference.

    Values, settings and the option description are compared through the
    ``cfgimpl_get_*`` accessors; ``_impl_children`` is compared recursively.

    Raises:
        Exception: one side declares a slot the other does not.
        AssertionError: a slot is readable on one side only or differs.
    """
    attr1 = set(_get_slots(cfg1))
    attr2 = set(_get_slots(cfg2))
    diff1 = attr1 - attr2
    diff2 = attr2 - attr1
    if diff1:
        raise Exception('more attribute in cfg1 {0}'.format(list(diff1)))
    if diff2:
        raise Exception('more attribute in cfg2 {0}'.format(list(diff2)))
    for attr in attr1:
        # These two slots are never compared.
        if attr in ('_impl_context', '__weakref__'):
            continue
        err1 = False
        err2 = False
        val1 = None
        val2 = None
        try:
            val1 = getattr(cfg1, attr)
        except Exception:
            err1 = True

        try:
            val2 = getattr(cfg2, attr)
        except Exception:
            err2 = True
        assert err1 == err2
        if val1 is None:
            assert val1 == val2
        elif attr == '_impl_values':
            assert cfg1.cfgimpl_get_values().get_modified_values() == \
                cfg2.cfgimpl_get_values().get_modified_values()
        elif attr == '_impl_settings':
            assert cfg1.cfgimpl_get_settings().get_modified_properties() == \
                cfg2.cfgimpl_get_settings().get_modified_properties()
            assert cfg1.cfgimpl_get_settings().get_modified_permissives() == \
                cfg2.cfgimpl_get_settings().get_modified_permissives()
        elif attr == '_impl_descr':
            _diff_opt(cfg1.cfgimpl_get_description(),
                      cfg2.cfgimpl_get_description())
        elif attr == '_impl_children':
            for index, _opt in enumerate(val1):
                _diff_conf(_opt, val2[index])
        else:
            assert val1 == val2


def test_diff_opt():
    """An option tree with a requirement and a symlink survives pickling."""
    b = BoolOption('b', '')
    u = UnicodeOption('u', '', requires=[{'option': b, 'expected': True,
                                          'action': 'disabled', 'inverse': True}])
    s = SymLinkOption('s', u)
    o = OptionDescription('o', '', [b, u, s])
    o1 = OptionDescription('o1', '', [o])

    a = dumps(o1)
    q = loads(a)
    _diff_opts(o1, q)


def test_diff_information():
    """Informations set on an option survive pickling of the tree."""
    b = BoolOption('b', '')
    b.impl_set_information('doc', 'oh')
    b.impl_set_information('doc1', 'oh')
    b.impl_set_information('doc2', 'oh')
    o = OptionDescription('o', '', [b])
    o1 = OptionDescription('o1', '', [o])

    a = dumps(o1)
    q = loads(a)
    _diff_opts(o1, q)


def test_diff_information_config():
    """Informations set on an option and on the config survive pickling."""
    b = BoolOption('b', '')
    b.impl_set_information('info', 'oh')
    b.impl_set_information('info1', 'oh')
    b.impl_set_information('info2', 'oh')
    o = OptionDescription('o', '', [b])
    o1 = OptionDescription('o1', '', [o])
    try:
        cfg = Config(o1, persistent=True, session_id='29090938')
    except ValueError:
        # Fallback when the persistent Config raises ValueError.
        # NOTE(review): _impl_test presumably allows pickling in this
        # case -- confirm in tiramisu.config.
        cfg = Config(o1, session_id='29090938')
        cfg._impl_test = True
    cfg.impl_set_information('info', 'oh')

    a = dumps(cfg)
    q = loads(a)
    _diff_opts(cfg.cfgimpl_get_description(), q.cfgimpl_get_description())
    _diff_conf(cfg, q)
    assert cfg.impl_get_information('info') == 'oh'
    assert q.impl_get_information('info') == 'oh'
    try:
        delete_session('config', '29090938')
    except ValueError:
        pass


def test_diff_opt_multi():
    """A tree holding a multi option survives pickling, level by level."""
    b = BoolOption('b', '', multi=True)
    o = OptionDescription('o', '', [b])
    o1 = OptionDescription('o1', '', [o])

    a = dumps(o1)
    q = loads(a)
    _diff_opt(o1, q)
    _diff_opt(o1.o, q.o)
    _diff_opt(o1.o.b, q.o.b)


def test_only_optiondescription():
    """Pickling a bare option (not an OptionDescription) raises SystemError."""
    b = BoolOption('b', '')
    # 'b' is only used inside the string evaluated by raises() below; this
    # bare reference presumably keeps linters from flagging it as unused.
    b
    raises(SystemError, "a = dumps(b)")


def test_diff_opt_cache():
    """A tree with a consistency and a built option cache survives pickling."""
    b = BoolOption('b', '')
    u = UnicodeOption('u', '', requires=[{'option': b, 'expected': True,
                                          'action': 'disabled', 'inverse': True}])
    u.impl_add_consistency('not_equal', b)
    s = SymLinkOption('s', u)
    o = OptionDescription('o', '', [b, u, s])
    o1 = OptionDescription('o1', '', [o])
    o1.impl_build_cache_option()

    a = dumps(o1)
    q = loads(a)
    _diff_opts(o1, q)


def test_diff_opt_callback():
    """Options with callbacks and callback_params survive pickling."""
    b = BoolOption('b', '', callback=return_value)
    b2 = BoolOption('b2', '', callback=return_value,
                    callback_params={'': ('yes',)})
    b3 = BoolOption('b3', '', callback=return_value,
                    callback_params={'': ('yes', (b, False)), 'value': ('no',)})
    b4 = BoolOption("b4", "", callback=return_value,
                    callback_params={'': ((None,),), 'value': ('string',)})
    o = OptionDescription('o', '', [b, b2, b3, b4])
    o1 = OptionDescription('o1', '', [o])
    o1.impl_build_cache_option()

    a = dumps(o1)
    q = loads(a)
    _diff_opts(o1, q)


def test_no_state_attr():
    """After unpickling, no 'state' slot is left set on any option."""
    # all _state_xxx attributes should be deleted
    b = BoolOption('b', '')
    u = UnicodeOption('u', '', requires=[{'option': b, 'expected': True,
                                          'action': 'disabled', 'inverse': True}])
    s = SymLinkOption('s', u)
    o = OptionDescription('o', '', [b, u, s])
    o1 = OptionDescription('o1', '', [o])

    a = dumps(o1)
    q = loads(a)
    _no_state(q)
    _no_state(q.o)
    _no_state(q.o.b)
    _no_state(q.o.u)
    _no_state(q.o.s)


def test_state_config():
    """A minimal config survives pickling."""
    val1 = BoolOption('val1', "")
    maconfig = OptionDescription('rootconfig', '', [val1])
    try:
        cfg = Config(maconfig, persistent=True, session_id='29090931')
    except ValueError:
        # Fallback when the persistent Config raises ValueError.
        cfg = Config(maconfig, session_id='29090931')
        cfg._impl_test = True
    a = dumps(cfg)
    q = loads(a)
    _diff_opts(cfg.cfgimpl_get_description(), q.cfgimpl_get_description())
    _diff_conf(cfg, q)
    try:
        delete_session('config', '29090931')
    except ValueError:
        pass


def test_state_config2():
    """A config with a consistency and a master group survives pickling."""
    a = IPOption('a', '')
    b = NetmaskOption('b', '')
    dod1 = OptionDescription('dod1', '', [a, b])
    b.impl_add_consistency('ip_netmask', a)
    od1 = OptionDescription('od1', '', [dod1])
    st1 = StrOption('st1', "", multi=True)
    st2 = StrOption('st2', "", multi=True)
    st3 = StrOption('st3', "", multi=True, callback=return_value,
                    callback_params={'': ((st2, False),)})
    stm = OptionDescription('st1', '', [st1, st2, st3])
    stm.impl_set_group_type(groups.master)
    st = OptionDescription('st', '', [stm])
    od = OptionDescription('od', '', [st])
    od2 = OptionDescription('od', '', [od, od1])
    try:
        cfg = Config(od2, persistent=True, session_id='29090939')
    except ValueError:
        # Fallback when the persistent Config raises ValueError.
        cfg = Config(od2, session_id='29090939')
        cfg._impl_test = True
    a = dumps(cfg)
    q = loads(a)
    _diff_opts(cfg.cfgimpl_get_description(), q.cfgimpl_get_description())
    _diff_conf(cfg, q)
    try:
        delete_session('config', '29090939')
    except ValueError:
        pass


def test_diff_opt_config():
    """A config over options with requires/informations survives pickling."""
    b = BoolOption('b', '')
    u = UnicodeOption('u', '', requires=[{'option': b, 'expected': True,
                                          'action': 'disabled', 'inverse': True}])
    u.impl_set_information('info', 'oh')
    u.impl_set_information('info1', 'oh')
    u.impl_set_information('info2', 'oh')
    s = SymLinkOption('s', u)
    o = OptionDescription('o', '', [b, u, s])
    o1 = OptionDescription('o1', '', [o])
    try:
        cfg = Config(o1, persistent=True, session_id='29090940')
    except ValueError:
        # Fallback when the persistent Config raises ValueError.
        cfg = Config(o1, session_id='29090940')
        cfg._impl_test = True

    a = dumps(cfg)
    q = loads(a)
    _diff_opts(cfg.cfgimpl_get_description(), q.cfgimpl_get_description())
    _diff_conf(cfg, q)
    try:
        delete_session('config', '29090940')
    except ValueError:
        pass


def test_state_properties():
    """A property appended to an option's settings survives pickling."""
    val1 = BoolOption('val1', "")
    maconfig = OptionDescription('rootconfig', '', [val1])
    try:
        cfg = Config(maconfig, persistent=True, session_id='29090932')
    except ValueError:
        # Fallback when the persistent Config raises ValueError.
        cfg = Config(maconfig, session_id='29090932')
        cfg._impl_test = True
    cfg.read_write()
    cfg.cfgimpl_get_settings()[val1].append('test')
    a = dumps(cfg)
    q = loads(a)
    _diff_conf(cfg, q)
    try:
        delete_session('config', '29090932')
    except ValueError:
        pass


def test_state_values():
    """Values survive pickling and the copy is independent of the original."""
    val1 = BoolOption('val1', "")
    maconfig = OptionDescription('rootconfig', '', [val1])
    try:
        cfg = Config(maconfig, persistent=True, session_id='29090933')
    except ValueError:
        # Fallback when the persistent Config raises ValueError.
        cfg = Config(maconfig, session_id='29090933')
        cfg._impl_test = True
    cfg.val1 = True
    a = dumps(cfg)
    q = loads(a)
    _diff_conf(cfg, q)
    # Changing the unpickled copy must not affect the original.
    q.val1 = False
    assert cfg.val1 is True
    assert q.val1 is False
    try:
        delete_session('config', '29090933')
    except ValueError:
        pass


def test_state_values_owner():
    """A custom owner set before pickling is still used by the copy."""
    val1 = BoolOption('val1', "")
    maconfig = OptionDescription('rootconfig', '', [val1])
    try:
        cfg = Config(maconfig, persistent=True, session_id='29090934')
    except ValueError:
        # Fallback when the persistent Config raises ValueError.
        cfg = Config(maconfig, session_id='29090934')
        cfg._impl_test = True
    owners.addowner('newowner')
    cfg.cfgimpl_get_settings().setowner(owners.newowner)
    cfg.val1 = True
    a = dumps(cfg)
    q = loads(a)
    _diff_conf(cfg, q)
    q.val1 = False
    nval1 = q.cfgimpl_get_description().val1
    assert q.getowner(nval1) == owners.newowner
    try:
        delete_session('config', '29090934')
    except ValueError:
        pass


def test_state_metaconfig():
    """Pickling a MetaConfig raises ConfigError."""
    i1 = IntOption('i1', '')
    od1 = OptionDescription('od1', '', [i1])
    od2 = OptionDescription('od2', '', [od1])
    try:
        cfg = Config(od2, persistent=True, session_id='29090935')
    except ValueError:
        # NOTE(review): everything below is kept inside this fallback
        # because conf1/conf2 are only bound here; when the persistent
        # Config succeeds the test checks nothing and the session is not
        # deleted -- confirm this is intended.
        conf1 = Config(od2, session_id='29090935')
        conf1._impl_test = True
        conf2 = Config(od2, session_id='29090936')
        conf2._impl_test = True
        meta = MetaConfig([conf1, conf2], session_id='29090937')
        meta._impl_test = True
        raises(ConfigError, "dumps(meta)")
        try:
            delete_session('config', '29090935')
        except ValueError:
            pass


def test_state_groupconfig():
    """A GroupConfig survives pickling."""
    i1 = IntOption('i1', '')
    od1 = OptionDescription('od1', '', [i1])
    od2 = OptionDescription('od2', '', [od1])
    try:
        cfg = Config(od2, persistent=True, session_id='29090938')
    except ValueError:
        # NOTE(review): everything below is kept inside this fallback
        # because conf1/conf2 are only bound here; when the persistent
        # Config succeeds the test checks nothing and the session is not
        # deleted -- confirm this is intended.
        conf1 = Config(od2, session_id='29090938')
        conf1._impl_test = True
        conf2 = Config(od2, session_id='29090939')
        conf2._impl_test = True
        meta = GroupConfig([conf1, conf2], session_id='29090940')
        meta._impl_test = True
        a = dumps(meta)
        q = loads(a)
        _diff_conf(meta, q)
        try:
            delete_session('config', '29090938')
        except ValueError:
            pass


def test_state_unkown_setting_owner():
    """Load an unknown _owner; it should be created."""
    assert 'supernewuser' not in owners.__dict__
    # Hand-written protocol-0 pickle of a Settings object whose '_owner' is
    # 'supernewuser'.  Protocol 0 is newline-delimited, so the line breaks
    # inside this literal are significant.  It is a fixed, trusted literal.
    val = """ccopy_reg
_reconstructor
p0
(ctiramisu.setting
Settings
p1
c__builtin__
object
p2
Ntp3
Rp4
(dp5
S'_owner'
p6
S'supernewuser'
p7
sS'_p_'
p8
g0
(ctiramisu.storage.dictionary.setting
Settings
p9
g2
Ntp10
Rp11
(dp12
S'_cache'
p13
(dp14
sS'_permissives'
p15
(dp16
sS'_properties'
p17
(dp18
sbsb.
."""
    if sys.version_info[0] >= 3:  # pragma: optional cover
        # pickle.loads() needs bytes on Python 3.
        val = bytes(val, "UTF-8")
    loads(val)
    assert 'supernewuser' in owners.__dict__