# rougail-output-ansible/src/rougail/output_ansible/__init__.py
#
# 192 lines
# 8.1 KiB
# Python

"""
Silique (https://www.silique.fr)
Copyright (C) 2022-2026
This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from typing import Optional
from json import dumps
from tiramisu import groups
from rougail.tiramisu import normalize_family
from .i18n import _
from .__version__ import __version__
from ..output_json import RougailOutputJson
class RougailOutputAnsible(RougailOutputJson):
    """Export a Rougail configuration as an Ansible inventory dictionary.

    The export built by ``RougailOutputJson`` (``self.dico``) is reshaped by
    :meth:`json_to_ansible` into the layout used by Ansible dynamic
    inventories: one entry per group holding a ``hosts`` list, plus
    ``_meta.hostvars`` holding the variables of every host.
    """

    output_name = "ansible"

    def __init__(
        self,
        config: "Config",
        *,
        rougailconfig: Optional["RougailConfig"] = None,
        **kwargs,
    ) -> None:
        """Forward every argument unchanged to ``RougailOutputJson``."""
        super().__init__(config, rougailconfig=rougailconfig, **kwargs)

    def exporter(self) -> bool:
        """Build the parent export, then convert it to the Ansible layout.

        Reads the ``ansible.host_namespace``, ``ansible.export_warnings`` and
        ``ansible.no_namespace_in_vars`` settings from ``self.rougailconfig``
        and resets ``self.hosts`` before delegating to the parent exporter.

        Returns:
            Always ``True``: problems are reported inside the generated data
            instead of through the return value.
        """
        self.host_namespace = self.rougailconfig["ansible.host_namespace"]
        self.export_warnings = self.rougailconfig["ansible.export_warnings"]
        self.no_namespace_in_vars = self.rougailconfig["ansible.no_namespace_in_vars"]
        # namespace name -> hosts declared through "ansible_host" tagged
        # variables (filled by parse_variable)
        self.hosts = {}
        super().exporter()
        self.json_to_ansible()
        # never return code 1, errors are in the output data
        return True

    def parse_variable(self, option, child, namespace):
        """Record hosts declared by ``ansible_host`` tagged variables.

        When namespaces are supported, ``namespace`` is set and the option's
        ``tags`` information contains ``"ansible_host"``, the option value
        (a single value or a list) is added to ``self.hosts[namespace]``.
        The parent implementation is always called afterwards.

        Args:
            option: object exposing ``information.get()`` and ``value.get()``.
            child: forwarded untouched to the parent implementation.
            namespace: name of the namespace the variable belongs to.
        """
        if (
            self.support_namespace
            and namespace
            and "ansible_host" in option.information.get("tags", tuple())
        ):
            hosts = option.value.get()
            if not isinstance(hosts, list):
                hosts = [hosts]
            # Several tagged variables may live in the same namespace, so
            # accumulate into our own list (a list has no ``update`` method)
            # instead of aliasing the list returned by the option.
            known_hosts = self.hosts.setdefault(namespace, [])
            for host in hosts:
                if host not in known_hosts:
                    known_hosts.append(host)
        super().parse_variable(option, child, namespace)

    def manage_errors(self) -> bool:
        """Add the Ansible specific checks to the parent error handling.

        When the parent ``manage_errors()`` returns a falsy value, the host
        namespace is validated, the errors found are appended to
        ``self.errors`` and the parent ``manage_errors()`` is replayed.
        NOTE(review): presumably a falsy parent result means "nothing
        reported so far" - confirm against ``RougailOutputJson``.

        Returns:
            ``True`` when the first parent call is truthy, otherwise the
            result of the replayed parent call.
        """
        if super().manage_errors():
            return True
        if not self.support_namespace:
            self.errors.append(_("no namespace configured"))
        hosts_config = self.config.option(self.host_namespace)
        try:
            # only a namespace group is accepted as host namespace
            if hosts_config.group_type() != groups.namespace:
                hosts_config = None
        except AttributeError:
            hosts_config = None
        if not hosts_config:
            # hosts declared with the "ansible_host" tag are enough: the
            # host namespace is only mandatory when there are none
            if not self.hosts:
                self.errors.append(
                    _('cannot find host namespace "{0}"').format(self.host_namespace)
                )
        else:
            try:
                hosts_config.option("hostnames").name()
            except AttributeError:
                self.errors.append(
                    _(
                        'malformated host namespace "{0}", should have the "hostnames" key'
                    ).format(self.host_namespace)
                )
        # errors may have been added, so replay manage_errors
        return super().manage_errors()

    def json_to_ansible(self):
        """Replace ``self.dico`` with its Ansible inventory representation.

        Steps:

        1. ``_warnings`` / ``_errors`` are moved to the ``localhost`` host
           variables (warnings only when ``export_warnings`` is set) and
           ``localhost`` is put in the ``ungrouped`` group.
        2. The host namespace is removed from the data; each entry of its
           ``hostnames`` key is a group.  A group with a ``hosts`` key may
           also define ``prefix_name`` (hosts are then named with the prefix
           followed by a two-digit, 1-based index), ``namespaces`` (the
           namespaces exported to its hosts) and ``vars`` (extra variables,
           keyed by ``"all"`` or by the 0-based host index as a string).
           A group without a ``hosts`` key is copied as is.
        3. Hosts collected in ``self.hosts`` are added to a group named
           after their namespace.
        """
        hostvars = {}
        ret = {"_meta": {"hostvars": hostvars}}
        # inventory host name -> namespaces listed by its group
        namespaces = {}
        # inventory host name -> extra variables coming from the group "vars"
        extra_vars = {}
        if "_warnings" in self.dico:
            # always removed from the data, exported only on request
            _warnings = self.dico.pop("_warnings")
            if self.export_warnings:
                hostvars["localhost"] = {"_warnings": _warnings}
                ret["ungrouped"] = {"hosts": ["localhost"]}
        if "_errors" in self.dico:
            hostvars.setdefault("localhost", {})["_errors"] = self.dico.pop("_errors")
            ret.setdefault("ungrouped", {"hosts": ["localhost"]})
        if self.host_namespace in self.dico:
            hosts = self.dico.pop(self.host_namespace)
            # manage groups
            if "hostnames" not in hosts:
                hostvars.setdefault("localhost", {}).setdefault("_errors", []).append(
                    _('cannot find "hostnames" in "{0}" namespace').format(
                        self.host_namespace
                    )
                )
                ret.setdefault("ungrouped", {"hosts": ["localhost"]})
                hostnames = {}
            else:
                hostnames = hosts["hostnames"]
            # group name -> {inventory host name: value of "ansible_host"}
            ret_hosts = {}
            for name, group in hostnames.items():
                if "hosts" not in group:
                    # not a list of hosts: export the group unchanged
                    ret[name] = group
                    continue
                has_vars = "vars" in group
                if has_vars and "all" in group["vars"]:
                    current_extra_vars = group["vars"]["all"]
                else:
                    current_extra_vars = {}
                for idx, host in enumerate(group["hosts"]):
                    if "prefix_name" in group:
                        # 1-based index, zero padded to two digits
                        host_name = group["prefix_name"] + str(idx + 1).zfill(2)
                    else:
                        host_name = host
                    if "namespaces" in group:
                        # keyed by the inventory name, which is the key used
                        # for the lookup below (it differs from ``host``
                        # when "prefix_name" is set)
                        namespaces[host_name] = group["namespaces"]
                    ret_hosts.setdefault(name, {})[host_name] = host
                    ret.setdefault(name, {}).setdefault("hosts", []).append(host_name)
                    if has_vars and str(idx) in group["vars"]:
                        # host specific variables override the "all" ones
                        extra_vars[host_name] = current_extra_vars.copy()
                        extra_vars[host_name].update(group["vars"][str(idx)])
                    elif current_extra_vars:
                        extra_vars[host_name] = current_extra_vars
            # manage host names and the variables of each host
            for group_hosts in ret_hosts.values():
                for host, domain_name in group_hosts.items():
                    host_vars = hostvars[host] = {"ansible_host": domain_name}
                    if host in namespaces:
                        # only the namespaces listed by the group
                        for namespace in namespaces[host]:
                            if namespace not in self.dico:
                                continue
                            if self.no_namespace_in_vars:
                                host_vars.update(self.dico[namespace])
                            else:
                                host_vars[namespace] = self.dico[namespace]
                    elif self.no_namespace_in_vars:
                        # every namespace, flattened in the host variables
                        for ns_vars in self.dico.values():
                            host_vars.update(ns_vars)
                    else:
                        host_vars.update(self.dico)
                    if host in extra_vars:
                        host_vars.update(extra_vars[host])
        # manage host names defined with the "ansible_host" tag and add groups
        for namespace, tagged_hosts in self.hosts.items():
            if namespace not in ret:
                ret[namespace] = {"hosts": []}
            for host in tagged_hosts:
                if host not in hostvars:
                    hostvars[host] = {"ansible_host": host}
                if self.no_namespace_in_vars:
                    hostvars[host].update(self.dico[namespace])
                elif namespace not in hostvars[host]:
                    hostvars[host][namespace] = self.dico[namespace]
                if host not in ret[namespace]["hosts"]:
                    ret[namespace]["hosts"].append(host)
        self.dico = ret
# Generic alias of the plugin class.
# NOTE(review): presumably the name the Rougail output loader looks up in
# every output module - confirm against the loader.
RougailOutput = RougailOutputAnsible
# Names exported by ``from ... import *``; the ``RougailOutput`` alias is
# not listed here.
__all__ = ("RougailOutputAnsible",)