rougail-tutorials/Rougail/inventory.py

469 lines
18 KiB
Python
Executable file

#!/usr/bin/env python3
from rougail import Rougail, RougailConfig
from argparse import ArgumentParser
from pathlib import Path
#from pprint import pprint
from yaml import safe_load, dump
from json import dumps
from tiramisu.error import ValueOptionError, PropertiesOptionError
from traceback import print_exc
import tabulate as tabulate_module
from tabulate import tabulate
INV_DIR = 'inventory'
OPS_INV_DIR = 'Ops/group_vars'
ROUGAIL_VARIABLE_TYPE = 'https://forge.cloud.silique.fr/stove/rougail/src/branch/main/doc/variable/README.md#le-type-de-la-variable'
class Inventory:
###############################################
# Create TIRAMISU object
###############################################
def __init__(self):
tabulate_module.PRESERVE_WHITESPACE = True
RougailConfig['dictionaries_dir'] = ['Rougail/socle']
RougailConfig['variable_namespace'] = 'socle'
RougailConfig['tiramisu_cache'] = 'socle.py'
catalog_dir = Path('Rougail')
for extra in catalog_dir.iterdir():
if not extra.is_dir() or \
extra.name in ['socle', 'host', 'jinja_cache', INV_DIR]:
continue
RougailConfig['extra_dictionaries'][extra.name] = [extra.name]
RougailConfig['functions_file'] = 'Rougail/functions.py'
self.errors = []
self.warnings = []
def load(self):
rougail = Rougail()
self.conf = rougail.get_config()
self.conf.property.read_write()
self.objectspace = rougail.converted
# self.objectspace.annotate()
###############################################
# Read Ops file
###############################################
def load_inventory(self):
inventory_dir = Path(OPS_INV_DIR)
if not inventory_dir.is_dir():
return
with open('Ops/host.yml') as fh:
hostnames = fh.read().strip().split('\n')
self.conf.option('socle.hostnames').value.set(hostnames)
for file in inventory_dir.iterdir():
if file.suffix not in ['.yml', '.yaml']:
continue
with open(file) as fh:
values = safe_load(fh)
if not isinstance(values, dict):
continue
for key, value in values.items():
self.read_inventory(self.conf.option('socle'),
key,
value,
file,
)
def read_inventory(self,
conf,
key,
value,
file,
*,
index=None,
):
sub_conf = conf.option(key, index)
try:
isoptiondescription = sub_conf.isoptiondescription()
except AttributeError as err:
self.warnings.append(f'"{key}" est inconnu dans "{conf.path()}" mais est défini dans "{file}"')
return
if isoptiondescription:
if not sub_conf.isleadership():
if not isinstance(value, dict):
print('pffff')
return
for sub_key, sub_value in value.items():
self.read_inventory(sub_conf,
sub_key,
sub_value,
file,
)
else:
if not isinstance(value, list):
return
leader_option = sub_conf.leader().name()
leader_value = []
for leader in value:
if leader_option not in leader:
print('pffff')
return
leader_value.append(leader[leader_option])
self.read_inventory(sub_conf,
leader_option,
leader_value,
file,
)
for idx, sub_value in enumerate(value):
for sub_key, sub_value in sub_value.items():
if sub_key == leader_option:
continue
self.read_inventory(sub_conf,
sub_key,
sub_value,
file,
index=idx,
)
else:
try:
sub_conf.value.set(value)
except ValueOptionError as err:
self.errors.append(str(err).replace('"', "'"))
except PropertiesOptionError as err:
self.warnings.append(f'"{err}" mais est défini dans "{file}"')
###############################################
# Host
###############################################
def get_hosts(self):
return self.conf.option('socle.hostnames').value.get()
###############################################
# Search unspecified mandatories variables
###############################################
def mandatory(self):
if self.errors:
return
for idx, option in enumerate(self.conf.value.mandatory()):
if not idx:
self.errors.append("Les variables suivantes sont obligatoires mais n'ont pas de valeur :")
self.errors.append(f' - {option.doc()}')
###############################################
# Tiramisu to inventory
###############################################
def display(self):
ret = {}
if self.errors:
ret['_errors'] = self.errors
else:
self.conf.property.read_only()
for line, value in self.conf.value.get().items():
self.parse_line(line,
line,
value,
ret,
False,
)
if len(ret) == 1:
ret = ret[list(ret)[0]]
if self.warnings:
ret['_warnings'] = self.warnings
print(dumps(ret))
def parse_line(self,
full_path,
line,
value,
dico,
leadership,
):
if '.' in line:
# it's a dict
family, variable = line.split('.', 1)
if '.' not in variable and self.conf.option(full_path.rsplit('.', 1)[0]).isleadership():
dico.setdefault(family, [])
leadership = True
else:
dico.setdefault(family, {})
leadership = False
self.parse_line(full_path,
variable,
value,
dico[family],
leadership,
)
elif leadership:
# it's a leadership
for val in value:
dico.append({k.rsplit('.', 1)[-1]: v for k, v in val.items()})
else:
try:
dico[line] = value
except ValueOptionError as err:
self.errors.append(str(err).replace('"', "'"))
###############################################
# DOC
###############################################
def display_doc(self):
examples_mini = {}
examples_all = {}
print(f'---\ngitea: none\ninclude_toc: true\n---\n')
print(f'# Variables')
for family_path in self.objectspace.parents['.']:
self.display_families(family_path,
family_path,
2,
False,
examples_mini,
examples_all,
)
if examples_mini:
print(f'# Example with mandatories variables')
print()
print('```')
print(dump(examples_mini, default_flow_style=False, sort_keys=False), end='')
print('```')
print()
print(f'# Example with all variables')
print()
print('```')
print(dump(examples_all, default_flow_style=False, sort_keys=False), end='')
print('```')
def display_families(self,
family_path,
true_family_path,
level,
family_type,
examples_mini,
examples_all,
):
variables = []
for idx, child_name in enumerate(self.objectspace.parents[true_family_path]):
if child_name in self.objectspace.variables:
properties = self.objectspace.properties[child_name]
if ('hidden' in properties and properties['hidden'] is True) or \
('disabled' in properties and properties['disabled'] is True):
continue
variable = self.objectspace.paths[child_name]
variables.append(self.display_variable(variable,
level,
family_type,
family_path,
idx,
examples_mini,
examples_all,
))
else:
if variables:
print(tabulate(variables, headers=['Parameter', 'Comment'], tablefmt="github"))
print()
variables = []
family = self.objectspace.paths[child_name]
if family.type == 'dynamic':
for value in family.variable.default:
sub_family_path = f'{family_path}.{family.name.replace("{{ suffix }}", value)}'
family_name = sub_family_path.rsplit('.', 1)[-1]
examples_mini[family_name] = {}
examples_all[family_name] = {}
self.display_family(family,
sub_family_path,
level,
)
self.display_families(sub_family_path,
child_name,
level + 1,
family.type,
examples_mini[family_name],
examples_all[family_name],
)
if not examples_mini[family_name]:
del examples_mini[family_name]
else:
if family.type == 'leadership':
examples_mini[family.name] = []
examples_all[family.name] = []
else:
examples_mini[family.name] = {}
examples_all[family.name] = {}
self.display_family(family,
child_name,
level,
)
sub_family_path = f'{family_path}.{family.name}'
self.display_families(sub_family_path,
child_name,
level + 1,
family.type,
examples_mini[family.name],
examples_all[family.name],
)
if not examples_mini[family.name]:
del examples_mini[family.name]
if variables:
print(tabulate(variables, headers=['Parameter', 'Comment'], tablefmt="github"))
print()
def display_family(self,
family,
family_path,
level,
):
display_path = family_path.split(".", 1)[-1]
if family.name != family.description:
title = f'{family.description} ({display_path})'
else:
title = f'{display_path}'
print('#' * level, title)
print()
informations = self.objectspace.informations.get(family.path)
if 'help' in informations:
print(informations['help'])
print()
if family.type == 'leadership':
print('This family is a leadership.')
print()
def display_variable(self,
variable,
level,
family_type,
family_path,
index,
examples_mini,
examples_all,
):
parameter = f'**{variable.name}**'
subparameter = []
if 'mandatory' in self.objectspace.properties[variable.path]:
subparameter.append('mandatory')
mandatory = True
else:
mandatory = False
if variable.path in self.objectspace.multis:
if family_type == 'leadership' and index:
multi = self.objectspace.multis[variable.path] == 'submulti'
else:
multi = True
else:
multi = False
if multi:
subparameter.append('multiple')
if subparameter:
parameter += "<br/>`" + "`, `".join(subparameter) + '`'
parameter += f'<br/>**Type:** [`{variable.type}`]({ROUGAIL_VARIABLE_TYPE})'
comment = []
if variable.name != variable.description:
comment.append(variable.description + '.')
informations = self.objectspace.informations.get(variable.path)
if 'help' in informations:
help_ = ' '.join([h.strip() for h in informations['help'].split('\n')])
if not help_.endswith('.'):
help_ += '.'
comment.append(help_)
if variable.default is not None:
default = variable.default
comment.append(f'**Default:** {default}')
if variable.type == 'choice':
if isinstance(variable.choices, list):
comment.append(f'**Choices:** {", ".join(variable.choices)}')
else:
comment.append(f'**Choices:** see variable "{variable.choices.variable.split(".", 1)[-1]}"')
#choice
example_mini = None
example_all = None
properties = self.conf.option(f'{family_path}.{variable.name}').property.get()
if 'hidden' in properties or 'disabled' in properties:
pass
elif variable.test:
example = variable.test
if not multi:
example = example[0]
title = 'Example'
if mandatory:
example_mini = example
example_all = example
else:
if mandatory:
example_mini = example
example_all = example
example = ', '.join(example)
if len(variable.test) > 1:
title = 'Examples'
else:
title = 'Example'
comment.append(f'**{title}:** {example}')
elif variable.default is not None:
example = variable.default
example_all = example
else:
example = 'xxx'
if mandatory:
example_mini = example
example_all = example
if family_type == 'leadership':
if not index:
if example_mini is not None:
for mini in example_mini:
examples_mini.append({variable.name: mini})
if example_all is not None:
for mall in example_all:
examples_all.append({variable.name: mall})
else:
if example_mini is not None:
for idx in range(0, len(examples_mini)):
examples_mini[idx][variable.name] = example_mini
if example_all is not None:
for idx in range(0, len(examples_all)):
examples_all[idx][variable.name] = example_all
else:
if example_mini is not None:
examples_mini[variable.name] = example_mini
if example_all is not None:
examples_all[variable.name] = example_all
parameter += ' ' * (250 - len(parameter))
comment = '<br/>'.join(comment)
comment += ' ' * (250 - len(comment))
return parameter, comment
###############################################
# MAIN
###############################################
def main(args):
inventory = Inventory()
inventory.load()
if args.list:
inventory.load_inventory()
print(dumps({'group': {
'hosts': inventory.get_hosts(),
'vars': {}
}
})
)
elif args.doc:
inventory.display_doc()
else:
inventory.load_inventory()
inventory.mandatory()
inventory.display()
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument('--list', action='store_true')
parser.add_argument('--host', action='store')
parser.add_argument('--debug', action='store_true')
parser.add_argument('--doc', action='store_true')
args = parser.parse_args()
try:
main(args)
except Exception as err:
if args.debug:
print_exc()
if args.doc:
exit(f'ERROR: {err}')
print(dumps({'_errors': str(err)}))