""" Silique (https://www.silique.fr) Copyright (C) 2022-2025 distribued with GPL-2 or later license This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """ from typing import List from re import findall from rougail.utils import normalize_family, undefined from tiramisu import Calculation from tiramisu.error import ( PropertiesOptionError, AttributeOptionError, LeadershipError, ConfigError, CancelParam, ) from .error import DictConsistencyError from .i18n import _ from .object_model import CONVERT_OPTION class UserDatas: def __init__(self, config) -> None: self.config = config def user_datas(self, user_datas: List[dict], *, return_values_not_error=False, user_datas_type: str="user_datas", ): self.values = {} self.errors = [] self.warnings = [] self.show_secrets = False if user_datas_type == "user_datas": self._populate_values(user_datas) else: self.values = user_datas self._auto_configure_dynamics() self._populate_config() if return_values_not_error: return self.values else: self._populate_error_warnings() return { "errors": self.errors, "warnings": self.warnings, } def _populate_values(self, user_datas): for datas in user_datas: options = datas.get("options", {}) source = datas["source"] for name, data in datas.get("values", {}).items(): self.values[name] = { "source": source, "values": data, "options": options.copy(), } self.errors.extend(datas.get("errors", [])) self.warnings.extend(datas.get("warnings", [])) def _get_variable(self, config): for subconfig in config: if subconfig.isoptiondescription(): yield from self._get_variable(subconfig) else: yield subconfig def _auto_configure_dynamics(self): cache = {} added = [] for path, data in list(self.values.items()): try: option = self.config.option(path) option.name() except (ConfigError, PropertiesOptionError): pass except AttributeError: self._not_found_is_dynamic(self.config, path, cache, added, data) def _not_found_is_dynamic(self, config, path, cache, added, data): """if path is not found, check if parent is a dynamic family""" current_path = "" identifiers = [] # get parent for name in path.split(".")[:-1]: if current_path: current_path += "." current_path += name if current_path in cache: config, identifier = cache[current_path] identifiers.append(identifier) continue tconfig = config.option(name) try: tconfig.group_type() # object exists, so current config is the temporary config config = tconfig if config.isdynamic(only_self=True): identifiers.append(config.identifiers()[-1]) except AttributeError: # try to found the good dynamic family for tconfig in config.list(uncalculated=True): if not tconfig.isdynamic(only_self=True): # it's not a dynamic variable continue identifier = self._get_identifier(tconfig.name(), name) if identifier != normalize_family(identifier): msg = _( 'cannot load variable path "{0}", the identifier "{1}" is not valid in {2}' ).format(path, identifier, data["source"]) self.warnings.append(msg) continue if identifier is None: # it's a dynamic variable but doesn't match the current name continue # get the variable associate to this dynamic family dynamic_variable = tconfig.information.get( "dynamic_variable", None, ) if not dynamic_variable: # it's the good dynamic variable but it's not linked to a variable # so cannot change the variable continue option_type = self.config.option(dynamic_variable).information.get( "type" ) dyn_options_values = ( self.config.option(dynamic_variable).get().impl_getdefault() ) if "{{ identifier }}" in dynamic_variable: for s in identifiers: dynamic_variable = dynamic_variable.replace( "{{ identifier }}", str(s), 1 ) # do not add values in variable if has already a value if dynamic_variable not in self.values and not dyn_options_values: self.values[dynamic_variable] = {"values": []} added.append(dynamic_variable) elif dynamic_variable not in added: continue config = tconfig identifiers.append(identifier) typ = CONVERT_OPTION.get(option_type, {}).get("func") if typ: identifier = typ(identifier) if identifier not in self.values[dynamic_variable]["values"]: self.values[dynamic_variable]["values"].append(identifier) cache[current_path] = config, identifier break def convert_value(self, path, option, options, value): # converted value needs_convert = options.get("needs_convert", False) if option.ismulti(): if options.get("multi_separator") and not isinstance(value, list): value = value.split(options["multi_separator"]) self.values[path]["values"] = value if option.issubmulti(): value = [[val] for val in value] if needs_convert: if option.issubmulti(): for idx, val in enumerate(value): value[idx] = [convert_value(option, v) for v in val] else: value = [convert_value(option, val) for val in value] self.values[path]["values"] = value self.values[path]["options"]["needs_convert"] = False elif needs_convert: value = convert_value(option, value) return value def _populate_config(self): while self.values: value_is_set = False for option in self._get_variable(self.config): path = option.path() if path not in self.values: continue options = self.values[path].get("options", {}) if ( options.get("allow_secrets_variables", True) is False and option.type() == "password" ): self.errors.append( _( 'the variable "{0}" contains secrets and should not be defined in {1}' ).format(path, self.values[path]["source"]) ) continue value = self.convert_value( path, option, options, self.values[path]["values"] ) index = option.index() if index is not None: if not isinstance(value, list) or index >= len(value): continue value = value[index] option_without_index = self.config.option(path) else: option_without_index = option if option.isleader(): len_leader = len(option.value.get()) if len_leader: for idx in range(len_leader - 1, -1, -1): option.value.pop(idx) try: option.value.set(value) value_is_set = True except Exception: if path != option.path(): self.values[option.path()] = self.values.pop(path) else: if "source" in self.values[path]: option_without_index.information.set( "loaded_from", _("loaded from {0}").format(self.values[path]["source"]) ) # value is correctly set, remove variable to the set if index is not None: # if it's a follower waiting for all followers are sets self.values[path]["values"][index] = undefined for tmp_value in self.values[path]["values"]: if tmp_value != undefined: break else: self.values.pop(path) else: self.values.pop(path) if not value_is_set: break def _get_identifier(self, true_name, name) -> str: if true_name == "{{ identifier }}": return name regexp = true_name.replace("{{ identifier }}", "(.*)") finded = findall(regexp, name) if len(finded) != 1 or not finded[0]: return None return finded[0] def _display_value(self, option, value): if not self.show_secrets and option.type() == "password": return "*" * 10 return value def _populate_error_warnings(self): # we don't find variable, apply value just to get error or warning messages for path, options in self.values.items(): value = options["values"] option = self.config.option(path) try: if option.isoptiondescription(): if value: self.warnings.append( _( 'cannot set the value "{0}" to the family {1}, it will be ignored when loading from {2}' ).format( self._display_value(option, value), option.description(with_quote=True), options["source"], ) ) continue except AttributeOptionError as err: if err.code == "option-not-found": self.warnings.append( _( 'variable or family "{0}" does not exist, it will be ignored when loading from {1}' ).format(err.path, options["source"]) ) elif err.code == "option-dynamic": self.warnings.append( _( '"{0}" is the name of a dynamic family, it will be ignored when loading from {1}' ).format(option.description(with_quote=True), options["source"]) ) else: self.warnings.append( _("{0} loaded from {1}").format(err, options["source"]) ) continue value = self.convert_value( path, option, self.values[path].get("options", {}), value ) if option.isfollower(): indexes = range(len(value)) values = value else: indexes = [None] for index in indexes: try: if option.isfollower(): value = values[index] if value is undefined or isinstance(value, CancelParam): continue self.config.option(path, index).value.set(value) else: option.value.set(value) except PropertiesOptionError as err: if err.code == "property-error": properties = err.display_properties( force_property=True, add_quote=False ) err_path = err._subconfig.path display_name = option.description(with_quote=True) if index is not None: if path == err_path: self.warnings.append( _( 'variable {0} at index "{1}" is {2}, it will be ignored when loading from {3}' ).format( display_name, index, properties, options["source"], ) ) else: self.warnings.append( _( 'family {0} is {1}, {2} at index "{3}" will be ignored when loading from {4}' ).format( err._name, properties, display_name, index, options["source"], ) ) else: if path == err_path: self.warnings.append( _( "variable {0} is {1}, it will be ignored when loading from {2}" ).format( display_name, properties, options["source"] ) ) else: self.warnings.append( _( "family {0} is {1}, {2} will be ignored when loading from {3}" ).format( err._name, properties, display_name, options["source"], ) ) else: self.warnings.append( _("{0} in {1}").format(err, options["source"]) ) except LeadershipError as err: self.warnings.append(_("{0} in {1}").format(err, options["source"])) except ValueError as err: err.prefix = "" if index is not None: self.warnings.append( _( 'the value "{0}" is invalid for {1} at index "{2}", {3}, it will be ignored when loading from {4}' ).format( self._display_value(option, value), option.description(with_quote=True), index, err, options["source"], ) ) else: self.warnings.append( _( 'the value "{0}" is invalid for {1}, {2}, it will be ignored when loading from {3}' ).format( self._display_value(option, value), option.description(with_quote=True), err, options["source"], ) ) def convert_value(option, value): if value == "": return None option_type = option.information.get("type") if option_type == "choice": choices = option.value.list() if value not in choices and isinstance(value, str): # FIXME add other tests (boolean, float, ...) if value.isnumeric() and int(value) in choices: value = int(value) func = CONVERT_OPTION.get(option_type, {}).get("func") if func: try: return func(value) except: pass return value