from lxml import etree from os.path import isfile, join, isdir from pytest import fixture, mark from os import listdir, mkdir, environ from json import dump, load, dumps, loads environ['TIRAMISU_LOCALE'] = 'en' from tiramisu import Config from tiramisu.error import PropertiesOptionError dico_dirs = 'tests/dictionaries' test_ok = set() for test in listdir(dico_dirs): if isdir(join(dico_dirs, test)): if isdir(join(dico_dirs, test, 'tiramisu')): test_ok.add(test) debug = False #debug = True excludes = set([]) #excludes = set(['01base_file_utfchar']) test_ok -= excludes #test_ok = ['10check_valid_ipnetmask'] test_ok = list(test_ok) test_ok.sort() @fixture(scope="module", params=test_ok) def test_dir(request): return request.param async def launch_flattener(test_dir, filename, ): makedict_dir = join(test_dir, 'makedict') makedict_file = join(makedict_dir, 'base.json') makedict_before = join(makedict_dir, 'before.json') makedict_after = join(makedict_dir, 'after.json') modulepath = test_dir.replace('/', '.') + f'.tiramisu.{filename}' mod = __import__(modulepath) for token in modulepath.split(".")[1:]: mod = getattr(mod, token) config = await Config(mod.option_0) # change default rights ro_origin = await config.property.getdefault('read_only', 'append') ro_append = frozenset(ro_origin - {'force_store_value'}) rw_origin = await config.property.getdefault('read_write', 'append') rw_append = frozenset(rw_origin - {'force_store_value'}) await config.property.setdefault(ro_append, 'read_only', 'append') await config.property.setdefault(rw_append, 'read_write', 'append') await config.information.set('test_information', 'value') await config.property.read_only() await config.property.pop('mandatory') await config.information.set('info', 'value') config_dict = await config.value.dict(leader_to_list=True) if filename == 'base': if not isdir(makedict_dir): mkdir(makedict_dir) if not isfile(makedict_file) or debug: with open(makedict_file, 'w') as fh: dump(config_dict, fh, indent=4) fh.write('\n') else: config_dict_prefix = {'1': {}, '2': {}} for key, value in config_dict.items(): prefix, path = key.split('.', 1) if value and isinstance(value, list) and isinstance(value[0], dict): new_value = [] for dct in value: new_dct = {} for k, v in dct.items(): k = k.split('.', 1)[-1] new_dct[k] = v new_value.append(new_dct) value = new_value config_dict_prefix[prefix][path] = value assert loads(dumps(config_dict_prefix['1'])) == loads(dumps(config_dict_prefix['2'])) config_dict = config_dict_prefix['1'] if not isfile(makedict_file): raise Exception('dict is not empty') with open(makedict_file, 'r') as fh: assert load(fh) == loads(dumps(config_dict)), f"error in file {makedict_file}" # await value_owner(makedict_before, config, filename) # deploy ro = await config.property.getdefault('read_only', 'append') ro = frozenset(list(ro) + ['force_store_value']) await config.property.setdefault(ro, 'read_only', 'append') rw = await config.property.getdefault('read_write', 'append') rw = frozenset(list(rw) + ['force_store_value']) await config.property.setdefault(rw, 'read_write', 'append') await config.property.add('force_store_value') # await value_owner(makedict_after, config, filename) async def value_owner(makedict_value_owner, config, filename): ret = {} for key in await config.option.list(recursive=True): path = await key.option.path() if await key.option.isfollower(): value = [] owner = [] for idx in range(0, await key.value.len()): try: option = config.option(path, idx) value.append(await option.value.get()) owner.append(await option.owner.get()) except PropertiesOptionError as err: value.append(str(err)) owner.append('error') else: value = await key.value.get() owner = await key.owner.get() ret[path] = {'owner': owner, 'value': value, } if filename == 'base': if not isfile(makedict_value_owner) or debug: with open(makedict_value_owner, 'w') as fh: dump(ret, fh, indent=4) fh.write('\n') else: ret_prefix = {'1': {}, '2': {}} for key, value in ret.items(): prefix, path = key.split('.', 1) ret_prefix[prefix][path] = value assert loads(dumps(ret_prefix['1'])) == loads(dumps(ret_prefix['2'])) ret = ret_prefix['1'] with open(makedict_value_owner, 'r') as fh: assert load(fh) == loads(dumps(ret)), f"error in file {makedict_value_owner}" @mark.asyncio async def test_dictionary(test_dir): test_dir = join(dico_dirs, test_dir) await launch_flattener(test_dir, 'base') @mark.asyncio async def test_dictionary_multi(test_dir): test_dir = join(dico_dirs, test_dir) await launch_flattener(test_dir, 'multi')