#!/usr/bin/env python3 from rougail import Rougail, RougailConfig from argparse import ArgumentParser from pathlib import Path #from pprint import pprint from yaml import safe_load, dump from json import dumps from tiramisu.error import ValueOptionError, PropertiesOptionError from traceback import print_exc import tabulate as tabulate_module from tabulate import tabulate INV_DIR = 'inventory' OPS_INV_DIR = 'Ops/group_vars' ROUGAIL_VARIABLE_TYPE = 'https://forge.cloud.silique.fr/stove/rougail/src/branch/main/doc/variable/README.md#le-type-de-la-variable' class Inventory: ############################################### # Create TIRAMISU object ############################################### def __init__(self): tabulate_module.PRESERVE_WHITESPACE = True RougailConfig['dictionaries_dir'] = ['Rougail/socle'] RougailConfig['variable_namespace'] = 'socle' RougailConfig['tiramisu_cache'] = 'socle.py' catalog_dir = Path('Rougail') for extra in catalog_dir.iterdir(): if not extra.is_dir() or \ extra.name in ['socle', 'host', 'jinja_cache', INV_DIR]: continue RougailConfig['extra_dictionaries'][extra.name] = [extra.name] RougailConfig['functions_file'] = 'Rougail/functions.py' self.errors = [] self.warnings = [] def load(self): rougail = Rougail() self.conf = rougail.get_config() self.conf.property.read_write() self.objectspace = rougail.converted # self.objectspace.annotate() ############################################### # Read Ops file ############################################### def load_inventory(self): inventory_dir = Path(OPS_INV_DIR) if not inventory_dir.is_dir(): return with open('Ops/host.yml') as fh: hostnames = fh.read().strip().split('\n') self.conf.option('socle.hostnames').value.set(hostnames) for file in inventory_dir.iterdir(): if file.suffix not in ['.yml', '.yaml']: continue with open(file) as fh: values = safe_load(fh) if not isinstance(values, dict): continue for key, value in values.items(): self.read_inventory(self.conf.option('socle'), key, value, file, ) def read_inventory(self, conf, key, value, file, *, index=None, ): sub_conf = conf.option(key, index) try: isoptiondescription = sub_conf.isoptiondescription() except AttributeError as err: self.warnings.append(f'"{key}" est inconnu dans "{conf.path()}" mais est défini dans "{file}"') return if isoptiondescription: if not sub_conf.isleadership(): if not isinstance(value, dict): print('pffff') return for sub_key, sub_value in value.items(): self.read_inventory(sub_conf, sub_key, sub_value, file, ) else: if not isinstance(value, list): return leader_option = sub_conf.leader().name() leader_value = [] for leader in value: if leader_option not in leader: print('pffff') return leader_value.append(leader[leader_option]) self.read_inventory(sub_conf, leader_option, leader_value, file, ) for idx, sub_value in enumerate(value): for sub_key, sub_value in sub_value.items(): if sub_key == leader_option: continue self.read_inventory(sub_conf, sub_key, sub_value, file, index=idx, ) else: try: sub_conf.value.set(value) except ValueOptionError as err: self.errors.append(str(err).replace('"', "'")) except PropertiesOptionError as err: self.warnings.append(f'"{err}" mais est défini dans "{file}"') ############################################### # Host ############################################### def get_hosts(self): return self.conf.option('socle.hostnames').value.get() ############################################### # Search unspecified mandatories variables ############################################### def mandatory(self): if self.errors: return for idx, option in enumerate(self.conf.value.mandatory()): if not idx: self.errors.append("Les variables suivantes sont obligatoires mais n'ont pas de valeur :") self.errors.append(f' - {option.doc()}') ############################################### # Tiramisu to inventory ############################################### def display(self): ret = {} if self.errors: ret['_errors'] = self.errors else: self.conf.property.read_only() for line, value in self.conf.value.get().items(): self.parse_line(line, line, value, ret, False, ) if len(ret) == 1: ret = ret[list(ret)[0]] if self.warnings: ret['_warnings'] = self.warnings print(dumps(ret)) def parse_line(self, full_path, line, value, dico, leadership, ): if '.' in line: # it's a dict family, variable = line.split('.', 1) if '.' not in variable and self.conf.option(full_path.rsplit('.', 1)[0]).isleadership(): dico.setdefault(family, []) leadership = True else: dico.setdefault(family, {}) leadership = False self.parse_line(full_path, variable, value, dico[family], leadership, ) elif leadership: # it's a leadership for val in value: dico.append({k.rsplit('.', 1)[-1]: v for k, v in val.items()}) else: try: dico[line] = value except ValueOptionError as err: self.errors.append(str(err).replace('"', "'")) ############################################### # DOC ############################################### def display_doc(self): examples_mini = {} examples_all = {} print(f'---\ngitea: none\ninclude_toc: true\n---\n') print(f'# Variables') for family_path in self.objectspace.parents['.']: self.display_families(family_path, family_path, 2, False, examples_mini, examples_all, ) if examples_mini: print(f'# Example with mandatories variables') print() print('```') print(dump(examples_mini, default_flow_style=False, sort_keys=False), end='') print('```') print() print(f'# Example with all variables') print() print('```') print(dump(examples_all, default_flow_style=False, sort_keys=False), end='') print('```') def display_families(self, family_path, true_family_path, level, family_type, examples_mini, examples_all, ): variables = [] for idx, child_name in enumerate(self.objectspace.parents[true_family_path]): if child_name in self.objectspace.variables: properties = self.objectspace.properties[child_name] if ('hidden' in properties and properties['hidden'] is True) or \ ('disabled' in properties and properties['disabled'] is True): continue variable = self.objectspace.paths[child_name] variables.append(self.display_variable(variable, level, family_type, family_path, idx, examples_mini, examples_all, )) else: if variables: print(tabulate(variables, headers=['Parameter', 'Comment'], tablefmt="github")) print() variables = [] family = self.objectspace.paths[child_name] if family.type == 'dynamic': for value in family.variable.default: sub_family_path = f'{family_path}.{family.name.replace("{{ suffix }}", value)}' family_name = sub_family_path.rsplit('.', 1)[-1] examples_mini[family_name] = {} examples_all[family_name] = {} self.display_family(family, sub_family_path, level, ) self.display_families(sub_family_path, child_name, level + 1, family.type, examples_mini[family_name], examples_all[family_name], ) if not examples_mini[family_name]: del examples_mini[family_name] else: if family.type == 'leadership': examples_mini[family.name] = [] examples_all[family.name] = [] else: examples_mini[family.name] = {} examples_all[family.name] = {} self.display_family(family, child_name, level, ) sub_family_path = f'{family_path}.{family.name}' self.display_families(sub_family_path, child_name, level + 1, family.type, examples_mini[family.name], examples_all[family.name], ) if not examples_mini[family.name]: del examples_mini[family.name] if variables: print(tabulate(variables, headers=['Parameter', 'Comment'], tablefmt="github")) print() def display_family(self, family, family_path, level, ): display_path = family_path.split(".", 1)[-1] if family.name != family.description: title = f'{family.description} ({display_path})' else: title = f'{display_path}' print('#' * level, title) print() informations = self.objectspace.informations.get(family.path) if 'help' in informations: print(informations['help']) print() if family.type == 'leadership': print('This family is a leadership.') print() def display_variable(self, variable, level, family_type, family_path, index, examples_mini, examples_all, ): parameter = f'**{variable.name}**' subparameter = [] if 'mandatory' in self.objectspace.properties[variable.path]: subparameter.append('mandatory') mandatory = True else: mandatory = False if variable.path in self.objectspace.multis: if family_type == 'leadership' and index: multi = self.objectspace.multis[variable.path] == 'submulti' else: multi = True else: multi = False if multi: subparameter.append('multiple') if subparameter: parameter += "
`" + "`, `".join(subparameter) + '`' parameter += f'
**Type:** [`{variable.type}`]({ROUGAIL_VARIABLE_TYPE})' comment = [] if variable.name != variable.description: comment.append(variable.description + '.') informations = self.objectspace.informations.get(variable.path) if 'help' in informations: help_ = ' '.join([h.strip() for h in informations['help'].split('\n')]) if not help_.endswith('.'): help_ += '.' comment.append(help_) if variable.default is not None: default = variable.default comment.append(f'**Default:** {default}') if variable.type == 'choice': if isinstance(variable.choices, list): comment.append(f'**Choices:** {", ".join(variable.choices)}') else: comment.append(f'**Choices:** see variable "{variable.choices.variable.split(".", 1)[-1]}"') #choice example_mini = None example_all = None properties = self.conf.option(f'{family_path}.{variable.name}').property.get() if 'hidden' in properties or 'disabled' in properties: pass elif variable.test: example = variable.test if not multi: example = example[0] title = 'Example' if mandatory: example_mini = example example_all = example else: if mandatory: example_mini = example example_all = example example = ', '.join(example) if len(variable.test) > 1: title = 'Examples' else: title = 'Example' comment.append(f'**{title}:** {example}') elif variable.default is not None: example = variable.default example_all = example else: example = 'xxx' if mandatory: example_mini = example example_all = example if family_type == 'leadership': if not index: if example_mini is not None: for mini in example_mini: examples_mini.append({variable.name: mini}) if example_all is not None: for mall in example_all: examples_all.append({variable.name: mall}) else: if example_mini is not None: for idx in range(0, len(examples_mini)): examples_mini[idx][variable.name] = example_mini if example_all is not None: for idx in range(0, len(examples_all)): examples_all[idx][variable.name] = example_all else: if example_mini is not None: examples_mini[variable.name] = example_mini if example_all is not None: examples_all[variable.name] = example_all parameter += ' ' * (250 - len(parameter)) comment = '
'.join(comment) comment += ' ' * (250 - len(comment)) return parameter, comment ############################################### # MAIN ############################################### def main(args): inventory = Inventory() inventory.load() if args.list: inventory.load_inventory() print(dumps({'group': { 'hosts': inventory.get_hosts(), 'vars': {} } }) ) elif args.doc: inventory.display_doc() else: inventory.load_inventory() inventory.mandatory() inventory.display() if __name__ == "__main__": parser = ArgumentParser() parser.add_argument('--list', action='store_true') parser.add_argument('--host', action='store') parser.add_argument('--debug', action='store_true') parser.add_argument('--doc', action='store_true') args = parser.parse_args() try: main(args) except Exception as err: if args.debug: print_exc() if args.doc: exit(f'ERROR: {err}') print(dumps({'_errors': str(err)}))