diff --git a/src/rougail/user_datas.py b/src/rougail/user_datas.py new file mode 100644 index 000000000..ea0617c81 --- /dev/null +++ b/src/rougail/user_datas.py @@ -0,0 +1,253 @@ +""" +Silique (https://www.silique.fr) +Copyright (C) 2024 + +distribued with GPL-2 or later license + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 2 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +""" +from typing import List +from re import findall + +from tiramisu import undefined +from tiramisu.error import PropertiesOptionError, LeadershipError, ConfigError + +from .object_model import CONVERT_OPTION + + +class UserDatas: + def __init__(self, config) -> None: + self.config = config + + def user_datas(self, user_datas: List[dict]): + values = {} + errors = [] + warnings = [] + for datas in user_datas: + options = datas.get("options", {}) + for name, data in datas.get("values", {}).items(): + values[name] = { + "values": data, + "options": options.copy(), + } + errors.extend(datas.get("errors", [])) + warnings.extend(datas.get("warnings", [])) + self._auto_configure_dynamics(values) + while values: + value_is_set = False + for option in self._get_variable(self.config): + path = option.path() + if path not in values: + continue + options = values[path].get("options", {}) + value = values[path]["values"] + + # converted value + if option.ismulti(): + if options.get("multi_separator") and not isinstance(value, list): + value = value.split(options["multi_separator"]) + values[path]["values"] = value + if option.issubmulti(): + value = [[val] for val in value] + if options.get("needs_convert"): + if option.issubmulti(): + for idx, val in enumerate(value): + value[idx] = [convert_value(option, v) for v in val] + else: + value = [convert_value(option, val) for val in value] + values[path]["values"] = value + values[path]["options"]["needs_convert"] = False + elif options.get("needs_convert"): + value = convert_value(option, value) + index = option.index() + if index is not None: + if not isinstance(value, list) or index >= len(value): + continue + value = value[index] + try: + option.value.set(value) + value_is_set = True + # value is correctly set, remove variable to the set + if index is not None: + # if it's a follower waiting for all followers are sets + values[path]["values"][index] = undefined + if set(values[path]["values"]) == {undefined}: + values.pop(path) + else: + values.pop(path) + except Exception: + if path != option.path(): + values[option.path()] = values.pop(path) + if not value_is_set: + break + # we don't find variable, apply value just to get error or warning messages + for path, data in values.items(): + try: + option = self.config.option(path) + value = data["values"] + if option.isfollower(): + for index, val in enumerate(value): + if val is undefined: + continue + self.config.option(path, index).value.set(val) + else: + option.value.set(value) + except AttributeError as err: + errors.append(str(err)) + except (ValueError, LeadershipError) as err: + errors.append(str(err)) + except PropertiesOptionError as err: + warnings.append(str(err)) + return { + "errors": errors, + "warnings": warnings, + } + + def _get_variable(self, config): + for subconfig in config: + if subconfig.isoptiondescription(): + yield from self._get_variable(subconfig) + else: + yield subconfig + + def _auto_configure_dynamics( + self, + values, + ): + cache = {} + added = [] + for path, data in list(values.items()): + value = data["values"] + try: + option = self.config.option(path) + option.name() + except (ConfigError, PropertiesOptionError): + pass + except AttributeError: + config = self.config + current_path = "" + for name in path.split(".")[:-1]: + if current_path: + current_path += "." + current_path += name + if current_path in cache: + config, identifier = cache[current_path] + else: + tconfig = config.option(name) + try: + tconfig.group_type() + config = tconfig + except AttributeError: + for tconfig in config.list(uncalculated=True): + if tconfig.isdynamic(only_self=True): + identifier = self._get_identifier( + tconfig.name(), name + ) + if identifier is None: + continue + dynamic_variable = tconfig.information.get( + "dynamic_variable", + None, + ) + if not dynamic_variable: + continue + option_type = self.config.option( + dynamic_variable + ).information.get("type") + identifiers = tconfig.identifiers() + if identifiers: + for s in identifiers: + dynamic_variable = dynamic_variable.replace( + "{{ identifier }}", str(s), 1 + ) + if dynamic_variable not in values and not self.config.option(dynamic_variable).get().impl_getdefault(): + values[dynamic_variable] = {"values": []} + added.append(dynamic_variable) + elif dynamic_variable not in added: + continue + config = tconfig + typ = CONVERT_OPTION.get(option_type, {}).get( + "func" + ) + if typ: + identifier = typ(identifier) + if ( + identifier + not in values[dynamic_variable]["values"] + ): + values[dynamic_variable]["values"].append( + identifier + ) + cache[current_path] = config, identifier + break + else: + if option.isdynamic(): + parent_option = self.config.option(path.rsplit(".", 1)[0]) + identifier = self._get_identifier( + parent_option.name(uncalculated=True), + parent_option.name(), + ) + dynamic_variable = None + while True: + dynamic_variable = parent_option.information.get( + "dynamic_variable", + None, + ) + if dynamic_variable: + break + parent_option = self.config.option( + parent_option.path().rsplit(".", 1)[0] + ) + if "." not in parent_option.path(): + parent_option = None + break + if not parent_option: + continue + identifiers = parent_option.identifiers() + for identifier in identifiers: + dynamic_variable = dynamic_variable.replace( + "{{ identifier }}", str(identifier), 1 + ) + if dynamic_variable not in values and not self.config.option(dynamic_variable).get().impl_getdefault(): + values[dynamic_variable] = {"values": []} + added.append(dynamic_variable) + elif dynamic_variable not in added: + continue + option_type = option.information.get("type") + typ = CONVERT_OPTION.get(option_type, {}).get("func") + if typ: + identifier = typ(identifier) + if identifier not in values[dynamic_variable]["values"]: + values[dynamic_variable]["values"].append(identifier) + cache[option.path()] = option, identifier + + def _get_identifier(self, true_name, name) -> str: + if true_name == "{{ identifier }}": + return name + regexp = true_name.replace("{{ identifier }}", "(.*)") + finded = findall(regexp, name) + if len(finded) != 1 or not finded[0]: + return None + return finded[0] + + +def convert_value(option, value): + if value == "": + return None + option_type = option.information.get("type") + func = CONVERT_OPTION.get(option_type, {}).get("func") + if func: + return func(value) + return value