tiramisu/tests/test_state.py
2019-12-24 15:25:12 +01:00

246 lines
9.5 KiB
Python

#from autopath import do_autopath
#do_autopath()
#
from tiramisu import BoolOption, StrOption, SymLinkOption, OptionDescription, DynOptionDescription, \
Calculation, Params, ParamOption, ParamValue, calc_value, Config
from pickle import dumps
from py.test import raises
import pytest
import sys, warnings
from tiramisu.storage import list_sessions
def teardown_function(function):
with warnings.catch_warnings(record=True) as w:
assert list_sessions() == [], 'session list is not empty when leaving "{}"'.format(function.__name__)
def _get_slots(opt):
slots = set()
for subclass in opt.__class__.__mro__:
if subclass is not object and '__slots__' in dir(subclass):
slots.update(subclass.__slots__)
return slots
def _no_state(opt):
for attr in _get_slots(opt):
if 'state' in attr:
try:
getattr(opt, attr)
except:
pass
else:
raise Exception('opt should have already attribute {0}'.format(attr))
def _diff_opt(opt1, opt2):
attr1 = set(_get_slots(opt1))
attr2 = set(_get_slots(opt2))
diff1 = attr1 - attr2
diff2 = attr2 - attr1
if diff1 != set():
raise Exception('more attribute in opt1 {0}'.format(list(diff1)))
if diff2 != set():
raise Exception('more attribute in opt2 {0}'.format(list(diff2)))
for attr in attr1:
if attr in ['_cache_paths', '_cache_consistencies']:
continue
err1 = False
err2 = False
val1 = None
val2 = None
try:
val1 = getattr(opt1, attr)
msg1 = "exists"
tval = val1
except:
err1 = True
msg1 = "not exists"
try:
val2 = getattr(opt2, attr)
msg2 = "exists"
tval = val2
except:
err2 = True
msg2 = "not exists"
if not err1 == err2:
raise ValueError("{0} {1} before but {2} after for {3}: {4}".format(attr, msg1, msg2, opt1.impl_getname(), tval))
if val1 is None:
assert val1 == val2
elif attr == '_children':
assert val1[0] == val2[0]
for index, _opt in enumerate(val1[1]):
assert _opt._name == val2[1][index]._name
elif attr == '_requires':
if val1 == val2 == []:
pass
else:
for idx1, req1 in enumerate(val1):
for idx2, req2 in enumerate(val1[idx1]):
for idx3, req3 in enumerate(val1[idx1][idx2][0]):
assert val1[idx1][idx2][0][idx3][0].impl_getname() == val2[idx1][idx2][0][idx3][0].impl_getname()
assert val1[idx1][idx2][0][idx3][1] == val2[idx1][idx2][0][idx3][1]
assert val1[idx1][idx2][1:] == val2[idx1][idx2][1:], '{} - {}\n{} - {}'.format(val1, val2, val1[0][0][1:], val2[0][0][1:])
elif attr == '_opt':
assert val1._name == val2._name
elif attr == '_consistencies':
# dict is only a cache
if isinstance(val1, list):
for index, consistency in enumerate(val1):
assert consistency[0] == val2[index][0]
for idx, opt in enumerate(consistency[1]):
assert opt._name == val2[index][1][idx]._name
elif attr == '_val_call':
for idx, v in enumerate(val1):
if v is None:
assert val2[idx] is None
else:
assert v[0] == val2[idx][0]
if len(v) == 2:
if v[1] is not None:
for key, values in v[1].items():
for i, value in enumerate(values):
if isinstance(value, tuple) and value[0] is not None:
assert v[1][key][i][0].impl_getname() == val2[idx][1][key][i][0].impl_getname()
assert v[1][key][i][1] == val2[idx][1][key][i][1]
else:
assert v[1][key][i] == val2[idx][1][key][i]
else:
assert v[1] == val2[idx][1]
elif attr == '_leadership':
assert val1._p_._sm_get_leader().impl_getname() == val2._p_._sm_get_leader().impl_getname()
sval1 = [opt.impl_getname() for opt in val1._p_._sm_get_followers()]
sval2 = [opt.impl_getname() for opt in val2._p_._sm_get_followers()]
assert sval1 == sval2
elif attr == '_subdyn':
try:
assert val1.impl_getname() == val2.impl_getname()
except AttributeError:
assert val1 == val2
elif attr == '_dependencies':
assert len(val1) == len(val2), "_dependencies has not same len: {} - {}".format(val1, val2)
lst1 = []
lst2 = []
for idx, val in enumerate(val1):
if isinstance(val, Leadership):
lst1.append(val._p_.leader.impl_getname())
else:
lst1.append(val.impl_getname())
for idx, val in enumerate(val2):
if isinstance(val, Leadership):
lst2.append(val._p_.leader.impl_getname())
else:
lst2.append(val.impl_getname())
assert set(lst1) == set(lst2), '{} - {}'.format(lst1, lst2)
elif attr == '_cache_force_store_values':
for idx, tup in enumerate(val1):
assert tup[0] == val2[idx][0]
assert tup[1].impl_getname() == val2[idx][1].impl_getname()
elif attr in ['_extra', '_information']:
dico1 = {}
dico2 = {}
assert len(val1[0]) == len(val2[0])
assert set(val1[0]) == set(val2[0])
for idx, val in enumerate(val1[0]):
idx2 = val1[0].index(val)
assert val1[1][idx] == val1[1][idx2]
else:
#print(attr, val1, val2)
assert val1 == val2, "error for {}".format(attr)
def _diff_opts(opt1, opt2):
_diff_opt(opt1, opt2)
if isinstance(opt1, OptionDescription) or isinstance(opt1, DynOptionDescription):
children1 = set([opt.impl_getname() for opt in opt1.impl_getchildren(dyn=False)])
children2 = set([opt.impl_getname() for opt in opt2.impl_getchildren(dyn=False)])
diff1 = children1 - children2
diff2 = children2 - children1
if diff1 != set():
raise Exception('more attribute in opt1 {0}'.format(list(diff1)))
if diff2 != set():
raise Exception('more attribute in opt2 {0}'.format(list(diff2)))
for child in children1:
_diff_opts(opt1._getattr(child, dyn=False), opt2._getattr(child, dyn=False))
def _diff_conf(cfg1, cfg2):
attr1 = set(_get_slots(cfg1))
attr2 = set(_get_slots(cfg2))
diff1 = attr1 - attr2
diff2 = attr2 - attr1
if diff1 != set():
raise Exception('more attribute in cfg1 {0}'.format(list(diff1)))
if diff2 != set():
raise Exception('more attribute in cfg2 {0}'.format(list(diff2)))
for attr in attr1:
if attr in ('_impl_context', '__weakref__'):
continue
err1 = False
err2 = False
val1 = None
val2 = None
try:
val1 = getattr(cfg1, attr)
except:
err1 = True
try:
val2 = getattr(cfg2, attr)
except:
err2 = True
assert err1 == err2
if val1 is None:
assert val1 == val2
elif attr == '_impl_values':
assert cfg1.cfgimpl_get_values().get_modified_values() == cfg2.cfgimpl_get_values().get_modified_values()
elif attr == '_impl_settings':
assert cfg1.cfgimpl_get_settings().get_modified_properties() == cfg2.cfgimpl_get_settings().get_modified_properties()
assert cfg1.cfgimpl_get_settings().get_modified_permissives() == cfg2.cfgimpl_get_settings().get_modified_permissives()
elif attr == '_impl_descr':
_diff_opt(cfg1.cfgimpl_get_description(), cfg2.cfgimpl_get_description())
elif attr == '_impl_children':
for index, _opt in enumerate(val1):
_diff_conf(_opt, val2[index])
elif attr == '_impl_name':
#FIXME
pass
else:
assert val1 == val2
def test_diff_opt():
b = BoolOption('b', '')
disabled_property = Calculation(calc_value,
Params(ParamValue('disabled'),
kwargs={'condition': ParamOption(b),
'expected': ParamValue(True),
'reverse_condition': ParamValue(True)}))
u = StrOption('u', '', properties=(disabled_property,))
s = SymLinkOption('s', u)
o = OptionDescription('o', '', [b, u, s])
o1 = OptionDescription('o1', '', [o])
raises(NotImplementedError, "dumps(o1)")
@pytest.mark.asyncio
async def test_diff_information_config():
b = BoolOption('b', '')
b.impl_set_information('info', 'oh')
b.impl_set_information('info1', 'oh')
b.impl_set_information('info2', 'oh')
o = OptionDescription('o', '', [b])
o1 = OptionDescription('o1', '', [o])
d = await Config(o1)
c = d._config_bag.context
raises(NotImplementedError, "dumps(c)")
def test_only_optiondescription():
b = BoolOption('b', '')
b
raises(NotImplementedError, "a = dumps(b)")