# rougail-user-data-yaml/tests/test_file.py
#
# Source-viewer metadata captured with the file: 262 lines, 9.7 KiB, Python

from .custom import CustomOption
from pathlib import Path
from pytest import fixture
from json import load, dump
from ruamel.yaml import YAML
from rougail import Rougail
from rougail.config import get_rougail_config
from rougail.user_data_file import RougailUserDataFile
# Extension used for the generated user-data files.
EXT = "yml"
# Directory holding one sub-directory per test case (relative to the working directory).
dico_dirs = Path('../rougail/tests/dictionaries')
# path configuration
ROUGAILCONFIG = get_rougail_config(backward_compatibility=False, add_extra_options=False)
# Names of test cases that must be skipped during discovery.
excludes = set()
# A sub-directory is a test case only when it contains a 'tiramisu' directory.
# To debug a single case, temporarily replace the expression below with a
# one-element list such as ['60_6family_dynamic_sub_dynamic_empty'].
test_ok = sorted(
    test.name
    for test in dico_dirs.iterdir()
    if (test / 'tiramisu').is_dir() and test.name not in excludes
)
@fixture(scope="module", params=test_ok)
def test_dir(request):
    """Provide, one at a time, each test-case name collected in ``test_ok``."""
    current_case = request.param
    return current_case
def test_dictionaries_all(test_dir):
    """Run the round-trip check for *test_dir* at the 'all' level."""
    _test_dictionaries(test_dir, level='all')
def test_dictionaries_all_exclude(test_dir):
    """Run the round-trip check for *test_dir* at the 'all' level, exclude variant."""
    _test_dictionaries(test_dir, level='all', need_exclude=True)
def test_dictionaries_mandatories(test_dir):
    """Run the round-trip check for *test_dir* at the 'mandatories' level."""
    _test_dictionaries(test_dir, level='mandatories')
def _get_rougailconfig(test_dir):
    """Return a copy of ``ROUGAILCONFIG`` set up for the test case *test_dir*.

    The copy enables the 'file' user-data step, registers the custom option
    type and points at the dictionaries of the requested test case.
    """
    settings = ROUGAILCONFIG.copy()
    # The assignments are kept in their original order on purpose: the
    # configuration object may be sensitive to the order in which keys are set.
    settings['step.user_data'] = ['file']
    settings['main_namespace'] = 'rougail'
    settings['modes_level'] = ["basic", "standard", "advanced"]
    settings['custom_types']['custom'] = CustomOption
    settings['main_dictionaries'] = [
        str(dico_dirs / test_dir / 'dictionaries' / 'rougail'),
    ]
    settings['functions_files'] = ['../rougail/tests/eosfunc/test.py']
    # settings["tiramisu_cache"] = "p.py"
    return settings
def _test_dictionaries(test_dir, level, need_exclude=False):
    """Load the user-data file of *test_dir* and compare against stored results.

    Missing fixture files are generated first through ``populate``.  The
    errors reported while loading the user data are compared with
    ``errors/<level>.json`` (written from the current run when it does not
    exist yet), and the resulting configuration values are compared with
    ``makedict/<level>.json``.

    :param test_dir: name of the test case (sub-directory of the results tree)
    :param level: 'all' or 'mandatories'; selects the fixture file names
    :param need_exclude: when true, use the ``<level>_exclude`` user-data file
    """
    rougailconfig = _get_rougailconfig(test_dir)
    results_dir = Path('tests') / 'results' / test_dir
    # create the fixture files when they are not there yet
    populate(results_dir, rougailconfig, level, need_exclude)

    # build the configuration from the dictionaries
    rougail = Rougail(rougailconfig)
    config = rougail.run()

    # pick the user-data file matching the requested variant
    variant = '_exclude' if need_exclude else ''
    user_file = results_dir / 'file' / f'{level}{variant}.{EXT}'
    rougailconfig['file.filename'] = [str(user_file)]

    # read the file and push its content into the configuration
    loader = RougailUserDataFile(config, rougailconfig=rougailconfig)
    user_data = loader.run()
    errors = rougail.user_datas(user_data)

    # expected output
    with open(results_dir / 'makedict' / f'{level}.json') as stream:
        expected = load(stream)

    # here is the effective test
    config.property.read_only()
    config_dict = dict(option_value(config.value.get()))

    errors_file = results_dir / 'errors' / f'{level}.json'
    if not errors_file.is_file():
        # first run: record the current errors as the reference
        errors_file.parent.mkdir(parents=True, exist_ok=True)
        with open(errors_file, 'a') as stream:
            dump(errors, stream, indent=4)
    with open(errors_file) as stream:
        expected_errors = load(stream)

    assert expected_errors == errors
    assert expected == config_dict
def populate(dest_dir, rougailconfig, level, need_exclude):
    """Create the fixture files of one test case when they are missing.

    Two files are concerned: the user-data file
    (``file/<level>[_exclude].<EXT>``) and the expected values
    (``makedict/<level>.json``).  Nothing is done when both already exist.

    :param dest_dir: results directory of the test case
    :param rougailconfig: configuration passed to ``Rougail``
    :param level: 'all' collects every variable, any other value restricts
        the collection to mandatory variables
    :param need_exclude: when true, the paths gathered by ``get_variables``
        as excludes are removed from the values written to the user-data file
    """
    variant = '_exclude' if need_exclude else ''
    filename = dest_dir / 'file' / f'{level}{variant}.{EXT}'
    makedict_file = dest_dir / 'makedict' / f'{level}.json'
    if filename.is_file() and makedict_file.is_file():
        return

    excludes = [] if need_exclude else None
    config = Rougail(rougailconfig).run()
    config.property.read_only()
    root_config = config.unrestraint
    only = level != 'all'
    values = {}
    get_variables(root_config, root_config, values, only, excludes)

    if need_exclude:
        for excluded_path in excludes:
            *parents, leaf = excluded_path.split('.')
            container = values
            for part in parents:
                if part not in container:
                    break
                container = container[part]
            else:
                # every parent exists: drop the excluded entry if present
                if leaf in container:
                    del container[leaf]

    if not filename.is_file():
        filename.parent.mkdir(parents=True, exist_ok=True)
        writer = YAML()
        with filename.open('w') as stream:
            writer.dump(values, stream)

    if not makedict_file.is_file():
        makedict_file.parent.mkdir(parents=True, exist_ok=True)
        config.property.read_only()
        config_dict = dict(option_value(config.value.get()))
        with makedict_file.open('w') as stream:
            dump(config_dict, stream, indent=4)
            stream.write('\n')
def get_value(variable, index):
    """Choose a sample value for *variable*, store it, and return it.

    A variable carrying the 'force_store_value' property is left untouched
    and its current value is returned.  Otherwise the candidates come from
    the 'test' information when it is set, or from a default list chosen
    from the variable type.

    :param variable: option object exposing ``property``, ``information``,
        ``type()``, ``value`` and the ``is*`` predicates used below
    :param index: position inside a leadership, or ``None``
    :return: the value handed to ``variable.value.set``, except for a leader
        with an *index*, where only the item at *index* is returned
    """
    if 'force_store_value' in variable.property.get():
        return variable.value.get()

    declared = variable.information.get('test', None)
    if declared:
        candidates = list(declared)
    else:
        kind = variable.type()
        fixed_samples = {
            'integer': [1, 2, 3],
            'float': [1.1, 2.2, 3.3],
            'port': ['80', '443'],
            'boolean': [True],
            'domain name': ['domain1.lan', 'domain2.lan'],
            'netmask address': ['255.255.255.0', '255.255.0.0'],
        }
        if kind in fixed_samples:
            candidates = fixed_samples[kind]
        elif kind == 'choice':
            candidates = variable.value.list()
        elif kind == 'network address':
            if variable.extra('_cidr'):
                candidates = ['192.168.1.0/24', '10.0.0.0/24']
            else:
                candidates = ['192.168.1.0', '10.0.0.0']
        elif kind == 'IP':
            if variable.extra('_cidr'):
                candidates = ['192.168.1.6/32', '10.0.10.0/24']
            else:
                candidates = ['192.168.1.6', '10.0.10.10']
        else:
            candidates = ['string1', 'string2', 'string3']

    if not variable.ismulti():
        chosen = candidates[0]
    elif index is not None and variable.isfollower() and variable.issubmulti() is False:
        # plain follower: one value per leadership row
        chosen = candidates[index]
    else:
        chosen = candidates
    variable.value.set(chosen)

    # the whole list is stored on a leader, but the caller only wants its row
    if index is not None and variable.isleader():
        return chosen[index]
    return chosen
def get_variables(root_config, config, values, only, excludes, *, index=None, leader_is_mandatory=False):
    """Walk *config* recursively and fill *values* with sample values.

    Families become nested dictionaries keyed by ``name()``; leaderships
    become a list with one dictionary per leader value.  Variable values are
    produced (and stored in the configuration) by ``get_value``.

    :param root_config: root object, used to look up the variable named by a
        dynamic family's 'dynamic_variable' information
    :param config: the node whose children are visited
    :param values: dictionary filled in place
    :param only: when true, keep only variables reported as mandatory
    :param excludes: ``None``, or a list that receives the path of each
        dynamic family's source variable whose default value is empty
    :param index: leadership row being built, or ``None``
    :param leader_is_mandatory: mandatory status applied to a leader
        variable met while building a leadership row
    """
    for key in config:
        if key.isoptiondescription():
            if key.isleadership():
                rows = []
                leader = key.leader()
                if only and not leader.value.mandatory():
                    leader_value = leader.value.get()
                    row_leader_mandatory = False
                else:
                    leader_value = get_value(leader, None)
                    row_leader_mandatory = True
                # one dictionary per leader value, filled by a recursive walk
                for row_index, _ in enumerate(leader_value):
                    rows.append({})
                    get_variables(
                        root_config,
                        key,
                        rows[-1],
                        only,
                        excludes,
                        index=row_index,
                        leader_is_mandatory=row_leader_mandatory,
                    )
                if rows:
                    values[key.name()] = rows
            else:
                family_values = {}
                get_variables(root_config, key, family_values, only, excludes)
                if family_values:
                    values[key.name()] = family_values
                if key.isdynamic(only_self=True):
                    exclude = key.information.get('dynamic_variable', None)
                    if excludes is not None and exclude:
                        # resolve the placeholders with the identifiers of the
                        # enclosing dynamic families (the family's own, last,
                        # identifier is left out)
                        identifiers = key.identifiers()[:-1]
                        if identifiers:
                            identifiers.reverse()
                            for identifier in identifiers:
                                exclude = exclude.replace('{{ identifier }}', str(identifier), 1)
                        if not root_config.option(exclude).value.default():
                            excludes.append(exclude)
            continue

        if only:
            if key.isleader():
                mandatory = leader_is_mandatory
            else:
                try:
                    mandatory = key.value.mandatory()
                except Exception:
                    # best effort: a variable whose mandatory status cannot be
                    # evaluated is skipped; KeyboardInterrupt and SystemExit
                    # are deliberately not caught
                    mandatory = False
            if not mandatory:
                continue
        own_index = key.index()
        if own_index is not None and index is not None and index != own_index:
            # this entry belongs to another leadership row
            continue
        values[key.name()] = get_value(key, index)
def option_value(parent, key_is_option=False):
    """Flatten a nested ``{option: value}`` mapping into ``(key, value)`` pairs.

    Families are traversed recursively.  A leadership is emitted as a single
    pair: the key is the path of its first member and the value is a list
    with one ``{path: value}`` dictionary per item of that first member's
    value; the other members are placed in the dictionary designated by
    their ``index()``.

    :param parent: mapping whose keys are option objects
    :param key_is_option: when true, yield the option objects themselves
        instead of their paths and do not regroup leaderships
    """
    for node, content in parent.items():
        if not node.isoptiondescription():
            yield (node if key_is_option else node.path()), content
            continue
        if key_is_option or not node.isleadership():
            yield from option_value(content, key_is_option)
            continue
        rows = []
        members = option_value(content, key_is_option=True)
        for position, (member, member_value) in enumerate(members):
            if position == 0:
                # first member: its value gives the number of rows
                leader_path = member.path()
                rows.extend({leader_path: item} for item in member_value)
            else:
                row = member.index()
                rows[row][member.path()] = member_value
        yield leader_path, rows