rougail/src/rougail/tiramisu.py

568 lines
20 KiB
Python

"""Redefine Tiramisu object
Created by:
EOLE (http://eole.orion.education.fr)
Copyright (C) 2005-2018
Forked by:
Cadoles (http://www.cadoles.com)
Copyright (C) 2019-2021
Silique (https://www.silique.fr)
Copyright (C) 2022-2025
This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from typing import Any
from importlib.machinery import SourceFileLoader as _SourceFileLoader
from importlib.util import (
spec_from_loader as _spec_from_loader,
module_from_spec as _module_from_spec,
)
from unicodedata import normalize, combining
from jinja2 import StrictUndefined, DictLoader
from jinja2.sandbox import SandboxedEnvironment
from re import findall
from tiramisu import (
DynOptionDescription,
calc_value,
function_waiting_for_error,
undefined,
)
from tiramisu.error import (
ValueWarning,
ConfigError,
PropertiesOptionError,
CancelParam,
errors,
)
# Keep a reference to Tiramisu's original error helper: this module replaces
# ``errors.raise_carry_out_calculation_error`` with its own wrapper, and the
# wrapper needs to call the original implementation.
ori_raise_carry_out_calculation_error = errors.raise_carry_out_calculation_error
try:
    from .i18n import _
except ModuleNotFoundError:
    # FIXME: fallback used when the package-local i18n module cannot be
    # imported; messages are then returned unchanged (no translation).
    def _(msg):
        return msg
def display_xmlfiles(xmlfiles: list) -> str:
    """Format a list of source file names for use in an error message.

    Every name is wrapped in double quotes. Names are separated by ", ",
    except the last one which is introduced by " and ":

    * ``["a"]``           -> ``"a"``
    * ``["a", "b"]``      -> ``"a" and "b"``
    * ``["a", "b", "c"]`` -> ``"a", "b" and "c"``

    An empty (or missing) list yields an empty string instead of raising
    ``IndexError``; callers in this module may pass ``[]`` when no file
    information is recorded, and the formatting helper must not mask the
    error being reported.
    """
    if not xmlfiles:
        return ""
    quoted = [f'"{name}"' for name in xmlfiles]
    if len(quoted) == 1:
        return quoted[0]
    return ", ".join(quoted[:-1]) + " and " + quoted[-1]
def convert_boolean(value: str) -> bool:
    """Coerce a textual boolean into a real ``bool``.

    The Rougail sources may contain strings such as ``True`` or ``False``.

    * a ``bool`` is returned unchanged;
    * ``None`` and the empty string give ``None``;
    * ``"true"`` / ``"false"`` (case-insensitive) give ``True`` / ``False``.

    Raises:
        Exception: when the text is none of the accepted values.
    """
    if isinstance(value, bool):
        return value
    # None has to be handled before calling .lower(): it has no such method,
    # so the "no value" case would otherwise fail with AttributeError.
    if value is None:
        return None
    value = value.lower()
    if value == "true":
        return True
    if value == "false":
        return False
    if value == "":
        return None
    raise Exception(_('unknown boolean value "{0}"').format(value))
# Parameter tables shared by several entries of CONVERT_OPTION.
# Each table maps a parameter name to a dict holding at least a translated
# "description"; some entries also carry "choices" and a "doc" template
# containing a "{0}" placeholder.

# Parameters of the IP-based types ("ip" and "cidr").
_ip_params = {
    "cidr": {"description": _("IP must be in CIDR format")},
    "private_only": {"description": _("private IP are allowed")},
    "allow_reserved": {"description": _("reserved IP are allowed")},
}
# Parameters of the network types ("network" and "network_cidr").
_network_params = {
    "cidr": {"description": _("network must be in CIDR format")},
    "private_only": {"description": _("private network are allowed")},
    "allow_reserved": {"description": _("reserved network are allowed")},
}
# Parameters of the "port" type.
_port_params = {
    "allow_range": {"description": _("can be range of port")},
    "allow_protocol": {"description": _("can have the protocol")},
    "allow_zero": {"description": _("port 0 is allowed")},
    "allow_wellknown": {"description": _("well-known ports (1 to 1023) are allowed")},
    "allow_registred": {"description": _("registred ports (1024 to 49151) are allowed")},
    "allow_private": {"description": _("private ports (greater than 49152) are allowed")},
}
# Parameters of the domain name types ("netbios", "domainname", "hostname").
_domain_params = {
    "type": {"description": _("type of domainname"), "choices": ('domainname', 'netbios', 'hostname'), 'doc': _("type {0}")},
    "allow_startswith_dot": {"description": _("the domain name can starts by a dot")},
    "allow_without_dot": {"description": _("the domain name can be a hostname")},
    "allow_ip": {"description": _("the domain name can be an IP")},
    "allow_cidr_network": {"description": _("the domain name can be network in CIDR format")},
    "test_existence": {"description": _("the domain name must exist")},
}
# "web_address" accepts both the port and the domain name parameters
# (dict union; the two tables have no key in common).
_web_params = _port_params | _domain_params
# Description of every variable type, keyed by type name.
# Keys used by the entries below:
#   opttype    - name of the option class (presumably a Tiramisu option
#                class resolved elsewhere - TODO confirm)
#   func       - converter; jinja_to_function applies it to the rendered
#                Jinja output and falls back to ``str`` when it is absent
#   params     - parameters accepted by the type, with their descriptions
#   initkwargs - presumably keyword arguments handed to the option
#                constructor - TODO confirm against the code using this table
#   msg        - presumably a human-readable label of the type - TODO confirm
#   example    - a sample value for the type
CONVERT_OPTION = {
    "string": dict(opttype="StrOption", example="example"),
    "number": dict(opttype="IntOption",
                   func=int,
                   params={
                       "min_number": {"description": _("the minimum value"), 'doc': _("the minimum value is {0}")},
                       "max_number": {"description": _("the maximum value"), 'doc': _("the maximum value is {0}")},
                   },
                   example=42),
    "integer": dict(opttype="IntOption",
                    params={
                        "min_integer": {"description": _("the minimum value"), 'doc': _("the minimum value is {0}")},
                        "max_integer": {"description": _("the maximum value"), 'doc': _("the maximum value is {0}")},
                    },
                    func=int,
                    example=42,
                    ),
    "float": dict(opttype="FloatOption", func=float, example=1.42),
    "boolean": dict(opttype="BoolOption", func=convert_boolean),
    "secret": dict(opttype="PasswordOption",
                   params={
                       "min_len": {"description": _("minimum characters length for the secret"), "doc": _("minimum length for the secret is {0} characters")},
                       "max_len": {"description": _("maximum characters length for the secret"), "doc": _("maximum length for the secret is {0} characters")},
                       "forbidden_char": {"description": _("forbidden characters"), "doc": _("forbidden characters: {0}")},
                   },
                   example="secrets"),
    "mail": dict(opttype="EmailOption", example="user@example.net"),
    "unix_filename": dict(opttype="FilenameOption",
                          msg="UNIX filename",
                          params={
                              "allow_relative": {"description": _("this filename could be a relative path")},
                              "test_existence": {"description": _("this file must exist")},
                              "types": {"description": _("file type allowed"), "doc": _("file type allowed: {0}"), "choices": ("file", "directory"), "multi": True},
                          },
                          example="/tmp/myfile.txt"),
    "date": dict(opttype="DateOption", example="2000-01-01"),
    "unix_user": dict(opttype="UsernameOption", example="username",
                      msg="UNIX user"
                      ),
    "ip": dict(
        opttype="IPOption", initkwargs={"allow_reserved": True},
        msg="IP",
        params=_ip_params,
        example="1.1.1.1"
    ),
    "cidr": dict(opttype="IPOption", msg="CIDR", initkwargs={"cidr": True},
                 params=_ip_params,
                 example="1.1.1.0/24"),
    "netmask": dict(opttype="NetmaskOption", example="255.255.255.0"),
    "network": dict(opttype="NetworkOption",
                    params=_network_params,
                    example="1.1.1.0"),
    "network_cidr": dict(
        opttype="NetworkOption", initkwargs={"cidr": True}, example="1.1.1.0/24",
        params=_network_params,
        msg="network CIDR",
    ),
    "broadcast": dict(opttype="BroadcastOption", example="1.1.1.255"),
    "netbios": dict(
        opttype="DomainnameOption",
        initkwargs={"type": "netbios", "warnings_only": True},
        params=_domain_params,
        example="example",
    ),
    "domainname": dict(
        opttype="DomainnameOption",
        initkwargs={"type": "domainname", "allow_ip": False},
        params=_domain_params,
        example="example.net",
    ),
    "hostname": dict(
        opttype="DomainnameOption",
        initkwargs={"type": "hostname", "allow_ip": False},
        params=_domain_params,
        example="example",
    ),
    "web_address": dict(
        opttype="URLOption",
        initkwargs={"allow_ip": False, "allow_without_dot": True},
        msg="web address",
        params=_web_params,
        example="https://example.net",
    ),
    "port": dict(
        opttype="PortOption", initkwargs={"allow_private": True},
        params=_port_params,
        example="111", func=str,
    ),
    # NOTE(review): this example has only five octets while a MAC address
    # has six - confirm whether the example is ever validated.
    "mac": dict(opttype="MACOption", example="00:00:00:00:00"),
    "unix_permissions": dict(
        opttype="PermissionsOption",
        msg="UNIX permissions",
        initkwargs={"warnings_only": True},
        func=int,
        example="644",
    ),
    "choice": dict(opttype="ChoiceOption", example="a_choice"),
    "regexp": dict(opttype="RegexpOption"),
    # not a value type: a link to another option
    "symlink": dict(opttype="SymLinkOption"),
}
# Type names that have been renamed: old name -> current name.
# NOTE(review): how this table is consumed is not visible here.
RENAME_TYPE = {"number": "integer"}
def get_identifier_from_dynamic_family(true_name, name) -> str:
    """Extract the identifier part of ``name`` using the template ``true_name``.

    ``true_name`` is a dynamic family name holding the ``{{ identifier }}``
    placeholder. When the template is the placeholder alone, ``name`` itself
    is the identifier. Otherwise the placeholder is turned into a capturing
    group and searched for in ``name``.

    Returns the captured text, or ``None`` when there is not exactly one
    match or when the captured text is empty.
    """
    placeholder = "{{ identifier }}"
    if true_name == placeholder:
        return name
    pattern = true_name.replace(placeholder, "(.*)")
    matches = findall(pattern, name)
    if len(matches) == 1 and matches[0]:
        return matches[0]
    return None
def raise_carry_out_calculation_error(subconfig, *args, **kwargs):
    """Wrap Tiramisu's error helper to add the source files to the message.

    The original helper is called with the same arguments. When it raises a
    ``ConfigError``, the "ymlfiles" information attached to ``subconfig`` is
    looked up and a new ``ConfigError`` whose message ends with the list of
    files is raised, chained to the original error.

    When no file is recorded for ``subconfig`` the original error is
    re-raised unchanged: formatting an empty file list would otherwise fail
    and hide the real error behind an ``IndexError``.

    Raises:
        ConfigError: always, if the original helper raised one.
    """
    try:
        ori_raise_carry_out_calculation_error(subconfig, *args, **kwargs)
    except ConfigError as err:
        ymlfiles = subconfig.config_bag.context.get_values().get_information(
            subconfig, "ymlfiles", []
        )
        if not ymlfiles:
            # nothing to add to the message, keep the original error as is
            raise
        raise ConfigError(
            _("{0} in {1}").format(err, display_xmlfiles(ymlfiles)),
            subconfig=subconfig,
        ) from err
# Monkey-patch Tiramisu so that calculation errors go through the wrapper
# defined in this module.
errors.raise_carry_out_calculation_error = raise_carry_out_calculation_error
# NOTE(review): a ``global`` statement at module level has no effect.
global func
# Template storage handed to the DictLoader; it is empty at this point
# (presumably filled later by other code - TODO confirm).
dict_env = {}
# Sandboxed Jinja environment; StrictUndefined makes the use of an undefined
# variable in a template an error.
ENV = SandboxedEnvironment(loader=DictLoader(dict_env), undefined=StrictUndefined)
ENV.add_extension('jinja2.ext.do')
# ``func`` is the environment's filter registry itself (same dict object):
# everything stored in ``func`` is therefore also available as a Jinja filter.
func = ENV.filters
# NOTE(review): compiles the loader's templates into "jinja_caches" at import
# time (zip=None presumably writes plain files instead of an archive -
# confirm against the Jinja documentation).
ENV.compile_templates("jinja_caches", zip=None)
class JinjaError:
    """Placeholder handed to Jinja templates in place of an unavailable value.

    The wrapped exception is stored and raised as soon as the template tries
    to use the placeholder: converting it to text or comparing it with
    anything raises the stored exception. A template can detect such a
    placeholder beforehand with an ``isinstance`` check on this class.
    """

    # The attribute must be spelled ``__slots__`` (plural) to have any
    # effect: instances then only hold ``_err`` and have no ``__dict__``.
    __slots__ = ("_err",)

    def __init__(self, err):
        self._err = err

    def _raise_stored_error(self, *args, **kwargs):
        """Raise the wrapped exception, whatever the arguments are."""
        raise self._err from self._err

    # Any textual rendering or comparison of the placeholder is an error.
    __str__ = _raise_stored_error
    __repr__ = _raise_stored_error
    __eq__ = _raise_stored_error
    __ge__ = _raise_stored_error
    __gt__ = _raise_stored_error
    __le__ = _raise_stored_error
    __lt__ = _raise_stored_error
    __ne__ = _raise_stored_error
def test_propertyerror(value: Any) -> bool:
    """Jinja test: tell whether ``value`` is a ``JinjaError`` placeholder."""
    return isinstance(value, JinjaError)
# Register the function as the Jinja test named "propertyerror".
ENV.tests["propertyerror"] = test_propertyerror
def load_functions(path, dict_func=None):
    """Load a Python source file and register its public names.

    The file at ``path`` is executed as a module named "func". Every
    attribute of that module whose name does not start with an underscore is
    stored in ``dict_func`` under its own name. When ``dict_func`` is
    ``None`` the module-level ``func`` registry is used.
    """
    registry = func if dict_func is None else dict_func
    loader = _SourceFileLoader("func", path)
    module = _module_from_spec(_spec_from_loader(loader.name, loader))
    loader.exec_module(module)
    public_names = (attr for attr in dir(module) if not attr.startswith("_"))
    for attr in public_names:
        registry[attr] = getattr(module, attr)
def normalize_family(family_name: str) -> str:
    """Turn a free-form name into a lowercase name without accents.

    Dashes, spaces and dots become underscores, the text is lowercased and
    combining marks (accents) are removed after an NFKD decomposition.
    An empty or ``None`` name gives ``None``.
    """
    if not family_name:
        return None
    separators = str.maketrans("- .", "___")
    lowered = family_name.lower().translate(separators)
    decomposed = normalize("NFKD", lowered)
    without_accents = "".join(char for char in decomposed if not combining(char))
    return without_accents.lower()
def tiramisu_display_name(
    kls,
    subconfig,
    with_quote: bool = False,
) -> str:
    """Replace the Tiramisu display_name function to display path + description

    The layout is driven by the "description_type" information read on the
    root of the configuration (default: "name_and_description"):

    * "name" / "path": only the name or the path;
    * "description": only the description, the name is used when there is
      no description;
    * "name_and_description" / "path_and_description": ``name (description)``
      or only the name/path when there is no description.

    ``with_quote`` wraps the name/path (or the whole text for the
    single-part layouts) in double quotes.

    NOTE(review): any other "description_type" value leaves ``description``
    unbound and fails in the final ``else`` branch.
    """
    def get_path():
        # The short name is used for these layouts, the full path otherwise.
        if description_type in ["description", "name", "name_and_description"]:
            path = kls.impl_getname()
        else:
            path = kls.impl_getpath()
        # Inside a dynamic family the placeholder is replaced by the
        # normalized form of the last identifier.
        if "{{ identifier }}" in path and subconfig.identifiers:
            path = path.replace(
                "{{ identifier }}", normalize_family(str(subconfig.identifiers[-1]))
            )
        return path
    config_bag = subconfig.config_bag
    context = config_bag.context
    values = context.get_values()
    context_subconfig = context.get_root(config_bag)
    description_type = values.get_information(
        context_subconfig, "description_type", "name_and_description"
    )
    if description_type in ["description", "name_and_description", "path_and_description"]:
        doc = values.get_information(subconfig, "doc", None)
        # A doc identical to the name brings nothing: treated as no description.
        description = doc if doc and doc != kls.impl_getname() else ""
        # Unlike in the path, the identifier is inserted as is (not normalized).
        if "{{ identifier }}" in description and subconfig.identifiers:
            description = description.replace("{{ identifier }}", str(subconfig.identifiers[-1]))
    if description_type in ["name", "path", "name_and_description", "path_and_description"]:
        path = get_path()
    if description_type in ["name_and_description", "path_and_description"]:
        # Two-part layouts: only the name/path part is quoted.
        if description:
            if with_quote:
                description = f'"{path}" ({description})'
            else:
                description = f"{path} ({description})"
        else:
            if with_quote:
                description = f'"{path}"'
            else:
                description = path
    else:
        # Single-part layouts: the whole text is quoted.
        if description_type in ["name", "path"]:
            description = path
        elif not description:
            description = get_path()
        if with_quote:
            description = f'"{description}"'
    return description
def rougail_calc_value(*args, __default_value=None, __internal_multi=False, **kwargs):
    """Call Tiramisu's ``calc_value`` with a fallback for empty results.

    ``args`` and ``kwargs`` are forwarded untouched. A ``None`` result is
    turned into an empty list when ``__internal_multi`` is set. When the
    result is ``None`` or an empty list and ``__default_value`` is not
    ``None``, ``__default_value`` is returned instead.
    """
    result = calc_value(*args, **kwargs)
    if __internal_multi and result is None:
        result = []
    has_fallback = __default_value is not None
    if has_fallback and result in [None, []]:
        return __default_value
    return result
def kw_to_string(kw, root=None):
    """Yield ``dotted.path=value`` strings for every leaf of a nested dict.

    Nested dictionaries are walked recursively, their keys being joined with
    dots. ``root`` is the dotted prefix of the current level (``None`` at the
    top level).
    """
    for key, item in kw.items():
        dotted = key if root is None else root + "." + key
        if not isinstance(item, dict):
            yield f"{dotted}={item}"
            continue
        yield from kw_to_string(item, root=dotted)
@function_waiting_for_error
def jinja_to_function(
    __internal_variable,
    __internal_attribute,
    __internal_jinja,
    __internal_type,
    __internal_multi,
    __internal_files,
    __default_value=None,
    **kwargs,
):
    """Render a Jinja template and convert its output to the variable type.

    Parameters:
        __internal_variable: name of the variable, used in error messages.
        __internal_attribute: name of the calculated attribute, used in
            error messages.
        __internal_jinja: name of the template to fetch from ``ENV``.
        __internal_type: key of ``CONVERT_OPTION`` selecting the converter
            ("func" entry, ``str`` when there is none).
        __internal_multi: when true the output is split on newlines and a
            list is returned.
        __internal_files: file names shown in error messages.
        __default_value: returned instead of an empty result when it is not
            ``None``.
        **kwargs: template variables. A dotted key ("a.b.c") is exposed to
            the template as nested dictionaries.

    Returns:
        The converted value, a list of converted values in multi mode,
        ``None`` for an empty or "None" output, or ``__default_value``.

    Raises:
        ConfigError: when the rendering fails, when the single-value
            conversion fails, or when a non-dotted key is met twice.
    """
    global ENV, CONVERT_OPTION
    kw = {}
    for key, value in kwargs.items():
        # Values that are property errors are wrapped in JinjaError and None
        # becomes an empty string, also inside lists.
        if isinstance(value, list):
            val = []
            for v in value:
                if isinstance(v, PropertiesOptionError):
                    v = JinjaError(v)
                if v is None:
                    v = ''
                val.append(v)
            value = val
        else:
            if isinstance(value, PropertiesOptionError):
                value = JinjaError(value)
            if value is None:
                value = ''
        if "." in key:
            # Dotted key: build the nested dictionaries leading to the value.
            c_kw = kw
            path, var = key.rsplit(".", 1)
            if isinstance(value, CancelParam):
                # The path is shortened by the difference in depth between
                # the origin path and the current path of the parameter.
                count_o_path = value.origin_path.count(".") - value.current_path.count(
                    "."
                )
                path = path.rsplit(".", count_o_path)[0]
            for subkey in path.split("."):
                c_kw = c_kw.setdefault(subkey, {})
            # A cancelled parameter only creates the parent dictionaries,
            # the value itself is not stored.
            if not isinstance(value, CancelParam):
                c_kw[var] = value
        else:
            if key in kw:
                raise ConfigError(
                    f'internal error, multi key for "{key}" in jinja_to_function'
                )
            kw[key] = value
    try:
        # The registered functions are passed as extra template variables.
        values = ENV.get_template(__internal_jinja).render(kw, **func).strip()
    except Exception as err:
        kw_str = ", ".join(kw_to_string(kw))
        prefix = _('cannot calculate the variable "{0}"').format(__internal_variable)
        msg = _('the attribute "{0}" in {1} with the parameters "{2}" causes the error: {3}').format(
            __internal_attribute,
            display_xmlfiles(__internal_files),
            kw_str,
            err,
        )
        raise ConfigError(msg, prefix=prefix) from err
    convert = CONVERT_OPTION[__internal_type].get("func", str)
    if __internal_multi:
        # One value per non-blank line.
        # NOTE(review): a conversion failure here is not turned into a
        # ConfigError, unlike the single-value case below.
        values = [
            convert(val.strip()) for val in values.split("\n") if val.strip() != ""
        ]
        if not values and __default_value is not None:
            return __default_value
        return values
    try:
        values = convert(values)
    except Exception as err:
        prefix = _('cannot converting the variable "{0}"').format(__internal_variable)
        msg = _('"{0}" is an invalid {1}').format(values, __internal_type)
        # The attribute and the files are only detailed for attributes other
        # than "default".
        if __internal_attribute != "default":
            msg = _('the attribute "{0}" in {1} causes the error: {2}').format(
                __internal_attribute,
                display_xmlfiles(__internal_files),
                msg,
            )
        raise ConfigError(msg, prefix=prefix) from err
    # An empty output and the literal text "None" both mean "no value".
    values = values if values != "" and values != "None" else None
    if values is None and __default_value is not None:
        return __default_value
    return values
def variable_to_property(*, prop, value=undefined, when, inverse, **kwargs):
    """Return ``prop`` when ``value`` matches ``when``, ``None`` otherwise.

    With ``inverse`` set, the property is returned when ``value`` differs
    from ``when``. Nothing is returned when ``value`` is not given, and a
    ``PropertiesOptionError`` passed as ``value`` is raised.
    """
    if value is undefined:
        return None
    if isinstance(value, PropertiesOptionError):
        raise value from value
    matched = (value != when) if inverse else (value == when)
    if matched:
        return prop
    return None
@function_waiting_for_error
def jinja_to_property(prop, description, when, inverse, **kwargs):
    """Compute a property from the result of a Jinja template.

    The template is rendered through the registered "jinja_to_function".
    For the "string" type only the presence of a result matters: the result
    is replaced by ``True`` when it is not ``None`` and ``False`` otherwise.
    The outcome is then compared with ``when`` by the registered
    "variable_to_property". ``description`` is not used here.
    """
    render = func["jinja_to_function"]
    to_property = func["variable_to_property"]
    computed = render(**kwargs)
    if kwargs["__internal_type"] == "string":
        computed = computed is not None
    return to_property(prop=prop, value=computed, when=when, inverse=inverse)
@function_waiting_for_error
def jinja_to_property_help(prop, description, **kwargs):
    """Build the help entry of a property computed with Jinja.

    Returns a ``(prop, text)`` tuple where ``text`` is the quoted property
    name followed by an explanation in parentheses. The explanation is the
    rendered template for the "string" type and ``description`` otherwise.
    """
    explanation = description
    if kwargs["__internal_type"] == "string":
        explanation = func["jinja_to_function"](**kwargs)
    label = f'"{prop}" ({explanation})'
    return prop, label
@function_waiting_for_error
def valid_with_jinja(warnings_only=False, description=None, **kwargs):
    """Validate a value with a Jinja template.

    The template is rendered through the registered "jinja_to_function".
    An empty result means the value is valid. Any other result is an error:
    ``ValueWarning`` is raised when ``warnings_only`` is set, ``ValueError``
    otherwise. The message is ``description`` when given, else the rendered
    text.
    """
    message = func["jinja_to_function"](**kwargs)
    if not message:
        return
    if description is None:
        description = message
    error_class = ValueWarning if warnings_only else ValueError
    raise error_class(description)
# Register the helpers of this module in ``func`` under the names used to
# look them up (e.g. ``func["jinja_to_function"]`` in the helpers above).
# "calc_value" points to the Rougail wrapper, not to Tiramisu's function.
func["calc_value"] = rougail_calc_value
func["jinja_to_function"] = jinja_to_function
func["jinja_to_property"] = jinja_to_property
func["jinja_to_property_help"] = jinja_to_property_help
func["variable_to_property"] = variable_to_property
func["valid_with_jinja"] = valid_with_jinja
func["normalize_family"] = normalize_family
class ConvertDynOptionDescription(DynOptionDescription):
    """Dynamic family whose identifiers are converted before use in names.

    An identifier may be something else than a string (an integer for
    instance) and may contain characters that are not valid in a name: it is
    converted to text and normalized.
    """

    @staticmethod
    def convert_identifier_to_path(identifier):
        """Return the normalized text form of ``identifier`` (``None`` is kept)."""
        if identifier is None:
            return identifier
        text = identifier if isinstance(identifier, str) else str(identifier)
        return normalize_family(text)

    def impl_getname(
        self,
        identifier=None,
    ) -> str:
        """Return the name, with the converted identifier inserted if given.

        The identifier replaces the ``{{ identifier }}`` placeholder when the
        name has one, and is appended to the name otherwise.
        """
        base_name = super().impl_getname(None)
        if identifier is None:
            return base_name
        converted = self.convert_identifier_to_path(identifier)
        placeholder = "{{ identifier }}"
        if placeholder not in base_name:
            return base_name + converted
        return base_name.replace(placeholder, converted)

    def impl_get_display_name(
        self,
        subconfig,
        with_quote: bool = False,
    ) -> str:
        """Return the display name with the placeholder replaced.

        The placeholder, when still present in the display name built by the
        parent class, is replaced by the converted last identifier.
        """
        display = super().impl_get_display_name(subconfig, with_quote=with_quote)
        placeholder = "{{ identifier }}"
        if placeholder not in display:
            return display
        last_identifier = self.get_identifiers(subconfig, from_display_name=True)[-1]
        return display.replace(
            placeholder, self.convert_identifier_to_path(last_identifier)
        )

    def name_could_conflict(self, dynchild, child):
        """Tell whether the name of ``child`` fits the name template of ``dynchild``."""
        extracted = get_identifier_from_dynamic_family(
            dynchild.impl_getname(), child.impl_getname()
        )
        return extracted is not None