rougail-user-data-environment/tests/test_load.py

196 lines
7 KiB
Python

import os
from pytest import fixture # , raises
from pathlib import Path
from rougail import Rougail
#########################
from dotenv import load_dotenv
from rougail.user_data_environment import RougailUserDataEnvironment as RougailUserData
from json import load, dump
#########################
from rougail_tests.utils import get_structures_list, get_rougail_config, get_values_for_config, config_to_dict
EXT = "env"
#########################
#let's save the original environment
save = os.environ.copy()
#########################
excludes = []
excludes = [
'40_0leadership_follower_default_submulti', # submulti is not allowed
'40_0leadership_follower_default_submulti_calculation', # submulti is not allowed
'40_6leadership_follower_multi', # submulti is not allowed
]
test_ok = get_structures_list(excludes)
# test_ok = [Path('../rougail-tests/structures/60_6family_dynamic_sub_dynamic_empty2')]
def idfn(fixture_value):
return fixture_value.name
@fixture(scope="module", params=test_ok, ids=idfn)
def test_dir(request):
return request.param
def _test_dictionaries(test_dir, namespace, ext, *, level, need_exclude=False):
rougailconfig = get_rougail_config(test_dir, namespace)
if not rougailconfig:
return
##################################
rougailconfig['step.user_data'] = ['environment']
##################################
dir_name = 'test'
if namespace:
dir_name += '_namespace'
elif (test_dir / 'force_namespace').is_file():
return
rougail = Rougail(rougailconfig)
config = rougail.run()
##################################
root_path = Path('tests') / 'results' / dir_name / test_dir.name
makedict = root_path / 'makedict' / f'{level}.json'
filename = root_path / 'file'
if need_exclude:
filename = filename / f'{level}_exclude.{EXT}'
else:
filename = filename / f'{level}.{EXT}'
populate(filename, makedict, rougailconfig, level, need_exclude, namespace)
load_dotenv(str(filename))
##################################
# loads variables in the tiramisu config
generated_user_data = RougailUserData(config, rougailconfig=rougailconfig).run()
errors = rougail.user_datas(generated_user_data)
#expected output
with open(Path('tests') / 'results' / dir_name / test_dir.name / 'makedict' / f'{level}.json') as json_file:
expected = load(json_file)
# here is the effective test
errors_file = Path('tests') / 'results' / dir_name / test_dir.name / 'errors' / f'{level}.json'
if not errors_file.is_file():
errors_file.parent.mkdir(parents=True, exist_ok=True)
with open(errors_file, 'a') as json_file:
dump(errors, json_file, indent=4)
with open(errors_file) as json_file:
expected_errors = load(json_file)
# expected_errors = {
# 'errors': [],
# 'warnings': [],
# }
assert expected_errors == errors
#
config.property.read_only()
config_dict = dict(config_to_dict(config.value.get()))
assert expected == config_dict
######################################
#teardown: set the original environement again
os.environ = save.copy()
######################################
def populate(filename, makedict_file, rougailconfig, level, need_exclude, namespace):
if filename.is_file() and makedict_file.is_file():
return
config = Rougail(rougailconfig).run()
values = get_values_for_config(config, not need_exclude, level)
if not filename.is_file():
filename.parent.mkdir(parents=True, exist_ok=True)
with filename.open('w') as fh:
##############################################
fh.write('\n'.join(values_to_env(values, namespace)) + '\n')
##############################################
if not makedict_file.is_file():
makedict_file.parent.mkdir(parents=True, exist_ok=True)
config.property.read_only()
config_dict = dict(config_to_dict(config.value.get()))
with makedict_file.open('w') as fh:
dump(config_dict, fh, indent=4)
fh.write('\n')
##############################################
def values_to_env(values, namespace):
def parse(root_path, values_, level=-1):
level += 1
for key, value in values_.items():
if level == 0:
sub_root_path = key
else:
sub_root_path = root_path + '.' + key
if isinstance(value, dict):
yield from parse(sub_root_path, value, level)
elif isinstance(value, list):
if value and isinstance(value[0], dict):
keys = []
for ls in value:
for k in ls:
if k not in keys:
keys.append(k)
for k in keys:
for val in value:
sub_val = val.get(k, "")
if isinstance(sub_val, list):
raise Exception("submulti is not allowed for environment user_datas")
ns = (sub_root_path + '.' + k).upper()
if not namespace:
ns = "ROUGAIL_" + ns
yield ns + '="' + ','.join([convert_str(val.get(k, "")) for val in value])+ '"'
else:
ns = sub_root_path.upper()
if not namespace:
ns = "ROUGAIL_" + ns
yield ns + '="' + ','.join([convert_str(val) for val in value]) + '"'
elif value is None or value == 'None':
ns = sub_root_path.upper()
if not namespace:
ns = "ROUGAIL_" + ns
yield ns + '=""'
else:
ns = sub_root_path.upper()
if not namespace:
ns = "ROUGAIL_" + ns
yield ns + '="' + str(value) + '"'
return list(parse(None, values))
def convert_str(val):
if isinstance(val, bool):
return {True: 'true', False: 'false'}.get(val)
if val is None:
return ""
return str(val)
##############################################
def test_dictionaries_all(test_dir):
"tests the output"
_test_dictionaries(test_dir, False, EXT, level='all')
def test_dictionaries_all_exclude(test_dir):
"tests the output"
_test_dictionaries(test_dir, False, EXT, level='all', need_exclude=True)
def test_dictionaries_mandatories(test_dir):
"tests the output"
_test_dictionaries(test_dir, False, EXT, level='mandatories')
def test_dictionaries_namespace_all(test_dir):
"tests the output"
_test_dictionaries(test_dir, True, EXT, level='all')
def test_dictionaries_namespace_all_exclude(test_dir):
"tests the output"
_test_dictionaries(test_dir, True, EXT, level='all', need_exclude=True)
def test_dictionaries_namespace_mandatories(test_dir):
"tests the output"
_test_dictionaries(test_dir, True, EXT, level='mandatories')