dataset/tests/test_applicationservice.py

578 lines
22 KiB
Python
Raw Permalink Normal View History

2022-03-08 19:42:28 +01:00
from typing import Any
from pytest import fixture, mark
from os import makedirs, listdir, link, environ
from os.path import isfile, isdir, join, abspath, basename
from yaml import load, SafeLoader
from json import dump
from shutil import rmtree
from traceback import print_exc
from tiramisu import Config
from tiramisu.error import PropertiesOptionError, LeadershipError
from rougail import RougailConvert, RougailSystemdTemplate, RougailConfig
# Source code written at the top of every generated ``funcs.py`` (see
# ``test_template``): stand-ins that return fixed values, followed there by
# the functions shipped by each application service.
# The embedded function bodies must stay indented: this text is written
# verbatim to a file that is later used as ``RougailConfig['functions_file']``.
FUNCTIONS = b"""from tiramisu import valid_network_netmask, valid_ip_netmask, valid_broadcast, valid_in_network, valid_not_equal as valid_differ, valid_not_equal, calc_value
# =============================================================
# fork of risotto-setting/src/risotto_setting/config/config.py
def get_password(**kwargs):
    return 'password'
def get_ip(**kwargs):
    return '1.1.1.1'
def get_chain(**kwargs):
    return 'chain'
def get_certificate(**kwargs):
    return 'certificate'
def get_private_key(**kwargs):
    return 'private_key'
def get_linked_configuration(**kwargs):
    if 'test' in kwargs and kwargs['test']:
        return kwargs['test'][0]
    if kwargs.get('multi', False) is True:
        return ['configuration']
    return 'configuration'
def get_certificates(**kwargs):
    return ["XXXX", "YYYY", "ZZZZ"]
def zone_information(*args, **kwargs):
    if args[1] == 'network':
        val = '192.168.0.0/24'
    else:
        val = '192.168.0.1'
    if kwargs.get('multi', False):
        val = [val]
    return val
# =============================================================
"""
def tiramisu_display_name(kls,
                          dyn_name: 'Base'=None,
                          suffix: str=None,
                          ) -> str:
    """Return the bare name to display for an option.

    Given as ``display_name`` when the tiramisu ``Config`` is built in
    ``test_template``.

    :param kls: option object; ``impl_getname()`` is called on it only when
        ``dyn_name`` is ``None``
    :param dyn_name: name to display instead of the option's own name
    :param suffix: accepted but not used by this implementation
    :return: ``dyn_name`` when it is not ``None``, else ``kls.impl_getname()``
    """
    return kls.impl_getname() if dyn_name is None else dyn_name
# Directory (relative to the current working directory) receiving everything
# the tests generate. It is removed at import time so a run never sees the
# artefacts of a previous one.
GENERATED_DIR = 'generated'
if isdir(GENERATED_DIR):
    rmtree(GENERATED_DIR)

# Application service name -> directory found under
# seed/<distrib>/applicationservice/<release>/.
applications = {}
# Application service directories that contain a 'dictionaries' directory;
# these are the parameters of the ``test_dir`` fixture.
test_ok = []
for distrib in listdir('seed'):
    distrib_dir = join('seed', distrib, 'applicationservice')
    if not isdir(distrib_dir):
        continue
    for release in listdir(distrib_dir):
        release_dir = join(distrib_dir, release)
        if not isdir(release_dir):
            continue
        for applicationservice in listdir(release_dir):
            applicationservice_dir = join(release_dir, applicationservice)
            if not isdir(applicationservice_dir):
                continue
            if applicationservice in applications:
                # a name may only exist once across distributions and releases
                raise Exception('multi applicationservice')
            applications[applicationservice] = applicationservice_dir
            if isdir(join(applicationservice_dir, 'dictionaries')):
                test_ok.append(applicationservice_dir)

if 'APPLICATIONSERVICE_DIR' in environ:
    # Restrict the run to the directory named by the variable. The previous
    # code only tested the variable's presence and then reused the leftover
    # loop variable, i.e. whichever directory happened to be listed last
    # (and raised NameError when nothing had been listed at all).
    # NOTE(review): assumes the variable holds a path such as
    # seed/<distrib>/applicationservice/<release>/<name> - confirm with CI usage.
    # The trailing '/' is dropped because test_template derives the
    # application name from the last path component.
    selected_dir = environ['APPLICATIONSERVICE_DIR'].rstrip('/')
    test_ok = [selected_dir] if isdir(join(selected_dir, 'dictionaries')) else []
print(test_ok)
@fixture(scope="module", params=test_ok)
def test_dir(request):
    """Module-scoped fixture yielding, in turn, each entry of ``test_ok``."""
    current_dir = request.param
    return current_dir
async def get_value(type: str,
                    properties: list,
                    test_values: Any,
                    multi: bool,
                    has_dependency: bool,
                    path,
                    min_number,
                    ) -> list:
    """Build the list of candidate values to try for one variable.

    :param type: variable type name (``'ip'``, ``'integer'``, ...); only an
        exact match selects a built-in sample
    :param properties: properties of the variable; ``'mandatory'`` absent
        means an "empty" candidate is added in front
    :param test_values: explicit values to use instead of the built-in
        samples; when not ``None`` every one of them is kept
    :param multi: when true the candidates are wrapped into one list value
    :param has_dependency: when false (and no ``test_values``) only the first
        sample is kept
    :param path: accepted but not used here
    :param min_number: lower bound used for ``'integer'``; ``None`` means 0
    :return: candidate values, "empty" one (``None`` or ``[]``) first when
        the variable is not mandatory
    :raises Exception: ``Unknown type <type>`` when there are no
        ``test_values`` and the type has no built-in sample
    """
    # Built fresh on every call: the selected list is mutated below.
    samples = {
        'hostname': ['test'],
        'domainname': ['test.test.com'],
        'filename': ['/a'],
        'ip': ['192.168.1.1'],
        'ip_cidr': ['192.168.1.1/24'],
        'network': ['192.168.1.0'],
        'network_cidr': ['192.168.1.0/24'],
        'netmask': ['255.255.255.0'],
        'string': ['string'],
        'password': ['string'],
        'port': ["80"],
        'boolean': [True, False],
        'url': ['http://value'],
        'username': ['myname'],
        'email': ['a@a.com'],
        'date': ['2011-11-11'],
        'float': [1.1],
    }
    if test_values is not None:
        values = list(test_values)
        # explicit test values are always tried in full
        has_dependency = True
    elif type == 'integer':
        if min_number is None:
            min_number = 0
        values = [min_number, min_number + 1]
    elif type in samples:
        # exact lookup: the former ``type in 'port'`` was a substring test
        # and accepted '', 'p', 'or', ... as if they were 'port'
        values = samples[type]
    else:
        raise Exception(f'Unknown type {type}')
    if not has_dependency:
        values = [values[0]]
    if multi:
        values = [values]
    if 'mandatory' not in properties:
        if not multi:
            values.insert(0, None)
        else:
            values.insert(0, [])
    return values
async def build_values(paths: list,
                       paths_values: dict,
                       ):
    """Yield every combination of candidate values for ``paths``.

    Each item of ``paths`` is a 4-item sequence whose first item is the path
    and third item the candidate values. Each yielded dict is a copy of
    ``paths_values`` completed with one value per path; the first path
    varies slowest. Nothing is yielded when ``paths`` is empty.
    """
    if not paths:
        return
    path, _option, candidates, _ = paths[0]
    remaining = paths[1:]
    for candidate in candidates:
        assignment = paths_values.copy()
        assignment[path] = candidate
        if not remaining:
            yield assignment
            continue
        async for completed in build_values(remaining,
                                            assignment,
                                            ):
            yield completed
async def set_dep_paths(new_paths_no_deps, config, old_paths_values, new_paths_values, leader_followers):
    """Set the next queued value of every queued path currently in the config.

    ``new_paths_no_deps`` maps a path to the values still to try. For each
    path present in ``config.value.dict()`` its first queued value is handed
    to ``set_value`` with ``is_dep=True``.
    """
    # Iterate over a snapshot of the keys: set_value() adds and removes
    # entries of new_paths_no_deps while we loop.
    for path in list(new_paths_no_deps):
        # re-read on every turn, the previous write may have changed it
        visible = await config.value.dict()
        if path not in visible:
            continue
        next_value = new_paths_no_deps[path][0]
        await set_value(config, path, next_value, old_paths_values, new_paths_values, leader_followers, new_paths_no_deps, is_dep=True)
async def set_value(config, path, value, old_paths_values, new_paths_values, leader_followers, new_paths_no_deps, is_dep=False):
    """Write ``value`` to the option at ``path`` unless it already holds it.

    :param config: tiramisu config; every write goes through
        ``config.unrestraint``
    :param path: path of the option to set
    :param value: value to set; a tuple is converted to a list first
    :param old_paths_values: values recorded after the previous test case;
        nothing is written when ``old_paths_values[path]`` equals ``value``
    :param new_paths_values: not used in this function
    :param leader_followers: maps a leadership path to a list of
        ``(follower path, values)`` pairs
    :param new_paths_no_deps: maps a path to the values still to try;
        mutated in place
    :param is_dep: when true and the value was actually written, the first
        queued value of ``path`` is removed from ``new_paths_no_deps``
    """
    setted = False
    if path not in old_paths_values or old_paths_values[path] != value:
        if isinstance(value, tuple):
            value = list(value)
        if not await config.option(path).option.isfollower():
            if await config.option(path).option.isleader():
                old_values = await config.unrestraint.option(path).value.get()
                # the leader is reset before being given a shorter value
                if len(old_values) > len(value):
                    await config.unrestraint.option(path).value.reset()
                # queue again the values of the followers of this leadership
                # NOTE(review): placed here on every leader change; it may
                # belong inside the "shorter value" branch above - confirm
                leadership_path = path.rsplit('.', 1)[0]
                if leadership_path in leader_followers:
                    for follower, values in leader_followers[leadership_path]:
                        new_paths_no_deps[follower] = values.copy()
            await config.unrestraint.option(path).value.set(value)
            setted = True
        else:
            # follower: only the last index is written, and only when the
            # reported length is not zero
            length = await config.unrestraint.option(path).value.len()
            if length:
                setted = True
                await config.unrestraint.option(path, length-1).value.set(value)
    if setted and is_dep:
        # the value has been consumed; forget the path once its queue is empty
        new_paths_no_deps[path].pop(0)
        if not new_paths_no_deps[path]:
            del new_paths_no_deps[path]
async def get_values(config, new_paths, new_paths_no_deps, root_dir, leader_followers):
    """Apply every combination of values to ``config`` and yield the result.

    When ``new_paths`` is empty the current ``config.value.dict()`` is
    yielded once. Otherwise, for each combination produced by
    ``build_values`` the config is switched to read-write, the values are
    set, the queued values of ``new_paths_no_deps`` (if any) are applied
    through ``set_dep_paths``, the config is switched back to read-only and
    its ``value.dict()`` is yielded.

    :param config: tiramisu config, modified in place
    :param new_paths: ``(path, option, values, dependencies)`` entries to combine
    :param new_paths_no_deps: maps a path to values still to try; mutated by
        the helpers
    :param root_dir: directory whose ``dictionary.py`` is named in the error
        output
    :param leader_followers: forwarded to ``set_value`` / ``set_dep_paths``
    :raises Exception: ``error <cause>`` chained to the original exception
        when applying a combination fails
    """
    old_paths_values = {}
    if not new_paths:
        yield await config.value.dict()
    async for new_paths_values in build_values(new_paths, {}):
        try:
            await config.property.read_write()
            for path, value in new_paths_values.items():
                await set_value(config, path, value, old_paths_values, new_paths_values, leader_followers, new_paths_no_deps)
            if new_paths_no_deps:
                await set_dep_paths(new_paths_no_deps, config, old_paths_values, new_paths_values, leader_followers)
            await config.property.read_only()
            dico = await config.value.dict()
            # remembered so that unchanged values are not written again
            old_paths_values = dico
            yield dico
        except Exception as err:
            print_exc()
            print('Tiramisu file ' + join(root_dir, 'dictionary.py'))
            print(f'test with {new_paths_values}')
            # chained explicitly, like the other error paths of this module
            raise Exception(f'error {err}') from err
def calc_depends(xmls, appname):
    """Insert the transitive dependencies of ``appname`` in front of ``xmls``.

    Reads the ``depends`` list of ``applicationservice.yml`` in the directory
    registered for ``appname`` in ``applications``. Each dependency not yet
    in ``xmls`` is inserted at index 0 and then processed recursively.

    :param xmls: list of application service names, mutated in place
    :param appname: application service whose manifest is read
    :raises KeyError: when ``appname`` is not in ``applications``
    """
    manifest = join(applications[appname], 'applicationservice.yml')
    with open(manifest) as yaml:
        # an empty YAML document loads as None: treat it as "no dependencies"
        # instead of failing on the membership test below
        app = load(yaml, Loader=SafeLoader) or {}
    if 'depends' not in app or not app['depends']:
        return
    for dependency in app['depends']:
        if dependency not in xmls:
            xmls.insert(0, dependency)
            calc_depends(xmls, dependency)
def reorder_new_paths(paths, new_paths=None, not_added_paths=None, added_paths=None):
    """Order ``paths`` so that an entry comes after the entries it depends on.

    ``paths`` holds ``(path, option, values, option_dependencies)`` entries,
    ``option_dependencies`` being a set of paths. Passes are made over
    ``paths``: an entry is placed once none of its dependencies is among the
    paths left unplaced by the previous pass. Dependencies on paths absent
    from ``paths`` never block an entry.

    Entries caught in a dependency cycle (or depending on themselves) can
    never be placed that way; they are appended last, in input order,
    instead of looping forever.

    :param paths: entries to order (not modified)
    :param new_paths: output list to extend in place; ``None`` for a normal
        call, in which case a new list is created and returned
    :param not_added_paths: set of paths blocking placement during the first
        pass; ``None`` means nothing is placed during that pass
    :param added_paths: list of already placed paths, extended in place
        (ignored and replaced when ``new_paths`` is ``None``)
    :return: the ordered list when ``new_paths`` was ``None``, else ``None``
    """
    root = new_paths is None
    if root:
        new_paths = []
        added_paths = []
    elif added_paths is None:
        added_paths = []
    # set mirror of added_paths for constant-time membership tests
    placed = set(added_paths)

    def _place(path, option, values, option_dependencies):
        new_paths.append((path, option, values, option_dependencies))
        added_paths.append(path)
        placed.add(path)

    blocking = not_added_paths
    while True:
        waiting = []
        for path, option, values, option_dependencies in paths:
            if path in placed:
                continue
            if blocking is not None and not option_dependencies & blocking:
                # all dependencies are already managed, so add this path
                _place(path, option, values, option_dependencies)
            else:
                waiting.append((path, option, values, option_dependencies))
        still_blocked = {entry[0] for entry in waiting}
        if not still_blocked:
            break
        if still_blocked == blocking:
            # no progress is possible any more: circular dependencies
            for entry in waiting:
                if entry[0] not in placed:
                    _place(*entry)
            break
        blocking = still_blocked
    if root:
        return new_paths
    return None
async def populate_paths(paths_done, paths, dependencies, unrestraint, unaccessible_paths, config):
    """Collect the description of every variable not handled yet.

    For each path of ``unrestraint.value.dict()`` that is not in
    ``paths_done``, has no property returned by
    ``property.get(only_raises=True, uncalculated=True)`` and is not located
    under one of ``unaccessible_paths``, a tuple
    ``(path, properties, test_values, ismulti, type, option, min_number)``
    is appended to ``paths`` and the path is appended to ``paths_done``.

    :param paths_done: list of paths already collected; extended in place
    :param paths: list receiving the collected tuples; extended in place
    :param dependencies: set receiving the path of every dependency of the
        collected options; extended in place
    :param unrestraint: ``config.unrestraint`` view used to read the options
    :param unaccessible_paths: option description paths whose content is skipped
    :param config: tiramisu config used to read the properties
    """
    for path in await unrestraint.value.dict():
        if path in paths_done:
            continue
        properties = await config.option(path).property.get(only_raises=True,
                                                            uncalculated=True,
                                                            )
        if properties:
            # skipped: this path is left unmodified
            # NOTE(review): presumably these are the properties that forbid
            # access - confirm against the tiramisu documentation
            continue
        for unaccessible_path in unaccessible_paths:
            if path.startswith(unaccessible_path + '.'):
                break
        else:
            # this variable is not inside an unaccessible path
            option = unrestraint.option(path)
            current_dependencies = await option.option.dependencies()
            # values explicitly provided for the tests, if any
            test_values = await option.information.get('test', None)
            if await option.option.isfollower():
                ismulti = await option.option.issubmulti()
            else:
                ismulti = await option.option.ismulti()
            min_number = None
            type = await option.option.type()
            if test_values is None:
                # no explicit test values: refine what get_value() needs
                if type == 'choice':
                    test_values = await option.value.list()
                    if not test_values:
                        test_values = [None]
                elif type == 'integer':
                    real_option = await option.option.get()
                    min_number = real_option.impl_get_extra('min_number')
                elif type == 'domainname':
                    real_option = await option.option.get()
                    if real_option.impl_get_extra('_dom_type') in ['netbios', 'hostname']:
                        type = 'hostname'
                elif type == 'ip':
                    true_option = await option.option.get()
                    if true_option.impl_get_extra('_cidr'):
                        type = 'ip_cidr'
                elif type == 'network':
                    true_option = await option.option.get()
                    if true_option.impl_get_extra('_cidr'):
                        type = 'network_cidr'
            else:
                test_values = tuple(test_values)
            full_properties = await option.property.get()
            for dependency in current_dependencies:
                dependencies.add(await dependency.option.path())
            paths.append((path,
                          full_properties,
                          test_values,
                          ismulti,
                          type,
                          option,
                          min_number,
                          ))
            paths_done.append(path)
@mark.asyncio
async def test_template(test_dir):
    """Instantiate the templates of one application service for many value sets.

    Steps, all for the application service located in ``test_dir``:

    1. prepare ``generated/<name>`` (``tmp`` and ``funcs`` sub-directories);
    2. resolve the dependencies and configure ``RougailConfig``
       (dictionaries, extra dictionaries, functions file, templates);
    3. convert the dictionaries to a tiramisu description and load it;
    4. collect the variables and their candidate values;
    5. for every distinct resulting configuration, instantiate the templates
       in ``generated/<name>/<index>/dest`` and dump the values in
       ``variables.txt``.

    Directories starting with ``seed/eole`` are skipped.
    """
    if test_dir.startswith('seed/eole'):
        return
#    if test_dir != 'seed/eole/applicationservice/2020.1.1/eole-lemonldap-ng':
#        return
#    if test_dir in [
#            'seed/eole/applicationservice/2020.1.1/eole-proxy',
#            'seed/eole/applicationservice/2020.1.1/eole-bareos',
#            ]:
#        return
    # --- 1. working directories
    if not isdir(GENERATED_DIR):
        makedirs(GENERATED_DIR)
    test_name = basename(test_dir)
    root_dir = f'{GENERATED_DIR}/{test_name}'
    tmp_dir = f'{root_dir}/tmp'
    funcs_dir = f'{root_dir}/funcs'
    for dirname in [root_dir, tmp_dir, funcs_dir]:
        if isdir(dirname):
            rmtree(dirname)
        makedirs(dirname)
    print()
    print('>', test_dir)
    # --- 2. dependencies and rougail configuration
    test_file = test_dir.rsplit('/', 1)[-1]
    xmls = [test_file]
    calc_depends(xmls, test_file)
    print(' o dependencies: ' + ', '.join(xmls))
    with open(f'{root_dir}/dependencies.txt', 'w') as fh:
        dump(xmls, fh)
    dictionaries = [join(applications[xml], 'dictionaries') for xml in xmls if isdir(join(applications[xml], 'dictionaries'))]
    if 'risotto_setting.rougail' not in RougailConfig['extra_annotators']:
        RougailConfig['extra_annotators'].append('risotto_setting.rougail')
    RougailConfig['dictionaries_dir'] = dictionaries
    print(' o dictionaries: ' + ', '.join(dictionaries))
    RougailConfig['extra_dictionaries'] = {}
    extra_dictionaries = []
    for xml in xmls:
        extras_dir = join(applications[xml], 'extras')
        if isdir(extras_dir):
            for extra in listdir(extras_dir):
                extra_dir = join(extras_dir, extra)
                if isdir(extra_dir):
                    RougailConfig['extra_dictionaries'].setdefault(extra, []).append(extra_dir)
                    extra_dictionaries.append(extra_dir)
    if RougailConfig['extra_dictionaries']:
        print(' o extra dictionaries: ' + ', '.join(extra_dictionaries))
    # the 'templates' directory is shared by all tests: start from scratch
    if isdir('templates'):
        rmtree('templates')
    makedirs('templates')
    func = f'{funcs_dir}/funcs.py'
    with open(func, 'wb') as fh:
        # stand-in functions first, then the functions of every application service
        fh.write(FUNCTIONS)
        for xml in xmls:
            func_dir = join(applications[xml], 'funcs')
            if isdir(func_dir):
                for f in listdir(func_dir):
                    if not f.startswith('__'):
                        with open(join(func_dir, f), 'rb') as rfh:
                            fh.write(rfh.read())
            templates_dir = join(applications[xml], 'templates')
            if isdir(templates_dir):
                for f in listdir(templates_dir):
                    template_file = join(templates_dir, f)
                    dest_template_file = join('templates', f)
                    if isfile(dest_template_file):
                        raise Exception(f'duplicate template {f}')
                    # hard link the template into the shared directory
                    link(template_file, dest_template_file)
    RougailConfig['functions_file'] = func
    # --- 3. conversion to tiramisu and loading
    eolobj = RougailConvert()
    xml = eolobj.save(join(root_dir, 'dictionary.py'))
    optiondescription = {}
    try:
        # the generated source defines 'option_0', read just below
        exec(xml, None, optiondescription)
    except Exception as err:
        print('Tiramisu file ' + join(root_dir, 'dictionary.py'))
        print_exc()
        raise Exception(f'unknown error when load tiramisu object {err}') from err
    config = await Config(optiondescription['option_0'], display_name=tiramisu_display_name)
    has_services = False
    for option in await config.option.list('optiondescription'):
        if await option.option.name() == 'services':
            has_services = True
            break
    if not has_services:
        # nothing to test without a 'services' option description
        print(f' o number of tests: any template')
        rmtree(root_dir)
        return
    # --- 4. variables and candidate values
    await config.property.read_write()
    paths = []
    unrestraint = config.unrestraint
    # these paths are unaccessible, so the variables inside are unaccessible too
    unaccessible_paths = []
    dependencies = set()
    for option in await unrestraint.option.list('optiondescription',
                                                recursive=True,
                                                ):
        path = await option.option.path()
        properties = await config.option(path).property.get(only_raises=True,
                                                            uncalculated=True,
                                                            )
        if properties:
            unaccessible_paths.append(path)
        # NOTE(review): run for every option description; it may belong
        # inside the "if properties" branch above - confirm
        for dependency in await option.option.dependencies():
            dependencies.add(await dependency.option.path())
    paths_done = []
    await populate_paths(paths_done, paths, dependencies, unrestraint, unaccessible_paths, config)
    all_values = {}
    # first pass: variables with a single candidate value are set right away
    for path, full_properties, test_values, ismulti, type, option, min_number in paths:
        values = await get_value(type,
                                 full_properties,
                                 test_values,
                                 ismulti,
                                 path in dependencies,
                                 path,
                                 min_number,
                                 )
        all_values[path] = values
        if await option.option.isfollower() or len(values) > 1:
            continue
        if await config.unrestraint.option(path).option.isleader():
            # empty the leader before setting its value
            old_values = await config.unrestraint.option(path).value.get()
            for idx in reversed(range(len(old_values))):
                await config.unrestraint.option(path).value.pop(idx)
        await config.unrestraint.option(path).value.set(values[0])
    # second collection: picks up the paths not collected by the first call
    await populate_paths(paths_done, paths, dependencies, unrestraint, unaccessible_paths, config)
    new_paths = []
    number_test = 1
    nb_var = 0
    for path, full_properties, test_values, ismulti, type, option, min_number in paths:
        if path in all_values:
            values = all_values[path]
            has_values = True
        else:
            values = await get_value(type,
                                     full_properties,
                                     test_values,
                                     ismulti,
                                     path in dependencies,
                                     path,
                                     min_number,
                                     )
            has_values = False
        number_test *= len(values)
        if await option.option.isfollower() or len(values) > 1:
            # followers and multi-valued variables take part in the combinations
            option_dependencies = {await subopt.option.path() for subopt in await option.option.dependencies()}
            new_paths.append((path, option, values, option_dependencies))
            nb_var += 1
        elif not has_values:
            if await config.unrestraint.option(path).option.isleader():
                old_values = await config.unrestraint.option(path).value.get()
                for idx in reversed(range(len(old_values))):
                    await config.unrestraint.option(path).value.pop(idx)
            await config.unrestraint.option(path).value.set(values[0])
    print(' o number of variables: {}'.format(len(paths)))
    new_paths = reorder_new_paths(new_paths)
    print(f' o number of tested variables: {nb_var}')
    print(f' o estimate number of tests: {number_test}')
    new_paths_no_deps = {}
    leader_followers = {}
    if number_test > 1001:
        # too many combinations: variables without dependencies are taken out
        # of the combinations and their values are tried one after the other
        number_test = 1
        nb2 = 0
        print(f' degraded mode...')
        new_paths_2 = []
        for path, option, values, option_dependencies in new_paths:
            isfollower = await option.option.isfollower()
            if isfollower:
                # a follower needs more than one dependency to count as dependent
                has_dependencies = len(option_dependencies) > 1
            else:
                has_dependencies = len(option_dependencies) > 0
            if not has_dependencies:
                if isfollower:
                    leadership_path = path.rsplit('.', 1)[0]
                    leader_followers.setdefault(leadership_path, []).append((path, values.copy()))
                new_paths_no_deps[path] = values
                nb2 += len(values) - 1
            else:
                number_test *= len(values)
                new_paths_2.append((path, option, values, option_dependencies))
        new_paths = new_paths_2
        print(' > degraded variables: {}'.format(len(new_paths_no_deps)))
        print(f' > new estimated number of tests: {number_test + nb2}')
    # --- 5. template generation, one directory per distinct configuration
    idx = 0
    real_idx = 0
    old_dico = {}
    async for dico in get_values(config, new_paths, new_paths_no_deps, root_dir, leader_followers):
        try:
            real_idx += 1
            if str(real_idx).endswith('0'):
                print(f' ... {real_idx}')
            build_dir = f'{root_dir}/{idx}'
            dest_dir = f'{build_dir}/dest'
            mandatories = await config.value.mandatory()
            if old_dico == dico:
                # same configuration as the previous one: nothing new to build
                continue
            for dirname in [build_dir, dest_dir]:
                makedirs(dirname)
            old_dico = dico
            if mandatories:
                print('Tiramisu file ' + join(root_dir, 'dictionary.py'))
                raise Exception(f'Mandatory option not configured {mandatories}')
            RougailConfig['functions_file'] = func
            RougailConfig['tmp_dir'] = tmp_dir
            RougailConfig['templates_dir'] = 'templates'
            RougailConfig['destinations_dir'] = dest_dir
            await launch(config, root_dir, dico)
            await config.value.dict()
            with open(f'{build_dir}/variables.txt', 'w') as fh:
                dump(await config.value.dict(leader_to_list=True), fh, indent=4, sort_keys=True)
            idx += 1
        except Exception as err:
            print_exc()
            print('Tiramisu file ' + join(root_dir, 'dictionary.py'))
            print(f'test with {dico}')
            raise Exception(f'unknown error {err}') from err
    print(f' o number of tests: {idx}')
    # cleanup of the temporary material; the generated results are kept
    if isdir('templates'):
        rmtree('templates')
    rmtree(tmp_dir)
    rmtree(funcs_dir)
async def launch(config, root_dir, dico):
    """Instantiate the templates for the current state of ``config``.

    The config is switched to read-only, then ``instance_files()`` is awaited
    on a ``RougailSystemdTemplate`` built from it.

    :param config: tiramisu config to render
    :param root_dir: directory whose ``dictionary.py`` is named in the error output
    :param dico: values of the test case, printed on failure
    :raises Exception: ``unknown error <cause>`` chained to the original exception
    """
    try:
        await config.property.read_only()
        await RougailSystemdTemplate(config).instance_files()
    except Exception as exc:
        print_exc()
        dictionary_file = join(root_dir, 'dictionary.py')
        print('Tiramisu file ' + dictionary_file)
        print(f'test with {dico}')
        raise Exception(f'unknown error {exc}') from exc