470 lines
18 KiB
Python
470 lines
18 KiB
Python
|
#!/usr/bin/env python3
|
||
|
from rougail import Rougail, RougailConfig
|
||
|
from argparse import ArgumentParser
|
||
|
from pathlib import Path
|
||
|
#from pprint import pprint
|
||
|
from yaml import safe_load, dump
|
||
|
from json import dumps
|
||
|
from tiramisu.error import ValueOptionError, PropertiesOptionError
|
||
|
from traceback import print_exc
|
||
|
import tabulate as tabulate_module
|
||
|
from tabulate import tabulate
|
||
|
|
||
|
|
||
|
INV_DIR = 'inventory'
|
||
|
OPS_INV_DIR = 'Ops/group_vars'
|
||
|
ROUGAIL_VARIABLE_TYPE = 'https://forge.cloud.silique.fr/stove/rougail/src/branch/main/doc/variable/README.md#le-type-de-la-variable'
|
||
|
|
||
|
|
||
|
class Inventory:
|
||
|
|
||
|
###############################################
|
||
|
# Create TIRAMISU object
|
||
|
###############################################
|
||
|
|
||
|
def __init__(self):
|
||
|
tabulate_module.PRESERVE_WHITESPACE = True
|
||
|
RougailConfig['dictionaries_dir'] = ['Rougail/socle']
|
||
|
RougailConfig['variable_namespace'] = 'socle'
|
||
|
RougailConfig['tiramisu_cache'] = 'socle.py'
|
||
|
catalog_dir = Path('Rougail')
|
||
|
for extra in catalog_dir.iterdir():
|
||
|
if not extra.is_dir() or \
|
||
|
extra.name in ['socle', 'host', 'jinja_cache', INV_DIR]:
|
||
|
continue
|
||
|
RougailConfig['extra_dictionaries'][extra.name] = [extra.name]
|
||
|
RougailConfig['functions_file'] = 'Rougail/functions.py'
|
||
|
self.errors = []
|
||
|
self.warnings = []
|
||
|
|
||
|
def load(self):
|
||
|
rougail = Rougail()
|
||
|
self.conf = rougail.get_config()
|
||
|
self.conf.property.read_write()
|
||
|
self.objectspace = rougail.converted
|
||
|
# self.objectspace.annotate()
|
||
|
|
||
|
###############################################
|
||
|
# Read Ops file
|
||
|
###############################################
|
||
|
|
||
|
def load_inventory(self):
|
||
|
inventory_dir = Path(OPS_INV_DIR)
|
||
|
if not inventory_dir.is_dir():
|
||
|
return
|
||
|
with open('Ops/host.yml') as fh:
|
||
|
hostnames = fh.read().strip().split('\n')
|
||
|
self.conf.option('socle.hostnames').value.set(hostnames)
|
||
|
for file in inventory_dir.iterdir():
|
||
|
if file.suffix not in ['.yml', '.yaml']:
|
||
|
continue
|
||
|
with open(file) as fh:
|
||
|
values = safe_load(fh)
|
||
|
if not isinstance(values, dict):
|
||
|
continue
|
||
|
for key, value in values.items():
|
||
|
self.read_inventory(self.conf.option('socle'),
|
||
|
key,
|
||
|
value,
|
||
|
file,
|
||
|
)
|
||
|
|
||
|
def read_inventory(self,
|
||
|
conf,
|
||
|
key,
|
||
|
value,
|
||
|
file,
|
||
|
*,
|
||
|
index=None,
|
||
|
):
|
||
|
sub_conf = conf.option(key, index)
|
||
|
try:
|
||
|
isoptiondescription = sub_conf.isoptiondescription()
|
||
|
except AttributeError as err:
|
||
|
self.warnings.append(f'"{key}" est inconnu dans "{conf.path()}" mais est défini dans "{file}"')
|
||
|
return
|
||
|
if isoptiondescription:
|
||
|
if not sub_conf.isleadership():
|
||
|
if not isinstance(value, dict):
|
||
|
print('pffff')
|
||
|
return
|
||
|
for sub_key, sub_value in value.items():
|
||
|
self.read_inventory(sub_conf,
|
||
|
sub_key,
|
||
|
sub_value,
|
||
|
file,
|
||
|
)
|
||
|
else:
|
||
|
if not isinstance(value, list):
|
||
|
return
|
||
|
leader_option = sub_conf.leader().name()
|
||
|
leader_value = []
|
||
|
for leader in value:
|
||
|
if leader_option not in leader:
|
||
|
print('pffff')
|
||
|
return
|
||
|
leader_value.append(leader[leader_option])
|
||
|
self.read_inventory(sub_conf,
|
||
|
leader_option,
|
||
|
leader_value,
|
||
|
file,
|
||
|
)
|
||
|
for idx, sub_value in enumerate(value):
|
||
|
for sub_key, sub_value in sub_value.items():
|
||
|
if sub_key == leader_option:
|
||
|
continue
|
||
|
self.read_inventory(sub_conf,
|
||
|
sub_key,
|
||
|
sub_value,
|
||
|
file,
|
||
|
index=idx,
|
||
|
)
|
||
|
else:
|
||
|
try:
|
||
|
sub_conf.value.set(value)
|
||
|
except ValueOptionError as err:
|
||
|
self.errors.append(str(err).replace('"', "'"))
|
||
|
except PropertiesOptionError as err:
|
||
|
self.warnings.append(f'"{err}" mais est défini dans "{file}"')
|
||
|
|
||
|
###############################################
|
||
|
# Host
|
||
|
###############################################
|
||
|
|
||
|
def get_hosts(self):
|
||
|
return self.conf.option('socle.hostnames').value.get()
|
||
|
|
||
|
###############################################
|
||
|
# Search unspecified mandatories variables
|
||
|
###############################################
|
||
|
|
||
|
def mandatory(self):
|
||
|
if self.errors:
|
||
|
return
|
||
|
for idx, option in enumerate(self.conf.value.mandatory()):
|
||
|
if not idx:
|
||
|
self.errors.append("Les variables suivantes sont obligatoires mais n'ont pas de valeur :")
|
||
|
self.errors.append(f' - {option.doc()}')
|
||
|
|
||
|
###############################################
|
||
|
# Tiramisu to inventory
|
||
|
###############################################
|
||
|
|
||
|
def display(self):
|
||
|
ret = {}
|
||
|
if self.errors:
|
||
|
ret['_errors'] = self.errors
|
||
|
else:
|
||
|
self.conf.property.read_only()
|
||
|
for line, value in self.conf.value.get().items():
|
||
|
self.parse_line(line,
|
||
|
line,
|
||
|
value,
|
||
|
ret,
|
||
|
False,
|
||
|
)
|
||
|
if len(ret) == 1:
|
||
|
ret = ret[list(ret)[0]]
|
||
|
if self.warnings:
|
||
|
ret['_warnings'] = self.warnings
|
||
|
print(dumps(ret))
|
||
|
|
||
|
def parse_line(self,
|
||
|
full_path,
|
||
|
line,
|
||
|
value,
|
||
|
dico,
|
||
|
leadership,
|
||
|
):
|
||
|
if '.' in line:
|
||
|
# it's a dict
|
||
|
family, variable = line.split('.', 1)
|
||
|
if '.' not in variable and self.conf.option(full_path.rsplit('.', 1)[0]).isleadership():
|
||
|
dico.setdefault(family, [])
|
||
|
leadership = True
|
||
|
else:
|
||
|
dico.setdefault(family, {})
|
||
|
leadership = False
|
||
|
self.parse_line(full_path,
|
||
|
variable,
|
||
|
value,
|
||
|
dico[family],
|
||
|
leadership,
|
||
|
)
|
||
|
elif leadership:
|
||
|
# it's a leadership
|
||
|
for val in value:
|
||
|
dico.append({k.rsplit('.', 1)[-1]: v for k, v in val.items()})
|
||
|
else:
|
||
|
try:
|
||
|
dico[line] = value
|
||
|
except ValueOptionError as err:
|
||
|
self.errors.append(str(err).replace('"', "'"))
|
||
|
|
||
|
###############################################
|
||
|
# DOC
|
||
|
###############################################
|
||
|
|
||
|
def display_doc(self):
|
||
|
examples_mini = {}
|
||
|
examples_all = {}
|
||
|
print(f'---\ngitea: none\ninclude_toc: true\n---\n')
|
||
|
print(f'# Variables')
|
||
|
for family_path in self.objectspace.parents['.']:
|
||
|
self.display_families(family_path,
|
||
|
family_path,
|
||
|
2,
|
||
|
False,
|
||
|
examples_mini,
|
||
|
examples_all,
|
||
|
)
|
||
|
if examples_mini:
|
||
|
print(f'# Example with mandatories variables')
|
||
|
print()
|
||
|
print('```')
|
||
|
print(dump(examples_mini, default_flow_style=False, sort_keys=False), end='')
|
||
|
print('```')
|
||
|
print()
|
||
|
print(f'# Example with all variables')
|
||
|
print()
|
||
|
print('```')
|
||
|
print(dump(examples_all, default_flow_style=False, sort_keys=False), end='')
|
||
|
print('```')
|
||
|
|
||
|
def display_families(self,
|
||
|
family_path,
|
||
|
true_family_path,
|
||
|
level,
|
||
|
family_type,
|
||
|
examples_mini,
|
||
|
examples_all,
|
||
|
):
|
||
|
variables = []
|
||
|
for idx, child_name in enumerate(self.objectspace.parents[true_family_path]):
|
||
|
if child_name in self.objectspace.variables:
|
||
|
properties = self.objectspace.properties[child_name]
|
||
|
if ('hidden' in properties and properties['hidden'] is True) or \
|
||
|
('disabled' in properties and properties['disabled'] is True):
|
||
|
continue
|
||
|
variable = self.objectspace.paths[child_name]
|
||
|
variables.append(self.display_variable(variable,
|
||
|
level,
|
||
|
family_type,
|
||
|
family_path,
|
||
|
idx,
|
||
|
examples_mini,
|
||
|
examples_all,
|
||
|
))
|
||
|
else:
|
||
|
if variables:
|
||
|
print(tabulate(variables, headers=['Parameter', 'Comment'], tablefmt="github"))
|
||
|
print()
|
||
|
variables = []
|
||
|
|
||
|
family = self.objectspace.paths[child_name]
|
||
|
if family.type == 'dynamic':
|
||
|
for value in family.variable.default:
|
||
|
sub_family_path = f'{family_path}.{family.name.replace("{{ suffix }}", value)}'
|
||
|
family_name = sub_family_path.rsplit('.', 1)[-1]
|
||
|
examples_mini[family_name] = {}
|
||
|
examples_all[family_name] = {}
|
||
|
self.display_family(family,
|
||
|
sub_family_path,
|
||
|
level,
|
||
|
)
|
||
|
self.display_families(sub_family_path,
|
||
|
child_name,
|
||
|
level + 1,
|
||
|
family.type,
|
||
|
examples_mini[family_name],
|
||
|
examples_all[family_name],
|
||
|
)
|
||
|
if not examples_mini[family_name]:
|
||
|
del examples_mini[family_name]
|
||
|
else:
|
||
|
if family.type == 'leadership':
|
||
|
examples_mini[family.name] = []
|
||
|
examples_all[family.name] = []
|
||
|
else:
|
||
|
examples_mini[family.name] = {}
|
||
|
examples_all[family.name] = {}
|
||
|
self.display_family(family,
|
||
|
child_name,
|
||
|
level,
|
||
|
)
|
||
|
sub_family_path = f'{family_path}.{family.name}'
|
||
|
self.display_families(sub_family_path,
|
||
|
child_name,
|
||
|
level + 1,
|
||
|
family.type,
|
||
|
examples_mini[family.name],
|
||
|
examples_all[family.name],
|
||
|
)
|
||
|
if not examples_mini[family.name]:
|
||
|
del examples_mini[family.name]
|
||
|
if variables:
|
||
|
print(tabulate(variables, headers=['Parameter', 'Comment'], tablefmt="github"))
|
||
|
print()
|
||
|
|
||
|
def display_family(self,
|
||
|
family,
|
||
|
family_path,
|
||
|
level,
|
||
|
):
|
||
|
display_path = family_path.split(".", 1)[-1]
|
||
|
if family.name != family.description:
|
||
|
title = f'{family.description} ({display_path})'
|
||
|
else:
|
||
|
title = f'{display_path}'
|
||
|
print('#' * level, title)
|
||
|
print()
|
||
|
informations = self.objectspace.informations.get(family.path)
|
||
|
if 'help' in informations:
|
||
|
print(informations['help'])
|
||
|
print()
|
||
|
if family.type == 'leadership':
|
||
|
print('This family is a leadership.')
|
||
|
print()
|
||
|
|
||
|
def display_variable(self,
|
||
|
variable,
|
||
|
level,
|
||
|
family_type,
|
||
|
family_path,
|
||
|
index,
|
||
|
examples_mini,
|
||
|
examples_all,
|
||
|
):
|
||
|
parameter = f'**{variable.name}**'
|
||
|
subparameter = []
|
||
|
if 'mandatory' in self.objectspace.properties[variable.path]:
|
||
|
subparameter.append('mandatory')
|
||
|
mandatory = True
|
||
|
else:
|
||
|
mandatory = False
|
||
|
if variable.path in self.objectspace.multis:
|
||
|
if family_type == 'leadership' and index:
|
||
|
multi = self.objectspace.multis[variable.path] == 'submulti'
|
||
|
else:
|
||
|
multi = True
|
||
|
else:
|
||
|
multi = False
|
||
|
if multi:
|
||
|
subparameter.append('multiple')
|
||
|
if subparameter:
|
||
|
parameter += "<br/>`" + "`, `".join(subparameter) + '`'
|
||
|
parameter += f'<br/>**Type:** [`{variable.type}`]({ROUGAIL_VARIABLE_TYPE})'
|
||
|
comment = []
|
||
|
if variable.name != variable.description:
|
||
|
comment.append(variable.description + '.')
|
||
|
informations = self.objectspace.informations.get(variable.path)
|
||
|
if 'help' in informations:
|
||
|
help_ = ' '.join([h.strip() for h in informations['help'].split('\n')])
|
||
|
if not help_.endswith('.'):
|
||
|
help_ += '.'
|
||
|
comment.append(help_)
|
||
|
if variable.default is not None:
|
||
|
default = variable.default
|
||
|
comment.append(f'**Default:** {default}')
|
||
|
if variable.type == 'choice':
|
||
|
if isinstance(variable.choices, list):
|
||
|
comment.append(f'**Choices:** {", ".join(variable.choices)}')
|
||
|
else:
|
||
|
comment.append(f'**Choices:** see variable "{variable.choices.variable.split(".", 1)[-1]}"')
|
||
|
#choice
|
||
|
example_mini = None
|
||
|
example_all = None
|
||
|
properties = self.conf.option(f'{family_path}.{variable.name}').property.get()
|
||
|
if 'hidden' in properties or 'disabled' in properties:
|
||
|
pass
|
||
|
elif variable.test:
|
||
|
example = variable.test
|
||
|
if not multi:
|
||
|
example = example[0]
|
||
|
title = 'Example'
|
||
|
if mandatory:
|
||
|
example_mini = example
|
||
|
example_all = example
|
||
|
else:
|
||
|
if mandatory:
|
||
|
example_mini = example
|
||
|
example_all = example
|
||
|
example = ', '.join(example)
|
||
|
if len(variable.test) > 1:
|
||
|
title = 'Examples'
|
||
|
else:
|
||
|
title = 'Example'
|
||
|
comment.append(f'**{title}:** {example}')
|
||
|
elif variable.default is not None:
|
||
|
example = variable.default
|
||
|
example_all = example
|
||
|
else:
|
||
|
example = 'xxx'
|
||
|
if mandatory:
|
||
|
example_mini = example
|
||
|
example_all = example
|
||
|
if family_type == 'leadership':
|
||
|
if not index:
|
||
|
if example_mini is not None:
|
||
|
for mini in example_mini:
|
||
|
examples_mini.append({variable.name: mini})
|
||
|
if example_all is not None:
|
||
|
for mall in example_all:
|
||
|
examples_all.append({variable.name: mall})
|
||
|
else:
|
||
|
if example_mini is not None:
|
||
|
for idx in range(0, len(examples_mini)):
|
||
|
examples_mini[idx][variable.name] = example_mini
|
||
|
if example_all is not None:
|
||
|
for idx in range(0, len(examples_all)):
|
||
|
examples_all[idx][variable.name] = example_all
|
||
|
else:
|
||
|
if example_mini is not None:
|
||
|
examples_mini[variable.name] = example_mini
|
||
|
if example_all is not None:
|
||
|
examples_all[variable.name] = example_all
|
||
|
parameter += ' ' * (250 - len(parameter))
|
||
|
comment = '<br/>'.join(comment)
|
||
|
comment += ' ' * (250 - len(comment))
|
||
|
return parameter, comment
|
||
|
|
||
|
###############################################
|
||
|
# MAIN
|
||
|
###############################################
|
||
|
|
||
|
def main(args):
|
||
|
inventory = Inventory()
|
||
|
inventory.load()
|
||
|
if args.list:
|
||
|
inventory.load_inventory()
|
||
|
print(dumps({'group': {
|
||
|
'hosts': inventory.get_hosts(),
|
||
|
'vars': {}
|
||
|
}
|
||
|
})
|
||
|
)
|
||
|
elif args.doc:
|
||
|
inventory.display_doc()
|
||
|
else:
|
||
|
inventory.load_inventory()
|
||
|
inventory.mandatory()
|
||
|
inventory.display()
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
parser = ArgumentParser()
|
||
|
parser.add_argument('--list', action='store_true')
|
||
|
parser.add_argument('--host', action='store')
|
||
|
parser.add_argument('--debug', action='store_true')
|
||
|
parser.add_argument('--doc', action='store_true')
|
||
|
args = parser.parse_args()
|
||
|
try:
|
||
|
main(args)
|
||
|
except Exception as err:
|
||
|
if args.debug:
|
||
|
print_exc()
|
||
|
if args.doc:
|
||
|
exit(f'ERROR: {err}')
|
||
|
|
||
|
print(dumps({'_errors': str(err)}))
|