669 lines
29 KiB
Python
669 lines
29 KiB
Python
"""
|
|
Silique (https://www.silique.fr)
|
|
Copyright (C) 2022-2026
|
|
|
|
distribued with GPL-2 or later license
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; either version 2 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
"""
|
|
|
|
from typing import List
|
|
|
|
from tiramisu import Calculation, Params, ParamValue, owners
|
|
from tiramisu.error import (
|
|
PropertiesOptionError,
|
|
AttributeOptionError,
|
|
LeadershipError,
|
|
ConfigError,
|
|
CancelParam,
|
|
display_list,
|
|
)
|
|
from .utils import undefined, get_properties_to_string
|
|
from .tiramisu import (
|
|
normalize_family,
|
|
CONVERT_OPTION,
|
|
get_identifier_from_dynamic_family,
|
|
)
|
|
from .error import DictConsistencyError
|
|
|
|
from .i18n import _
|
|
|
|
|
|
class UserData:
|
|
def __init__(self, config) -> None:
|
|
self.config = config
|
|
|
|
def user_data(
|
|
self,
|
|
user_data: List[dict],
|
|
*,
|
|
invalid_user_data_error: bool = False,
|
|
unknown_user_data_error: bool = False,
|
|
):
|
|
self.values = {}
|
|
self.errors = []
|
|
self.warnings = []
|
|
self.invalid_user_data_error = invalid_user_data_error
|
|
self.unknown_user_data_error = unknown_user_data_error
|
|
if self.invalid_user_data_error:
|
|
self.invalids = self.errors
|
|
else:
|
|
self.invalids = self.warnings
|
|
if self.unknown_user_data_error:
|
|
self.unknowns = self.errors
|
|
else:
|
|
self.unknowns = self.warnings
|
|
self.show_secrets = False
|
|
self._populate_values(user_data)
|
|
self._auto_configure_dynamics()
|
|
self._populate_config()
|
|
self.properties_to_string = get_properties_to_string()
|
|
self._populate_error_warnings()
|
|
return {
|
|
"errors": self.errors,
|
|
"warnings": self.warnings,
|
|
}
|
|
|
|
def _populate_values(self, user_data):
|
|
for datas in user_data:
|
|
options = datas.get("options", {})
|
|
source = datas["source"]
|
|
for name, data in datas.get("values", {}).items():
|
|
self.values.setdefault(name, []).append(
|
|
{
|
|
"source": source,
|
|
"values": data,
|
|
"options": options.copy(),
|
|
})
|
|
self.errors.extend(datas.get("errors", []))
|
|
self.warnings.extend(datas.get("warnings", []))
|
|
|
|
def _get_variable(self, config):
|
|
try:
|
|
for subconfig in config:
|
|
if subconfig.isoptiondescription():
|
|
yield from self._get_variable(subconfig)
|
|
else:
|
|
yield subconfig
|
|
except (ConfigError, ValueError) as err:
|
|
err.prefix = ""
|
|
if self.invalid_user_data_error:
|
|
msg = str(err)
|
|
else:
|
|
msg = _('{0}, it will be ignored').format(err)
|
|
self.invalids.append({msg: err.subconfig})
|
|
|
|
def _auto_configure_dynamics(self):
|
|
cache = {}
|
|
added = []
|
|
for path, data in list(self.values.items()):
|
|
try:
|
|
option = self.config.option(path)
|
|
option.name()
|
|
except (ConfigError, PropertiesOptionError):
|
|
pass
|
|
except AttributeError:
|
|
self._not_found_is_dynamic(self.config, path, cache, added, data[-1])
|
|
|
|
def _not_found_is_dynamic(self, config, path, cache, added, data):
|
|
"""if path is not found, check if parent is a dynamic family"""
|
|
current_path = ""
|
|
identifiers = []
|
|
# get parent
|
|
for name in path.split(".")[:-1]:
|
|
if current_path:
|
|
current_path += "."
|
|
current_path += name
|
|
if current_path in cache:
|
|
config, identifier = cache[current_path]
|
|
identifiers.append(identifier)
|
|
continue
|
|
tconfig = config.option(name)
|
|
try:
|
|
tconfig.group_type()
|
|
# object exists, so current config is the temporary config
|
|
config = tconfig
|
|
if config.isdynamic(only_self=True):
|
|
identifiers.append(config.identifiers()[-1])
|
|
except AttributeError:
|
|
# try to found the good dynamic family
|
|
try:
|
|
lists = config.list(uncalculated=True)
|
|
except PropertiesOptionError:
|
|
lists = []
|
|
for tconfig in lists:
|
|
if not tconfig.isdynamic(only_self=True):
|
|
# it's not a dynamic variable
|
|
continue
|
|
identifier = get_identifier_from_dynamic_family(
|
|
tconfig.name(uncalculated=True), name
|
|
)
|
|
if identifier == "{{ identifier }}":
|
|
continue
|
|
if identifier != normalize_family(identifier):
|
|
msg = _(
|
|
'cannot load variable path "{0}", the identifier "{1}" is not valid in {2}'
|
|
).format(path, identifier, data["source"])
|
|
self.invalids.append(msg)
|
|
continue
|
|
if identifier is None:
|
|
# it's a dynamic variable but doesn't match the current name
|
|
continue
|
|
# get the variable associate to this dynamic family
|
|
dynamic_variable = tconfig.information.get(
|
|
"dynamic_variable",
|
|
None,
|
|
)
|
|
if not dynamic_variable:
|
|
# it's the good dynamic variable but it's not linked to a variable
|
|
# so cannot change the variable
|
|
continue
|
|
option_type = self.config.option(dynamic_variable).information.get(
|
|
"type"
|
|
)
|
|
dyn_options_values = (
|
|
self.config.option(dynamic_variable).get().impl_getdefault()
|
|
)
|
|
if "{{ identifier }}" in dynamic_variable:
|
|
for s in identifiers:
|
|
dynamic_variable = dynamic_variable.replace(
|
|
"{{ identifier }}", str(s), 1
|
|
)
|
|
# do not add values in variable if has already a value
|
|
if dynamic_variable not in self.values and not dyn_options_values:
|
|
self.values[dynamic_variable] = [{"values": []}]
|
|
added.append(dynamic_variable)
|
|
elif dynamic_variable not in added:
|
|
continue
|
|
config = tconfig
|
|
identifiers.append(identifier)
|
|
typ = CONVERT_OPTION.get(option_type, {}).get("func")
|
|
if typ:
|
|
identifier = typ(identifier)
|
|
if identifier not in self.values[dynamic_variable][-1]["values"]:
|
|
self.values[dynamic_variable][-1]["values"].append(identifier)
|
|
cache[current_path] = config, identifier
|
|
break
|
|
|
|
def convert_value(self, path, option, options, value):
|
|
# converted value
|
|
needs_convert = options.get("needs_convert", False)
|
|
if option.ismulti():
|
|
if options.get("multi_separator") and not isinstance(value, list):
|
|
value = value.split(options["multi_separator"])
|
|
self.values[path][-1]["values"] = value
|
|
if option.issubmulti():
|
|
value = [[val] for val in value]
|
|
if option.issubmulti():
|
|
for idx, val in enumerate(value):
|
|
if isinstance(val, list):
|
|
value[idx] = [convert_value(option, v, needs_convert) for v in val]
|
|
elif isinstance(value, list):
|
|
value = [convert_value(option, val, needs_convert) for val in value]
|
|
if needs_convert:
|
|
self.values[path][-1]["values"] = value
|
|
self.values[path][-1]["options"]["needs_convert"] = needs_convert
|
|
elif not isinstance(value, list):
|
|
value = convert_value(option, value, needs_convert)
|
|
return value
|
|
|
|
def _populate_config(self):
|
|
dynamics_variable = []
|
|
while self.values:
|
|
value_is_set = False
|
|
for option in self._get_variable(self.config):
|
|
values_path = path = option.path()
|
|
if path not in self.values:
|
|
if path in dynamics_variable or not option.isdynamic():
|
|
continue
|
|
values_path = option.path(uncalculated=True)
|
|
if values_path not in self.values:
|
|
continue
|
|
if option.type() == "password":
|
|
one_is_in_error = False
|
|
for values in self.values[values_path]:
|
|
if values.get("options", {}).get("allow_secrets_variables", True) is False:
|
|
one_is_in_error = True
|
|
self.errors.append({
|
|
_(
|
|
'the variable contains secrets and should not be defined in {0}'
|
|
).format(values["source"]): option._subconfig}
|
|
)
|
|
if one_is_in_error:
|
|
self.values.pop(values_path)
|
|
continue
|
|
values = self.values[values_path][-1]
|
|
options = values.get("options", {})
|
|
value = self.convert_value(
|
|
path, option, options, values["values"]
|
|
)
|
|
index = option.index()
|
|
if index is not None:
|
|
if isinstance(value, tuple):
|
|
values["values"] = []
|
|
# for i in range(len(option.parent().leader().value.get())):
|
|
for i in range(option.value.len()):
|
|
values["values"].append(value)
|
|
value = values["values"]
|
|
if not isinstance(value, list) or index >= len(value):
|
|
continue
|
|
value = value[index]
|
|
option_without_index = option.index(None)
|
|
else:
|
|
option_without_index = option
|
|
if option.isleader():
|
|
# set value for a leader, it began to remove all values!
|
|
len_leader = option.value.len()
|
|
if len_leader:
|
|
for idx in range(len_leader - 1, -1, -1):
|
|
option.value.pop(idx)
|
|
if value is undefined:
|
|
continue
|
|
try:
|
|
self.set_value(option, value, options, index)
|
|
value_is_set = True
|
|
except Exception as err:
|
|
pass
|
|
# if path != option.path():
|
|
# self.values[option.path()] = self.values.pop(values_path)
|
|
else:
|
|
# value is correctly set, remove variable to the set
|
|
if index is not None:
|
|
# if it's a follower waiting for all followers are sets
|
|
values["values"][index] = undefined
|
|
for tmp_value in values["values"]:
|
|
if tmp_value != undefined:
|
|
break
|
|
else:
|
|
if path in self.values:
|
|
self.values.pop(path)
|
|
else:
|
|
dynamics_variable.append(path)
|
|
elif path in self.values:
|
|
self.values.pop(path)
|
|
else:
|
|
dynamics_variable.append(path)
|
|
if not value_is_set:
|
|
break
|
|
|
|
def _display_value(self, option, value):
|
|
if not self.show_secrets and option.type() == "password":
|
|
if not isinstance(value, list):
|
|
value = "*" * 10
|
|
else:
|
|
value = ["*" * 10 for val in value]
|
|
if isinstance(value, list):
|
|
value = display_list(value, add_quote=True)
|
|
else:
|
|
value = '"' + str(value) + '"'
|
|
return value
|
|
|
|
def _populate_error_warnings(self):
|
|
# we don't find variable, apply value just to get error or warning messages
|
|
for path, full_options in self.values.items():
|
|
options = full_options[-1]
|
|
value = options["values"]
|
|
if options.get("secret_manager"):
|
|
option = self.config.forcepermissive.option(path)
|
|
else:
|
|
option = self.config.option(path)
|
|
try:
|
|
if option.isoptiondescription():
|
|
if value:
|
|
if self.invalid_user_data_error:
|
|
msg = _(
|
|
'it\'s a family so we cannot set the value {0}, it has been loading from {1}'
|
|
)
|
|
else:
|
|
msg = _(
|
|
'it\'s a family so we cannot set the value {0}, it will be ignored when loading from {1}'
|
|
)
|
|
self.invalids.append({msg.format(
|
|
self._display_value(option, value),
|
|
options["source"],
|
|
): option._subconfig}
|
|
)
|
|
continue
|
|
if option.issymlinkoption():
|
|
err = _('it\'s a symlink option so we cannot set the value {0}').format(self._display_value(option, value))
|
|
if self.invalid_user_data_error:
|
|
msg = _('{0}, it has been loading from {1}').format(err, options["source"])
|
|
else:
|
|
msg = _('{0}, it will be ignored when loading from {1}').format(err, options["source"])
|
|
self.unknowns.append({msg: option._subconfig})
|
|
continue
|
|
except ConfigError as err:
|
|
self.invalids.append({
|
|
_("{0}, it has been loaded from {1}").format(err, options["source"]): option._subconfig}
|
|
)
|
|
continue
|
|
except PropertiesOptionError as err:
|
|
self.unknowns.append({
|
|
_("{0}, it has been loaded from {1}").format(err, options["source"]): option._subconfig}
|
|
)
|
|
continue
|
|
|
|
except AttributeOptionError as err:
|
|
if err.code == "option-not-found":
|
|
err_path = err.path
|
|
if "." not in err_path:
|
|
subconfig = None
|
|
child_name = err_path
|
|
else:
|
|
parent_path, child_name = err_path.rsplit('.', 1)
|
|
subconfig = self.config.option(parent_path)
|
|
subconfig._set_subconfig()
|
|
err_msg = _(
|
|
'variable or family "{0}" does not exist so cannot load "{1}"'
|
|
).format(child_name, path)
|
|
if self.unknown_user_data_error:
|
|
msg = _(
|
|
'{0}, it has been loading from {1}'
|
|
)
|
|
else:
|
|
msg = _(
|
|
'{0}, it will be ignored when loading from {1}'
|
|
)
|
|
msg = msg.format(err_msg, options["source"])
|
|
if subconfig is not None:
|
|
msg = {msg: subconfig._subconfig}
|
|
self.unknowns.append(msg)
|
|
elif err.code == "option-dynamic":
|
|
if self.invalid_user_data_error:
|
|
msg = _(
|
|
'"{0}" is the name of a dynamic family, it has been loading from {1}'
|
|
)
|
|
else:
|
|
msg = _(
|
|
'"{0}" is the name of a dynamic family, it will be ignored when loading from {1}'
|
|
)
|
|
self.invalids.append({msg.format(option.description(with_quote=True), options["source"]): option._subconfig})
|
|
else:
|
|
self.invalids.append(
|
|
_("{0} loaded from {1}").format(err, options["source"])
|
|
)
|
|
continue
|
|
value = self.convert_value(
|
|
path, option, self.values[path][-1].get("options", {}), value
|
|
)
|
|
if option.isfollower():
|
|
if not isinstance(value, tuple):
|
|
indexes = range(len(value))
|
|
else:
|
|
try:
|
|
indexes = range(len(option.parent().leader().value.get()))
|
|
except:
|
|
continue
|
|
values = value
|
|
else:
|
|
indexes = [None]
|
|
for index in indexes:
|
|
try:
|
|
if option.isfollower():
|
|
if not isinstance(value, tuple):
|
|
value = values[index]
|
|
if value is undefined or isinstance(value, CancelParam):
|
|
continue
|
|
index_ = index
|
|
else:
|
|
index_ = None
|
|
if value is undefined:
|
|
continue
|
|
self.set_value(option, value, options.get("options", {}), index_)
|
|
except PropertiesOptionError as err:
|
|
if err.code == "property-error":
|
|
properties = display_list(
|
|
[_(prop) for prop in err.proptype], add_quote=False
|
|
)
|
|
err_path = err.subconfig.path
|
|
err_description = err.subconfig.option.impl_get_display_name(err.subconfig, with_quote=True)
|
|
display_name = option.description(with_quote=True)
|
|
if index is not None:
|
|
if path == err_path:
|
|
if self.unknown_user_data_error:
|
|
msg = _(
|
|
'variable {0} at index "{1}" is {2}, it has been loading from {3}'
|
|
)
|
|
else:
|
|
msg = _(
|
|
'variable {0} at index "{1}" is {2}, it will be ignored when loading from {3}'
|
|
)
|
|
self.unknowns.append({
|
|
msg.format(
|
|
display_name,
|
|
index,
|
|
properties,
|
|
options["source"],
|
|
): option._subconfig}
|
|
)
|
|
else:
|
|
if self.unknown_user_data_error:
|
|
msg = _(
|
|
'family {0} is {1}, {2} at index "{3}", it has been loading from {4}'
|
|
)
|
|
else:
|
|
msg = _(
|
|
'family {0} is {1}, {2} at index "{3}", it will be ignored when loading from {4}'
|
|
)
|
|
self.unknowns.append({
|
|
msg.format(
|
|
err_description,
|
|
properties,
|
|
display_name,
|
|
index,
|
|
options["source"],
|
|
): option._subconfig}
|
|
)
|
|
else:
|
|
if path == err_path:
|
|
if self.unknown_user_data_error:
|
|
msg = _(
|
|
"variable has propery {0}, it has been loading from {1}"
|
|
)
|
|
else:
|
|
msg = _(
|
|
"variable has property {0}, it will be ignored when loading from {1}"
|
|
)
|
|
self.unknowns.append({
|
|
msg.format(
|
|
properties, options["source"]
|
|
): option._subconfig}
|
|
)
|
|
else:
|
|
if not options.get("options", {}).get("secret_manager", False):
|
|
if self.unknown_user_data_error:
|
|
msg = _(
|
|
"family {0} has property {1}, so cannot access to {2}, it has been loading from {3}"
|
|
)
|
|
else:
|
|
msg = _(
|
|
"family {0} has property {1}, so cannot access to {2}, it will be ignored when loading from {3}"
|
|
)
|
|
self.unknowns.append({
|
|
msg.format(
|
|
err_description,
|
|
properties,
|
|
display_name,
|
|
options["source"],
|
|
): option._subconfig}
|
|
)
|
|
else:
|
|
if self.unknown_user_data_error:
|
|
msg = _(
|
|
"{0}, it has been loading from {1}"
|
|
)
|
|
else:
|
|
msg = _(
|
|
"{0}, it will be ignored when loading from {1}"
|
|
)
|
|
self.unknowns.append({
|
|
msg.format(err, options["source"]): option._subconfig}
|
|
)
|
|
except LeadershipError as err:
|
|
if self.unknown_user_data_error:
|
|
msg = _(
|
|
"{0}, it has been loading from {1}"
|
|
)
|
|
else:
|
|
msg = _(
|
|
"{0}, it will be ignored when loading from {1}"
|
|
)
|
|
self.unknowns.append({
|
|
msg.format(err, options["source"]): option._subconfig}
|
|
)
|
|
except ConfigError as err:
|
|
err.prefix = ""
|
|
if self.invalid_user_data_error:
|
|
msg = _('{0}, it has been loading from {1}').format(err, options["source"])
|
|
else:
|
|
msg = _('{0}, it will be ignored when loading from {1}').format(err, options["source"])
|
|
self.invalids.append({msg: option._subconfig})
|
|
except ValueError as err:
|
|
err.prefix = ""
|
|
type_ = option.type(translation=True)
|
|
msg = _('the value {0} is an invalid {1}, {2}').format(
|
|
self._display_value(option, value),
|
|
type_,
|
|
err,
|
|
)
|
|
if self.invalid_user_data_error:
|
|
msg += _(', it has been loading from {0}').format(options["source"])
|
|
else:
|
|
msg += _(', it will be ignored when loading from {0}').format(options["source"])
|
|
self.invalids.append({msg: option._subconfig})
|
|
except AttributeOptionError as err:
|
|
err.prefix = ""
|
|
if err.code == "option-dynamic":
|
|
continue
|
|
raise err from err
|
|
|
|
def set_value(self, option, value, options, index):
|
|
is_secret_manager = options.get("secret_manager", False)
|
|
option_without_index = option
|
|
if is_secret_manager and isinstance(value, tuple):
|
|
# it's a function
|
|
params = tuple([ParamValue(val) for val in value[1:]])
|
|
option.information.set('secret_manager', True)
|
|
if index is not None:
|
|
option = option.forcepermissive.index(index)
|
|
value = Calculation(value[0], Params(params, kwargs={"option": ParamValue(option)}))
|
|
option = option.forcepermissive
|
|
add_validation = True
|
|
else:
|
|
if index is not None:
|
|
option = option.index(index)
|
|
add_validation = False
|
|
if isinstance(value, list) and undefined in value:
|
|
val = []
|
|
for v in value:
|
|
if v is undefined:
|
|
val.append(None)
|
|
else:
|
|
val.append(v)
|
|
value = val
|
|
option.value.set(value)
|
|
if add_validation:
|
|
option.property.add("validator")
|
|
path = option.path()
|
|
if "source" in self.values[path][-1]:
|
|
if option.isfollower():
|
|
key = f"loaded_from_{index}"
|
|
else:
|
|
key = "loaded_from"
|
|
value = _("loaded from {0}").format(
|
|
self.values[path][-1]["source"]
|
|
)
|
|
if options.get("secret_manager"):
|
|
# FIXME (true_config ???)
|
|
default = option.value.default()
|
|
if option.isleader():
|
|
default = default[0]
|
|
value = _('{0} (the search key is "{1}")').format(value, default)
|
|
option_without_index.information.set(
|
|
key,
|
|
value,
|
|
)
|
|
|
|
|
|
def convert_value(option, value, needs_convert):
|
|
if value == "":
|
|
return None
|
|
option_type = option.information.get("type")
|
|
if option_type == "port":
|
|
func = CONVERT_OPTION[option_type]["func"]
|
|
try:
|
|
return func(value)
|
|
except:
|
|
return value
|
|
if option_type == "choice":
|
|
try:
|
|
choices = option.value.list()
|
|
if value not in choices and isinstance(value, str):
|
|
# FIXME add other tests (boolean, float, ...)
|
|
if value.isnumeric() and int(value) in choices:
|
|
value = int(value)
|
|
except:
|
|
pass
|
|
func = CONVERT_OPTION.get(option_type, {}).get("func")
|
|
if func:
|
|
try:
|
|
return func(value)
|
|
except:
|
|
pass
|
|
return value
|
|
|
|
|
|
def mandatories(config, errors: list) -> None:
|
|
try:
|
|
mandatories = config.value.mandatory()
|
|
except (ConfigError, PropertiesOptionError, ValueError) as err:
|
|
try:
|
|
subconfig = err.subconfig
|
|
except AttributeError:
|
|
subconfig = None
|
|
if subconfig:
|
|
err.prefix = ""
|
|
errors.append({str(err): subconfig})
|
|
else:
|
|
errors.append(str(err))
|
|
else:
|
|
for option in mandatories:
|
|
_populate_mandatory(option, errors)
|
|
|
|
|
|
def _populate_mandatory(option, errors: list) -> None:
|
|
try:
|
|
option.value.get()
|
|
except PropertiesOptionError as err:
|
|
index = err.subconfig.index
|
|
if "empty" in err.proptype:
|
|
msg = _("null is not a valid value for a multi")
|
|
elif "mandatory" in err.proptype:
|
|
if index is None:
|
|
msg = _("mandatory variable but has no value")
|
|
else:
|
|
msg = _('mandatory variable at index "{0}" but has no value').format(index)
|
|
else:
|
|
if index is None:
|
|
msg = _("mandatory variable but is inaccessible and has no value or has null in value")
|
|
else:
|
|
msg = _('mandatory variable at index "{0}" but is inaccessible and has no value or has null in value').format(index)
|
|
else:
|
|
proptype = option.value.mandatory(return_type=True)
|
|
if proptype == "empty":
|
|
msg = _("null is not a valid value for a multi")
|
|
elif proptype == "mandatory":
|
|
msg = _("mandatory variable but has no value")
|
|
errors.append({msg: option._subconfig})
|