164 lines
5.8 KiB
Python
164 lines
5.8 KiB
Python
import os
|
|
from pytest import fixture # , raises
|
|
from pathlib import Path
|
|
from rougail import Rougail
|
|
#########################
|
|
from dotenv import load_dotenv
|
|
from rougail.user_data_environment import RougailUserDataEnvironment as RougailUserData
|
|
from json import load, dump
|
|
#########################
|
|
|
|
from rougail_tests.utils import get_structures_list, get_rougail_config, get_values_for_config, config_to_dict
|
|
|
|
|
|
# Extension of the generated dotenv files.
EXT = "env"

#########################
# Snapshot of the process environment taken at import time. The tests load
# dotenv files into os.environ and use this copy to restore it afterwards.
save = os.environ.copy()
#########################

# Structures skipped by this test module (reason given on each entry).
excludes = [
    '40_0leadership_follower_default_submulti',  # submulti is not allowed
    '40_0leadership_follower_default_submulti_calculation',  # submulti is not allowed
    '40_6leadership_follower_multi',  # submulti is not allowed
]

# Structure directories used to parametrize the ``test_dir`` fixture.
test_ok = get_structures_list(excludes)
# To debug a single structure, override the list, e.g.:
# test_ok = [Path('../rougail-tests/structures/60_9extra_dynamic')]
|
|
|
|
|
|
def idfn(fixture_value):
    """Return the test id for a parametrized value: its ``name`` attribute."""
    test_id = fixture_value.name
    return test_id
|
|
|
|
|
|
@fixture(scope="module", params=test_ok, ids=idfn)
def test_dir(request):
    """Module-scoped fixture providing each entry of ``test_ok`` in turn."""
    current_structure = request.param
    return current_structure
|
|
|
|
|
|
def _test_dictionaries(test_dir, namespace, ext, *, level, need_exclude=False):
    """Load a generated dotenv file as environment user data and check the result.

    The expected files live under ``tests/results/<test_dir.name>/``:
    ``file/<level>[_exclude].<ext>`` (dotenv input), ``makedict/<level>.json``
    (expected values) and ``errors/<level>.json`` (expected errors). Missing
    dotenv/makedict files are generated by ``populate``; a missing errors
    file is created from the errors observed during this run.

    Args:
        test_dir: Path of the structure directory under test.
        namespace: Passed to ``get_rougail_config``. When falsy, structures
            containing a ``force_namespace`` file are skipped.
        ext: Extension of the dotenv file.
        level: Name of the value set to check; used to build the file names.
        need_exclude: When true, use the ``<level>_exclude`` dotenv file.
    """
    rougailconfig = get_rougail_config(test_dir, namespace)
    if not rougailconfig:
        return
    ##################################
    rougailconfig['step.user_data'] = ['environment']
    ##################################
    if not namespace and (test_dir / 'force_namespace').is_file():
        return
    rougail = Rougail(rougailconfig)
    config = rougail.run()
    ##################################
    root_path = Path('tests') / 'results' / test_dir.name
    makedict = root_path / 'makedict' / f'{level}.json'
    suffix = '_exclude' if need_exclude else ''
    filename = root_path / 'file' / f'{level}{suffix}.{ext}'
    populate(filename, makedict, rougailconfig, level, need_exclude)
    try:
        load_dotenv(str(filename))
        ##################################
        # loads variables in the tiramisu config
        generated_user_data = RougailUserData(config, rougailconfig=rougailconfig).run()
        errors = rougail.user_datas(generated_user_data)
        # expected output
        with open(makedict) as json_file:
            expected = load(json_file)
        # here is the effective test
        errors_file = root_path / 'errors' / f'{level}.json'
        if not errors_file.is_file():
            # First run for this case: record the observed errors as reference.
            errors_file.parent.mkdir(parents=True, exist_ok=True)
            with open(errors_file, 'w') as json_file:
                dump(errors, json_file, indent=4)
        with open(errors_file) as json_file:
            expected_errors = load(json_file)
        assert expected_errors == errors
        config.property.read_only()
        config_dict = dict(config_to_dict(config.value.get()))
        assert expected == config_dict
    finally:
        # Teardown: restore the original environment even when an assertion
        # fails, otherwise variables from this dotenv file leak into later
        # cases (load_dotenv does not override existing variables by default).
        # Mutate os.environ in place instead of rebinding it to a plain dict,
        # so it stays connected to the real process environment.
        os.environ.clear()
        os.environ.update(save)
|
|
|
|
|
|
def populate(filename, makedict_file, rougailconfig, level, need_exclude):
    """Generate the dotenv file and/or the makedict JSON file when missing.

    Does nothing when both files already exist. Otherwise a config is built
    from ``rougailconfig``, values are obtained through
    ``get_values_for_config`` and each missing file is written.
    """
    both_present = filename.is_file() and makedict_file.is_file()
    if both_present:
        return

    config = Rougail(rougailconfig).run()
    values = get_values_for_config(config, not need_exclude, level)

    if not filename.is_file():
        filename.parent.mkdir(parents=True, exist_ok=True)
        with filename.open('w') as env_fh:
            env_lines = values_to_env(values)
            env_fh.write('\n'.join(env_lines) + '\n')

    if not makedict_file.is_file():
        makedict_file.parent.mkdir(parents=True, exist_ok=True)
        config.property.read_only()
        raw_values = config.value.get()
        snapshot = dict(config_to_dict(raw_values))
        with makedict_file.open('w') as json_fh:
            dump(snapshot, json_fh, indent=4)
            json_fh.write('\n')
|
|
|
|
|
|
def values_to_env(values):
    """Flatten a nested mapping into a list of dotenv ``KEY="value"`` lines.

    Nested dict keys are joined with ``.`` and upper-cased to form the
    variable name. Values are rendered as follows:

    * dict: recursed into;
    * list of dicts: one line per key found in the dicts (first-seen
      order), whose value is the comma-joined entries of that key across
      all dicts (a missing key contributes an empty string);
    * any other list: one line with the comma-joined items;
    * ``None`` or the string ``'None'``: an empty value;
    * anything else: ``str(value)``.

    Raises:
        ValueError: if an entry inside a list of dicts is itself a list.
    """
    def parse(root_path, values_):
        for key, value in values_.items():
            # root_path is None only for the top-level call.
            sub_root_path = key if root_path is None else root_path + '.' + key
            if isinstance(value, dict):
                yield from parse(sub_root_path, value)
            elif isinstance(value, list):
                if value and isinstance(value[0], dict):
                    # Ordered, de-duplicated union of the keys of every dict.
                    keys = dict.fromkeys(k for row in value for k in row)
                    for k in keys:
                        if any(isinstance(row.get(k, ""), list) for row in value):
                            raise ValueError("submulti is not allowed for environment user_datas")
                        joined = ','.join(convert_str(row.get(k, "")) for row in value)
                        yield (sub_root_path + '.' + k).upper() + f'="{joined}"'
                else:
                    joined = ','.join(convert_str(item) for item in value)
                    yield sub_root_path.upper() + f'="{joined}"'
            elif value is None or value == 'None':
                yield sub_root_path.upper() + '=""'
            else:
                yield sub_root_path.upper() + f'="{value!s}"'

    return list(parse(None, values))
|
|
|
|
|
|
def convert_str(val):
    """Render one value as text for a comma-separated environment entry.

    ``None`` becomes an empty string, booleans become ``'true'``/``'false'``
    and every other value is passed through ``str``.
    """
    if val is None:
        return ""
    if isinstance(val, bool):
        return 'true' if val else 'false'
    return str(val)
|
|
|
|
|
|
def test_dictionaries_all(test_dir):
    """Check the 'all' level output with namespace enabled."""
    _test_dictionaries(test_dir, namespace=True, ext=EXT, level='all')
|
|
|
|
|
|
def test_dictionaries_all_exclude(test_dir):
    """Check the 'all' level output, exclude variant, with namespace enabled."""
    _test_dictionaries(
        test_dir,
        namespace=True,
        ext=EXT,
        level='all',
        need_exclude=True,
    )
|
|
|
|
|
|
def test_dictionaries_mandatories(test_dir):
    """Check the 'mandatories' level output with namespace enabled."""
    _test_dictionaries(test_dir, namespace=True, ext=EXT, level='mandatories')
|