rougail/src/rougail/user_data.py

534 lines
24 KiB
Python
Raw Normal View History

2024-11-28 08:30:47 +01:00
"""
Silique (https://www.silique.fr)
2026-01-03 16:54:12 +01:00
Copyright (C) 2022-2026
2024-11-28 08:30:47 +01:00
distribued with GPL-2 or later license
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
2025-01-02 21:06:13 +01:00
2024-11-28 08:30:47 +01:00
from typing import List
2025-12-21 22:20:39 +01:00
from tiramisu import Calculation, Params, ParamValue, owners
2025-03-19 11:43:04 +01:00
from tiramisu.error import (
PropertiesOptionError,
2025-05-12 08:45:39 +02:00
AttributeOptionError,
2025-03-19 11:43:04 +01:00
LeadershipError,
ConfigError,
CancelParam,
display_list,
2025-03-19 11:43:04 +01:00
)
from .utils import undefined, get_properties_to_string
from .tiramisu import (
normalize_family,
CONVERT_OPTION,
get_identifier_from_dynamic_family,
)
2025-04-09 09:01:48 +02:00
from .error import DictConsistencyError
2024-11-28 08:30:47 +01:00
2025-02-07 08:08:55 +01:00
from .i18n import _
2024-11-28 08:30:47 +01:00
2025-12-21 22:20:39 +01:00
class UserData:
2024-11-28 08:30:47 +01:00
def __init__(self, config) -> None:
self.config = config
2025-12-21 22:20:39 +01:00
def user_data(
self,
2025-12-21 22:20:39 +01:00
user_data: List[dict],
*,
2025-12-21 22:20:39 +01:00
invalid_user_data_error: bool = False,
unknown_user_data_error: bool = False,
):
2024-12-11 20:54:03 +01:00
self.values = {}
self.errors = []
self.warnings = []
2025-12-21 22:20:39 +01:00
self.invalid_user_data_error = invalid_user_data_error
if self.invalid_user_data_error:
self.invalids = self.errors
else:
self.invalids = self.warnings
2025-12-21 22:20:39 +01:00
if unknown_user_data_error:
self.unknowns = self.errors
2025-05-12 19:25:50 +02:00
else:
self.unknowns = self.warnings
self.show_secrets = False
2025-12-21 22:20:39 +01:00
self._populate_values(user_data)
2024-12-11 20:54:03 +01:00
self._auto_configure_dynamics()
self._populate_config()
self.properties_to_string = get_properties_to_string()
self._populate_error_warnings()
return {
"errors": self.errors,
"warnings": self.warnings,
}
2024-12-11 20:54:03 +01:00
2025-12-21 22:20:39 +01:00
def _populate_values(self, user_data):
for datas in user_data:
2024-11-28 08:30:47 +01:00
options = datas.get("options", {})
2025-02-07 08:08:55 +01:00
source = datas["source"]
2024-11-28 08:30:47 +01:00
for name, data in datas.get("values", {}).items():
2024-12-11 20:54:03 +01:00
self.values[name] = {
2025-02-07 08:08:55 +01:00
"source": source,
2024-11-28 08:30:47 +01:00
"values": data,
"options": options.copy(),
}
2024-12-11 20:54:03 +01:00
self.errors.extend(datas.get("errors", []))
self.warnings.extend(datas.get("warnings", []))
def _get_variable(self, config):
2025-12-23 20:42:10 +01:00
try:
for subconfig in config:
if subconfig.isoptiondescription():
yield from self._get_variable(subconfig)
else:
yield subconfig
except (ConfigError, ValueError) as err:
err.prefix = ""
if self.invalid_user_data_error:
msg = str(err)
2024-12-11 20:54:03 +01:00
else:
2025-12-23 20:42:10 +01:00
msg = _('{0}, it will be ignored').format(err)
self.invalids.append({msg: err.subconfig})
2024-12-11 20:54:03 +01:00
def _auto_configure_dynamics(self):
cache = {}
added = []
for path, data in list(self.values.items()):
try:
option = self.config.option(path)
option.name()
except (ConfigError, PropertiesOptionError):
pass
except AttributeError:
2025-04-09 09:01:48 +02:00
self._not_found_is_dynamic(self.config, path, cache, added, data)
2024-12-11 20:54:03 +01:00
2025-04-09 09:01:48 +02:00
def _not_found_is_dynamic(self, config, path, cache, added, data):
2025-01-02 21:06:13 +01:00
"""if path is not found, check if parent is a dynamic family"""
2024-12-11 20:54:03 +01:00
current_path = ""
identifiers = []
# get parent
for name in path.split(".")[:-1]:
if current_path:
current_path += "."
current_path += name
if current_path in cache:
config, identifier = cache[current_path]
identifiers.append(identifier)
continue
tconfig = config.option(name)
try:
tconfig.group_type()
# object exists, so current config is the temporary config
config = tconfig
if config.isdynamic(only_self=True):
identifiers.append(config.identifiers()[-1])
except AttributeError:
# try to found the good dynamic family
try:
lists = config.list(uncalculated=True)
except PropertiesOptionError:
lists = []
for tconfig in lists:
2024-12-11 20:54:03 +01:00
if not tconfig.isdynamic(only_self=True):
# it's not a dynamic variable
continue
identifier = get_identifier_from_dynamic_family(
2025-10-16 08:21:07 +02:00
tconfig.name(uncalculated=True), name
)
if identifier == "{{ identifier }}":
continue
2025-04-09 09:01:48 +02:00
if identifier != normalize_family(identifier):
2025-05-12 08:45:39 +02:00
msg = _(
'cannot load variable path "{0}", the identifier "{1}" is not valid in {2}'
).format(path, identifier, data["source"])
self.invalids.append(msg)
2025-04-09 09:01:48 +02:00
continue
2024-12-11 20:54:03 +01:00
if identifier is None:
# it's a dynamic variable but doesn't match the current name
continue
2025-04-09 09:01:48 +02:00
# get the variable associate to this dynamic family
2024-12-11 20:54:03 +01:00
dynamic_variable = tconfig.information.get(
"dynamic_variable",
None,
)
if not dynamic_variable:
# it's the good dynamic variable but it's not linked to a variable
# so cannot change the variable
continue
2025-01-02 21:06:13 +01:00
option_type = self.config.option(dynamic_variable).information.get(
"type"
)
dyn_options_values = (
self.config.option(dynamic_variable).get().impl_getdefault()
)
if "{{ identifier }}" in dynamic_variable:
2024-12-11 20:54:03 +01:00
for s in identifiers:
dynamic_variable = dynamic_variable.replace(
"{{ identifier }}", str(s), 1
)
2025-04-09 09:01:48 +02:00
# do not add values in variable if has already a value
2024-12-11 20:54:03 +01:00
if dynamic_variable not in self.values and not dyn_options_values:
self.values[dynamic_variable] = {"values": []}
added.append(dynamic_variable)
elif dynamic_variable not in added:
continue
config = tconfig
identifiers.append(identifier)
2025-01-02 21:06:13 +01:00
typ = CONVERT_OPTION.get(option_type, {}).get("func")
2024-12-11 20:54:03 +01:00
if typ:
identifier = typ(identifier)
2025-01-02 21:06:13 +01:00
if identifier not in self.values[dynamic_variable]["values"]:
self.values[dynamic_variable]["values"].append(identifier)
2024-12-11 20:54:03 +01:00
cache[current_path] = config, identifier
break
2025-05-12 08:45:39 +02:00
def convert_value(self, path, option, options, value):
# converted value
needs_convert = options.get("needs_convert", False)
if option.ismulti():
if options.get("multi_separator") and not isinstance(value, list):
value = value.split(options["multi_separator"])
self.values[path]["values"] = value
if option.issubmulti():
value = [[val] for val in value]
2025-11-21 08:32:33 +01:00
if option.issubmulti():
for idx, val in enumerate(value):
if isinstance(val, list):
value[idx] = [convert_value(option, v, needs_convert) for v in val]
elif isinstance(value, list):
value = [convert_value(option, val, needs_convert) for val in value]
2025-05-12 08:45:39 +02:00
if needs_convert:
self.values[path]["values"] = value
2025-11-21 08:32:33 +01:00
self.values[path]["options"]["needs_convert"] = needs_convert
elif not isinstance(value, list):
value = convert_value(option, value, needs_convert)
2025-05-12 08:45:39 +02:00
return value
2024-12-11 20:54:03 +01:00
def _populate_config(self):
dynamics_variable = []
2024-12-11 20:54:03 +01:00
while self.values:
2024-11-28 08:30:47 +01:00
value_is_set = False
for option in self._get_variable(self.config):
values_path = path = option.path()
2024-12-11 20:54:03 +01:00
if path not in self.values:
if path in dynamics_variable or not option.isdynamic():
continue
values_path = option.path(uncalculated=True)
if values_path not in self.values:
continue
options = self.values[values_path].get("options", {})
2025-03-19 11:43:04 +01:00
if (
options.get("allow_secrets_variables", True) is False
and option.type() == "password"
):
2025-12-21 22:20:39 +01:00
self.errors.append({
2025-03-19 11:43:04 +01:00
_(
2025-12-21 22:20:39 +01:00
'the variable contains secrets and should not be defined in {0}'
).format(self.values[values_path]["source"]): option._subconfig}
2025-03-19 11:43:04 +01:00
)
self.values.pop(path)
2025-02-07 08:08:55 +01:00
continue
2025-05-12 08:45:39 +02:00
value = self.convert_value(
path, option, options, self.values[values_path]["values"]
2025-05-12 08:45:39 +02:00
)
2024-11-28 08:30:47 +01:00
index = option.index()
if index is not None:
if isinstance(value, tuple):
self.values[values_path]["values"] = []
2025-11-06 21:33:24 +01:00
# for i in range(len(option.parent().leader().value.get())):
2025-10-07 21:11:42 +02:00
for i in range(option.value.len()):
self.values[values_path]["values"].append(value)
value = self.values[values_path]["values"]
2024-11-28 08:30:47 +01:00
if not isinstance(value, list) or index >= len(value):
continue
value = value[index]
2025-12-23 10:10:38 +01:00
option_without_index = option.index(None)
else:
option_without_index = option
2025-05-12 08:45:39 +02:00
if option.isleader():
# set value for a leader, it began to remove all values!
2025-10-07 21:11:42 +02:00
len_leader = option.value.len()
2025-05-12 08:45:39 +02:00
if len_leader:
for idx in range(len_leader - 1, -1, -1):
option.value.pop(idx)
2024-11-28 08:30:47 +01:00
try:
self.set_value(option, value, options)
2024-11-28 08:30:47 +01:00
value_is_set = True
except Exception as err:
if path != option.path():
self.values[option.path()] = self.values.pop(values_path)
else:
if "source" in self.values[values_path]:
option_without_index.information.set(
"loaded_from",
2025-11-06 21:33:24 +01:00
_("loaded from {0}").format(
self.values[values_path]["source"]
),
)
2024-11-28 08:30:47 +01:00
# value is correctly set, remove variable to the set
if index is not None:
# if it's a follower waiting for all followers are sets
self.values[values_path]["values"][index] = undefined
for tmp_value in self.values[values_path]["values"]:
if tmp_value != undefined:
break
else:
if path in self.values:
self.values.pop(path)
else:
dynamics_variable.append(path)
elif path in self.values:
2024-12-11 20:54:03 +01:00
self.values.pop(path)
else:
dynamics_variable.append(path)
2024-11-28 08:30:47 +01:00
if not value_is_set:
break
2024-12-11 20:54:03 +01:00
2025-05-12 08:45:39 +02:00
def _display_value(self, option, value):
if not self.show_secrets and option.type() == "password":
return "*" * 10
return value
2024-12-11 20:54:03 +01:00
def _populate_error_warnings(self):
2024-11-28 08:30:47 +01:00
# we don't find variable, apply value just to get error or warning messages
2025-05-12 08:45:39 +02:00
for path, options in self.values.items():
value = options["values"]
2025-11-06 21:33:24 +01:00
if options.get("secret_manager"):
option = self.config.forcepermissive.option(path)
else:
option = self.config.option(path)
2024-11-28 08:30:47 +01:00
try:
2025-01-02 21:06:13 +01:00
if option.isoptiondescription():
if value:
2025-12-21 22:20:39 +01:00
if self.invalid_user_data_error:
msg = _(
'is a family so we cannot set the value "{0}", it has been loading from {1}'
)
else:
msg = _(
'is a family so we cannot set the value "{0}", it will be ignored when loading from {1}'
)
self.invalids.append({msg.format(
2025-05-12 08:45:39 +02:00
self._display_value(option, value),
options["source"],
2025-12-21 22:20:39 +01:00
): option._subconfig}
2025-03-19 11:43:04 +01:00
)
2025-01-02 21:06:13 +01:00
continue
2025-12-23 20:42:10 +01:00
except ConfigError as err:
self.invalids.append({
_("{0}, it has been loaded from {1}").format(err, options["source"]): option._subconfig}
)
continue
except PropertiesOptionError as err:
2025-12-21 22:20:39 +01:00
self.unknowns.append({
_("{0}, it has been loaded from {1}").format(err, options["source"]): option._subconfig}
)
2025-12-21 22:20:39 +01:00
continue
2025-05-12 08:45:39 +02:00
except AttributeOptionError as err:
if err.code == "option-not-found":
2025-12-21 22:20:39 +01:00
if self.invalid_user_data_error:
msg = _(
'variable or family "{0}" does not exist, it has been loading from {1}'
)
else:
msg = _(
2025-05-12 08:45:39 +02:00
'variable or family "{0}" does not exist, it will be ignored when loading from {1}'
2025-12-21 22:20:39 +01:00
)
self.unknowns.append(msg.format(err.path, options["source"]))
2025-05-12 08:45:39 +02:00
elif err.code == "option-dynamic":
2025-12-21 22:20:39 +01:00
if self.invalid_user_data_error:
msg = _(
'"{0}" is the name of a dynamic family, it has been loading from {1}'
)
else:
msg = _(
'"{0}" is the name of a dynamic family, it will be ignored when loading from {1}'
)
self.invalids.append(
2025-12-21 22:20:39 +01:00
{msg.format(option.description(with_quote=True), options["source"]): option._subconfig}
2025-05-12 08:45:39 +02:00
)
else:
self.invalids.append(
2025-05-12 08:45:39 +02:00
_("{0} loaded from {1}").format(err, options["source"])
)
continue
value = self.convert_value(
path, option, self.values[path].get("options", {}), value
)
if option.isfollower():
indexes = range(len(value))
2025-05-12 19:25:50 +02:00
values = value
2025-05-12 08:45:39 +02:00
else:
indexes = [None]
for index in indexes:
try:
if option.isfollower():
2025-05-12 19:25:50 +02:00
value = values[index]
if value is undefined or isinstance(value, CancelParam):
2024-11-28 08:30:47 +01:00
continue
2025-12-23 10:10:38 +01:00
option.index(index).value.set(value)
2025-05-12 08:45:39 +02:00
else:
self.set_value(option, value, options.get("options", {}))
2025-05-12 08:45:39 +02:00
except PropertiesOptionError as err:
if err.code == "property-error":
2025-11-06 21:33:24 +01:00
properties = display_list(
[_(prop) for prop in err.proptype], add_quote=False
)
2025-12-21 22:20:39 +01:00
err_path = err.subconfig.path
2025-12-22 09:58:44 +01:00
err_description = err.subconfig.option.impl_get_display_name(err.subconfig, with_quote=True)
2025-05-12 08:45:39 +02:00
display_name = option.description(with_quote=True)
if index is not None:
if path == err_path:
2025-12-21 22:20:39 +01:00
if self.invalid_user_data_error:
msg = _(
'variable {0} at index "{1}" is {2}, it has been loading from {3}'
)
else:
msg = _(
2025-05-12 08:45:39 +02:00
'variable {0} at index "{1}" is {2}, it will be ignored when loading from {3}'
2025-12-21 22:20:39 +01:00
)
self.unknowns.append({
msg.format(
2025-05-12 08:45:39 +02:00
display_name,
index,
properties,
options["source"],
2025-12-21 22:20:39 +01:00
): option._subconfig}
2025-05-12 08:45:39 +02:00
)
else:
2025-12-21 22:20:39 +01:00
if self.invalid_user_data_error:
msg = _(
'family {0} is {1}, {2} at index "{3}", it has been loading from {4}'
)
else:
msg = _(
'family {0} is {1}, {2} at index "{3}", it will be ignored when loading from {4}'
)
self.unknowns.append({
msg.format(
err_description,
2025-05-12 08:45:39 +02:00
properties,
display_name,
index,
options["source"],
2025-12-21 22:20:39 +01:00
): option._subconfig}
2025-05-12 08:45:39 +02:00
)
else:
if path == err_path:
2025-12-21 22:20:39 +01:00
if self.invalid_user_data_error:
msg = _(
2025-12-22 19:05:05 +01:00
"variable has propery {0}, it has been loading from {1}"
2025-12-21 22:20:39 +01:00
)
else:
msg = _(
2025-12-22 19:05:05 +01:00
"variable has property {0}, it will be ignored when loading from {1}"
2025-05-12 08:45:39 +02:00
)
2025-12-21 22:20:39 +01:00
self.unknowns.append({
msg.format(
properties, options["source"]
): option._subconfig}
2025-05-12 08:45:39 +02:00
)
else:
2025-12-21 22:20:39 +01:00
if self.invalid_user_data_error:
msg = _(
2025-12-22 19:05:05 +01:00
"family {0} has property {1}, so cannot access to {2}, it has been loading from {3}"
2025-12-21 22:20:39 +01:00
)
else:
msg = _(
2025-12-22 19:05:05 +01:00
"family {0} has property {1}, so cannot access to {2}, it will be ignored when loading from {3}"
2025-12-21 22:20:39 +01:00
)
self.unknowns.append({
msg.format(
err_description,
2025-05-12 08:45:39 +02:00
properties,
display_name,
options["source"],
2025-12-21 22:20:39 +01:00
): option._subconfig}
2025-05-12 08:45:39 +02:00
)
else:
2025-12-21 22:20:39 +01:00
self.unknowns.append({
_("{0} in {1}").format(err, options["source"]): option._subconfig}
2025-05-12 08:45:39 +02:00
)
except LeadershipError as err:
2025-12-21 22:20:39 +01:00
self.unknowns.append({_("{0} in {1}").format(err, options["source"]): option._subconfig})
2025-12-22 19:05:05 +01:00
except ConfigError as err:
err.prefix = ""
if self.invalid_user_data_error:
msg = _('{0}, it has been loading from {1}').format(err, options["source"])
else:
msg = _('{0}, it will be ignored when loading from {1}').format(err, options["source"])
self.invalids.append({msg: option._subconfig})
2025-05-12 08:45:39 +02:00
except ValueError as err:
err.prefix = ""
2025-12-22 19:05:05 +01:00
type_ = option.type(translation=True)
msg = _('the value "{0}" is an invalid {1}, {2}').format(
self._display_value(option, value),
type_,
err,
2025-12-21 22:20:39 +01:00
)
2025-12-22 19:05:05 +01:00
if self.invalid_user_data_error:
msg += _(', it has been loading from {0}').format(options["source"])
2025-05-12 08:45:39 +02:00
else:
2025-12-22 19:05:05 +01:00
msg += _(', it will be ignored when loading from {0}').format(options["source"])
self.invalids.append({msg: option._subconfig})
except AttributeOptionError as err:
2025-12-23 10:10:38 +01:00
err.prefix = ""
if err.code == "option-dynamic":
continue
raise err from err
def set_value(self, option, value, options):
is_secret_manager = options.get("secret_manager", False)
if is_secret_manager and isinstance(value, tuple):
# it's a function
2025-12-21 22:20:39 +01:00
params = tuple([ParamValue(val) for val in value[1:]])
value = Calculation(value[0], Params(params, kwargs={"option": ParamValue(option)}))
2025-12-23 10:10:38 +01:00
option.forcepermissive()
add_validation = True
else:
add_validation = False
option.value.set(value)
2025-12-23 10:10:38 +01:00
if add_validation:
option.property.add("validator")
2024-11-28 08:30:47 +01:00
2025-11-21 08:32:33 +01:00
def convert_value(option, value, needs_convert):
2024-11-28 08:30:47 +01:00
if value == "":
return None
option_type = option.information.get("type")
2025-11-21 08:32:33 +01:00
if option_type == "port":
func = CONVERT_OPTION[option_type]["func"]
try:
return func(value)
except:
return value
2025-01-02 21:06:13 +01:00
if option_type == "choice":
try:
choices = option.value.list()
if value not in choices and isinstance(value, str):
# FIXME add other tests (boolean, float, ...)
if value.isnumeric() and int(value) in choices:
value = int(value)
except:
pass
2024-11-28 08:30:47 +01:00
func = CONVERT_OPTION.get(option_type, {}).get("func")
if func:
2025-05-12 08:45:39 +02:00
try:
return func(value)
except:
pass
2024-11-28 08:30:47 +01:00
return value