rougail-user-data-yaml/inventory.py
2024-08-05 09:57:10 +02:00

678 lines
27 KiB
Python

#!/usr/bin/env python3
from argparse import ArgumentParser
from pathlib import Path
from yaml import safe_load
from json import dumps
from traceback import print_exc
from os import environ
import logging
from rich.tree import Tree
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from ansible.parsing.vault import VaultLib, PromptVaultSecret
from ansible.module_utils._text import to_bytes
from tiramisu import PasswordOption, Config
from tiramisu.error import ValueOptionError, PropertiesOptionError, LeadershipError
from tiramisu.i18n import _
import rougail
from rougail import Rougail, RougailConfig
from rougail.utils import normalize_family
from rougail.annotator import CONVERT_OPTION
NAMESPACE = "socle"
NAMESPACE_HOSTS = "hosts"
VERSION_FILE = None
def tiramisu_display_name(kls) -> str:
"""Replace the Tiramisu display_name function to display path + description"""
try:
doc = kls.impl_get_information("doc", None)
name = kls.impl_getname()
except AttributeError:
doc = kls.opt.impl_get_information('doc', None)
name = kls.opt.impl_getname()
comment = f" ({doc})" if doc and doc != name else ""
return f"{kls.impl_getpath()}{comment}"
rougail.tiramisu_display_name = tiramisu_display_name
class DSOFPasswordOption(PasswordOption):
def __init__(self,
*args,
min_len=12,
max_len=None,
forbidden_char=[],
**kwargs):
extra = {}
extra = {'min_len': min_len}
if max_len is not None:
extra['max_len'] = max_len
if forbidden_char:
extra['forbidden_char'] = set(forbidden_char)
super().__init__(*args, extra=extra, **kwargs)
def validate(self,
value: str) -> None:
super().validate(value)
if len(value) <= self.impl_get_extra('min_len'):
raise ValueError(f'il faut au minimum {self.impl_get_extra("min_len")} caractères')
max_len = self.impl_get_extra('max_len')
if max_len and len(value) > max_len:
raise ValueError(f'il faut au maximum {max_len} caractères')
if self.impl_get_extra("forbidden_char"):
forbidden_char = set(value) & self.impl_get_extra("forbidden_char")
if forbidden_char:
raise ValueError(f'ne doit pas avoir les caracteres speciaux {",".join(forbidden_char)}')
CONVERT_OPTION["secret"] = dict(opttype="DSOFPasswordOption")
class DSOFRougail(Rougail):
def get_config(self):
"""Get Tiramisu Config"""
if not self.config:
tiram_obj = self.converted.save(self.rougailconfig["tiramisu_cache"])
optiondescription = {}
exec(tiram_obj, {"DSOFPasswordOption": DSOFPasswordOption}, optiondescription) # pylint: disable=W0122
self.config = Config(
optiondescription["option_0"],
display_name=tiramisu_display_name,
)
self.config.property.read_write()
for mode in self.rougailconfig['modes_level']:
self.config.permissive.add(mode)
return self.config
class Inventory:
###############################################
# Create TIRAMISU object
###############################################
def __init__(self, args, for_doc=False):
self.args = args
sub_inventory_dir = Path('inventaires') / 'rougail'
self.inside_git_dir = False
inventory_dir = None
if for_doc:
socle_inventory_dir = Path(__file__).parent.parent / 'inventaires' / "rougail"
if socle_inventory_dir.is_dir:
inventory_dir = str(socle_inventory_dir)
if not inventory_dir and not sub_inventory_dir.is_dir():
if self.args.debug:
print(f'cannot find {sub_inventory_dir}')
for git_file in [Path.cwd().parent / 'inventaires' / "rougail",
Path.cwd() / 'packaging' / 'inventaires' / "rougail",
]:
if git_file.is_dir():
if self.args.debug:
print('inside git dir !')
self.inside_git_dir = True
inventory_dir = git_file
break
if not inventory_dir:
inventory_dir = sub_inventory_dir
RougailConfig['dictionaries_dir'] = [inventory_dir]
RougailConfig['variable_namespace'] = NAMESPACE
RougailConfig['extra_dictionaries'][NAMESPACE_HOSTS] = ['inventaires/hosts/']
if for_doc:
project_name = Path().cwd().name.rsplit('-', 1)[0]
versions = {project_name: {'docker_version': None}}
else:
with VERSION_FILE.open() as fh:
versions = safe_load(fh)
for project, version in versions.items():
if "docker_version" not in version:
continue
project_name = self.get_project_name(project)
# get the first directory in ~/DSOF-SIx-xxxx/<version>/
if for_doc:
root_path = Path.cwd() / 'packaging'
elif self.inside_git_dir:
root_path = Path.cwd().parent.parent.parent / (project + '-packaging') / 'packaging'
else:
root_path = Path.home() / project / version['packaging_version']
if root_path.is_dir():
root_path = next(root_path.iterdir())
path = root_path / sub_inventory_dir
if path.is_dir():
RougailConfig['extra_dictionaries'][project_name] = [str(path)]
elif self.args.debug:
print(f'cannot find {path}')
#FIXME
#RougailConfig['tiramisu_cache'] = 'socle.py'
RougailConfig['functions_file'] = 'Rougail/functions.py'
self.errors = []
self.warnings = []
self.console = Console(force_terminal=True)
def get_project_name(self, project):
return normalize_family(project.split('-', 2)[-1])
def load(self):
rougail = DSOFRougail()
self.conf = rougail.get_config()
self.conf.property.read_write()
self.objectspace = rougail.converted
# self.objectspace.annotate()
###############################################
# Read Ops file
###############################################
def load_custom(self):
if not CUSTOM_FILE or not CUSTOM_FILE.is_file():
if self.args.debug:
print(f'cannot find {CUSTOM_FILE}')
return
prompt = PromptVaultSecret(INVENTORY_PASSWORD, 'default', ["Vault password: "])
if not INVENTORY_PASSWORD:
prompt.load()
vault = VaultLib([('default', prompt)])
with CUSTOM_FILE.open('rb') as fh:
values = safe_load(vault.decrypt(fh.read()))
for key, value in values.items():
self.read_inventory(self.conf,
key,
value,
CUSTOM_FILE,
)
def set_env(self):
if not CURRENT_ENV and self.inside_git_dir:
current_env = 'git'
else:
current_env = CURRENT_ENV
if not current_env:
self.errors.append(f"la variable d'environnement \"current_env\" est obligatoire")
return
self.conf.option(f"socle.env_name").value.set(current_env)
def set_versions(self):
if self.errors:
return
with VERSION_FILE.open() as fh:
versions = safe_load(fh)
for project, version in versions.items():
project_name = self.get_project_name(project)
if "docker_version" not in version:
continue
try:
self.conf.option(f"{project_name}.compose.image_tag").value.set(version["docker_version"])
except AttributeError:
pass
def read_inventory(self,
conf,
key,
value,
file,
*,
index=None,
):
sub_conf = conf.option(key, index)
try:
isoptiondescription = sub_conf.isoptiondescription()
except AttributeError as err:
# ugly
try:
try:
paths, options = conf.get()._children
except:
paths, options = conf.get().opt._children
if '{{ suffix }}' in paths:
option = options[paths.index('{{ suffix }}')]
path = option._suffixes.params.args[0].option._path
# ugly, assume that dynamic variable are in parent family
if '{{ suffix }}' in path:
cnt = path.count('.')
sub_path = sub_conf._path
subcnt = sub_path.count('.')
path = sub_path.rsplit('.', subcnt - cnt + 1)[0] + '.' + path.rsplit('.', 1)[-1]
values = self.conf.option(path).value.get()
key = normalize_family(key)
values.append(key)
self.conf.option(path).value.set(values)
self.read_inventory(conf, key, value, file, index=index)
return
except Exception:
if args.debug:
print_exc()
pass
if conf == self.conf:
path = NAMESPACE
else:
path = conf.path()
self.warnings.append(f'"{key}" est inconnu dans "{path}" mais est défini dans "{file}"')
return
except LeadershipError as err:
if args.debug:
print_exc()
self.errors.append(str(err))
return
if isoptiondescription:
if not sub_conf.isleadership():
if not isinstance(value, dict):
print('pffff1')
return
for sub_key, sub_value in value.items():
self.read_inventory(sub_conf,
sub_key,
sub_value,
file,
)
else:
if not isinstance(value, list):
return
leader_option = sub_conf.leader().name()
leader_value = []
for leader in value:
if leader_option not in leader:
self.warnings.append(f'cannot find leader "{sub_conf.leader().path()} in {list(leader)}')
return
leader_value.append(leader[leader_option])
self.read_inventory(sub_conf,
leader_option,
leader_value,
file,
)
for idx, sub_value in enumerate(value):
for sub_key, sub_value in sub_value.items():
if sub_key == leader_option:
continue
self.read_inventory(sub_conf,
sub_key,
sub_value,
file,
index=idx,
)
else:
try:
# sub_conf.permissive.set(frozenset(['advanced']))
sub_conf.value.set(value)
except ValueOptionError as err:
if args.debug:
print_exc()
self.errors.append(str(err).replace('"', "'"))
except PropertiesOptionError as err:
if args.debug:
print_exc()
self.warnings.append(f'"{err}" mais est défini dans "{file}"')
###############################################
# Host
###############################################
def get_hosts(self):
ret = {"_meta": {"hostvars": {}}, "all": {"children": ["ungrouped"]}}
if self.errors:
ret["_meta"]["hostvars"]["localhost"] = {'_errors': self.errors}
ret["ungrouped"] = {"hosts": ["localhost"]}
else:
inventories = {}
self.conf.property.read_only()
for line, value in self.conf.value.get().items():
self.parse_line(line,
line,
'',
value,
inventories,
False,
False,
)
if NAMESPACE_HOSTS not in inventories:
return ret
hostnames = inventories[NAMESPACE_HOSTS]['hostnames']
ret_hosts = {}
for name, hosts in hostnames.items():
if 'hosts' in hosts:
for idx, host in enumerate(hosts['hosts']):
index = str(idx + 1)
if idx < 9:
index = '0' + index
host_name = hosts['prefix_name'] + index
ret_hosts.setdefault(name, {})[host_name] = host
ret.setdefault(name, {}).setdefault('hosts', []).append(host_name)
else:
ret["all"]["children"].append(name)
ret[name] = hosts
for hosts in ret_hosts.values():
for host, domain_name in hosts.items():
ret['_meta']['hostvars'][host] = {'ansible_host': domain_name}
ret['_meta']['hostvars'][host].update(inventories)
#if CURRENT_NAMESPACE != NAMESPACE:
# ret['_meta']['hostvars'][host][CURRENT_NAMESPACE] = inventories[CURRENT_NAMESPACE]
# print(ret_hosts)
# ret['_meta']['hostvars']['localhost'] = inventories
return ret
def display_hosts(self):
self.conf.property.read_only()
hostnames = self.conf.option(NAMESPACE_HOSTS).option('hostnames')
ret_hosts = {}
for optiondescription in hostnames.list('optiondescription'):
hosts = optiondescription.value.get()
try:
prefix_name = optiondescription.option('prefix_name').value.get()
except AttributeError:
# it's a group
continue
for idx, host in enumerate(optiondescription.option('hosts').value.get()):
index = str(idx + 1)
if idx < 9:
index = '0' + index
host_name = prefix_name + index
ret_hosts[host_name] = host
print()
header = Table(show_header=False, title="Liste des adresses")
for key, value in ret_hosts.items():
header.add_row(key, value)
self.console.print(header)
###############################################
# Search unspecified mandatories variables
###############################################
def mandatory(self):
title = False
options_with_error = []
for option in self.conf.value.mandatory():
try:
option.value.get()
if not title:
self.errors.append("Les variables suivantes sont obligatoires mais n'ont pas de valeur :")
title = True
self.errors.append(f' - {option.doc()}')
except PropertiesOptionError:
options_with_error.append(option)
if not title:
for idx, option in enumerate(options_with_error):
if not idx:
self.errors.append("Les variables suivantes sont inaccessibles mais sont vides :")
self.errors.append(f' - {option.doc()}')
###############################################
# Tiramisu to inventory
###############################################
def rich(self, ret, tree):
for key, value in ret.items():
if isinstance(value, dict):
subtree = tree.add(f":open_file_folder: {key}",
guide_style="bold bright_blue",
)
self.rich(value, subtree)
elif isinstance(value, list):
subtree = tree.add(f":notebook: {key} :",
guide_style="bold bright_blue",
)
for val in value:
subtree.add(str(val))
else:
tree.add(f":notebook: {key} : {value}")
def rich_display(self):
header_variable = 'Variable\n'
header_variable += '[bright_blue]Variable non documentée[/bright_blue]\n'
header_variable += '[red1]Variable non documentée mais modifiée[/red1]'
if self.args.read_only:
header_variable += '\n[orange1]Variable non modifiable[/orange1]'
header_value = '[gold1]Valeur par défaut[/gold1]\n'
header_value += 'Valeur modifiée\n'
header_value += '([red1]Valeur par défaut originale[/red1])'
header = Table.grid(padding=1, collapse_padding=True)
header.pad_edge = False
header.add_row(header_variable, header_value)
header = Panel.fit(header, title="Légende")
self.console.print(header)
inventories = {}
if self.args.read_only:
self.conf.property.read_only()
else:
self.conf.property.read_write()
for line, value in self.conf.value.get().items():
self.parse_line(line,
line,
'',
value,
inventories,
False,
False,
for_doc=True,
)
for warning in self.warnings:
self.console.print(Tree(f":warning: {warning}"))
if self.errors:
self.display_errors()
tree = Tree(":open_file_folder: Inventaire",
guide_style="bold bright_blue",
)
self.rich(inventories, tree)
self.console.print(tree)
self.display_hosts()
def display_errors(self):
tree = Tree(":stop_sign: ERREURS",
guide_style="bold bright_red",
)
for error in self.errors:
tree.add(error)
self.console.print(tree)
exit(1)
def parse_line(self,
full_path,
line,
parent_path,
value,
dico,
leadership,
family_hidden,
*,
for_doc=False,
):
if '.' in line:
# it's a dict
family, variable = line.split('.', 1)
current_path = parent_path
if current_path:
current_path += '.'
current_path += family
if for_doc:
if 'hidden' in self.conf.option(current_path).property.get() or family_hidden:
family_hidden = True
family = f'[orange1]{family}[/orange1]'
elif 'advanced' in self.conf.option(current_path).property.get():
family = f'[bright_blue]{family}[/bright_blue]'
if '.' not in variable and self.conf.option(full_path.rsplit('.', 1)[0]).isleadership():
dico.setdefault(family, [])
leadership = True
else:
dico.setdefault(family, {})
leadership = False
self.parse_line(full_path,
variable,
current_path,
value,
dico[family],
leadership,
family_hidden,
for_doc=for_doc,
)
elif leadership:
# it's a leadership
for idx, val in enumerate(value):
dic = {k.rsplit('.', 1)[-1]: v for k, v in val.items()}
if for_doc:
leader = True
for k, v in val.items():
if leader:
is_default = self.conf.option(k).owner.isdefault()
properties = self.conf.option(k).property.get()
else:
is_default = self.conf.option(k, idx).owner.isdefault()
properties = self.conf.option(k, idx).property.get()
if self.conf.option(k).type() == _('password') and not self.args.show_password:
v = "*" * 10
subpath = k.rsplit('.', 1)[-1]
if 'hidden' in properties or family_hidden:
subpath = f'[orange1]{subpath}[/orange1]'
elif 'advanced' in properties:
if isdefault:
subpath = f'[bright_blue]{subpath}[/bright_blue]'
else:
subpath = f'[red1]{subpath}[/red1]'
if is_default:
v = '[gold1]' + str(v) + '[/gold1]'
dico.append(f'{subpath}: {v}')
leader = False
else:
dico.append(dic)
else:
# it's a variable
is_default = self.conf.option(full_path).owner.isdefault()
default_value = None
if for_doc:
mod_is_red = False
if not is_default:
true_default_value = self.conf.option(full_path).value.default()
if true_default_value and true_default_value != value:
if isinstance(true_default_value, list):
default_value = [f' ([red1]{true}[/red1])' for true in true_default_value]
else:
default_value = f' ([red1]{true_default_value}[/red1])'
mod_is_red = True
if self.conf.option(full_path).type() == _('password') and not self.args.show_password:
if isinstance(value, list):
value = ["*" * 10 for val in value]
else:
value = "*" * 10
if 'hidden' in self.conf.option(full_path).property.get() or family_hidden:
line = f'[orange1]{line}[/orange1]'
elif 'advanced' in self.conf.option(full_path).property.get():
if not mod_is_red:
line = f'[bright_blue]{line}[/bright_blue]'
else:
line = f'[red1]{line}[/red1]'
if is_default:
if isinstance(value, list):
dico[line] = ['[gold1]' + str(val) + '[/gold1]' for val in value]
else:
dico[line] = '[gold1]' + str(value) + '[/gold1]'
if (for_doc and not is_default) or not for_doc:
if default_value:
if isinstance(value, list):
len_value = len(value)
len_default_value = len(default_value)
len_values = max(len_value, len_default_value)
new_value = []
for idx in range(len_values):
new = ''
if idx < len_value:
new += value[idx]
if idx < len_default_value:
new += default_value[idx]
new_value.append(new)
value = new_value
else:
value = str(value) + default_value
dico[line] = value
def main(args):
inventory = Inventory(args)
inventory.load()
inventory.load_custom()
inventory.set_env()
inventory.set_versions()
if inventory.errors:
inventory.display_errors()
if args.hosts:
inventory.display_hosts()
elif args.list:
print(dumps(inventory.get_hosts(), ensure_ascii=False, indent=2)
)
else:
if not args.no_mandatory:
inventory.mandatory()
if args.host:
# inventory is already set during --list
print(dumps({}))
else:
inventory.rich_display()
def get_argsparse():
parser = ArgumentParser()
parser.add_argument('--list', action='store_true')
parser.add_argument('--host', action='store')
parser.add_argument('--hosts', action='store_true')
parser.add_argument('--debug', action='store_true')
parser.add_argument('--read_only', action='store_true')
parser.add_argument('--no_mandatory', action='store_true')
parser.add_argument('--show_password', action='store_true')
args = parser.parse_args()
if args.debug:
level = logging.DEBUG
logging.basicConfig(
level=level,
format='%(asctime)s - %(levelname)s - %(message)s'
)
return args
if __name__ == "__main__":
if 'CURRENT_ENV' not in environ:
CURRENT_ENV = environ.get('current_env')
else:
CURRENT_ENV = environ['CURRENT_ENV']
if 'VERSION_FILE' not in environ:
VERSION_FILE = Path('versions.yml')
else:
VERSION_FILE = Path(environ['VERSION_FILE'])
if 'INVENTORY_PASSWORD' in environ:
INVENTORY_PASSWORD = environ['INVENTORY_PASSWORD']
INVENTORY_PASSWORD = to_bytes(INVENTORY_PASSWORD, errors='strict', nonstring='simplerepr').strip()
else:
INVENTORY_PASSWORD = None
if CURRENT_ENV:
CUSTOM_FILE = Path.cwd().parent.parent / 'inventaires' / CURRENT_ENV / "custom.yml"
else:
CUSTOM_FILE = Path("custom.yml")
if not VERSION_FILE.is_file():
raise Exception(f'cannot find VERSION_FILE "{VERSION_FILE}"')
try:
args = get_argsparse()
except Exception as err:
print(dumps({'_errors': str(err)}, ensure_ascii=False, indent=2))
exit(1)
try:
main(args)
except Exception as err:
if args.debug:
print_exc()
if args.list or args.host:
print(dumps({'_errors': str(err)}, ensure_ascii=False, indent=2))
exit(1)
else:
tree = Tree(":stop_sign: ERREURS",
guide_style="bold bright_red",
)
tree.add(str(err))
Console().print(tree)
exit(1)