# -*- coding: utf-8 -*- "takes care of the option's values and multi values" # Copyright (C) 2013-2023 Team tiramisu (see AUTHORS for all contributors) # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by the # Free Software Foundation, either version 3 of the License, or (at your # option) any later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . # ____________________________________________________________ import weakref from typing import Optional, Any, Callable from .error import ConfigError, PropertiesOptionError from .setting import owners, undefined, forbidden_owners, OptionBag, ConfigBag from .autolib import Calculation, carry_out_calculation, Params from .i18n import _ class Values: """The `Config`'s root is indeed in charge of the `Option()`'s values, but the values are physicaly located here, in `Values`, wich is also responsible of a caching utility. """ __slots__ = ('_values', '_informations', '__weakref__', ) def __init__(self, default_values=None, ): """ Initializes the values's dict. :param storage: where values or owners are stored """ self._informations = {} # set default owner if not default_values: self._values = {None: {None: [None, owners.user]}} else: self._values = default_values #______________________________________________________________________ # get value def get_cached_value(self, option_bag: OptionBag, ) -> Any: """get value directly in cache if set otherwise calculated value and set it in cache :returns: value """ # try to retrive value in cache setting_properties = option_bag.config_bag.properties cache = option_bag.config_bag.context._impl_values_cache is_cached, value, validated = cache.getcache(option_bag.path, option_bag.config_bag.expiration_time, option_bag.index, setting_properties, option_bag.properties, 'value', ) # no cached value so get value if not is_cached: value = self.getvalue(option_bag) # validates and warns value if not validated: validate = option_bag.option.impl_validate(value, option_bag, check_error=True, ) if 'warnings' in setting_properties: option_bag.option.impl_validate(value, option_bag, check_error=False, ) # set value to cache if not is_cached: cache.setcache(option_bag.path, option_bag.index, value, option_bag.properties, setting_properties, validate, ) if isinstance(value, list): # return a copy, so value cannot be modified value = value.copy() # and return it return value def force_to_metaconfig(self, option_bag): # force_metaconfig_on_freeze in config => to metaconfig # force_metaconfig_on_freeze in option + config is kernelconfig => to metaconfig settings = option_bag.config_bag.context.cfgimpl_get_settings() if 'force_metaconfig_on_freeze' in option_bag.properties: settings = option_bag.config_bag.context.cfgimpl_get_settings() if 'force_metaconfig_on_freeze' in option_bag.option.impl_getproperties() and \ not settings._properties.get(option_bag.path, {}).get(None, frozenset()): # if force_metaconfig_on_freeze is only in option (not in config) return option_bag.config_bag.context.impl_type == 'config' else: return True return False def _do_value_list(self, value: Any, option_bag: OptionBag): ret = [] for val in value: if isinstance(val, (list, tuple)): ret.append(self._do_value_list(val, option_bag)) elif isinstance(val, Calculation): ret.append(val.execute(option_bag)) else: ret.append(val) return ret def getvalue(self, option_bag, ): """actually retrieves the value :param path: the path of the `Option` :param index: index for a follower `Option` :returns: value """ # get owner and value from store # index allowed only for follower index = option_bag.index is_follower = option_bag.option.impl_is_follower() if index is None or not is_follower: _index = None else: _index = index value, owner = self._values.get(option_bag.path, {}).get(_index, [undefined, owners.default]) if owner == owners.default or \ ('frozen' in option_bag.properties and \ ('force_default_on_freeze' in option_bag.properties or self.force_to_metaconfig(option_bag))): value = self.getdefaultvalue(option_bag) else: value = self.calc_value(option_bag, value) return value def calc_value(self, option_bag, value, reset_cache=True): if isinstance(value, Calculation): try: value = value.execute(option_bag) except ConfigError as err: msg = _(f'error when calculating "{option_bag.option.impl_get_display_name()}": {err} : {option_bag.path}') raise ConfigError(msg) from err elif isinstance(value, (list, tuple)): value = self._do_value_list(value, option_bag) if reset_cache: self.calculate_reset_cache(option_bag, value) return value def getdefaultvalue(self, option_bag, ): """get default value: - get parents config value or - get calculated value or - get default value """ moption_bag = self._get_modified_parent(option_bag) if moption_bag is not None: # retrieved value from parent config return moption_bag.config_bag.context.cfgimpl_get_values().get_cached_value(moption_bag) # now try to get default value: value = self.calc_value(option_bag, option_bag.option.impl_getdefault(), ) if option_bag.index is not None and isinstance(value, (list, tuple)): if value and option_bag.option.impl_is_submulti(): # first index is a list, assume other data are list too if isinstance(value[0], list): # if index, must return good value for this index if len(value) > option_bag.index: value = value[option_bag.index] else: # no value for this index, retrieve default multi value # default_multi is already a list for submulti value = self.calc_value(option_bag, option_bag.option.impl_getdefault_multi()) elif option_bag.option.impl_is_multi(): # if index, must return good value for this index if len(value) > option_bag.index: value = value[option_bag.index] else: # no value for this index, retrieve default multi value # default_multi is already a list for submulti value = self.calc_value(option_bag, option_bag.option.impl_getdefault_multi()) return value def calculate_reset_cache(self, option_bag, value): if not 'expire' in option_bag.properties: return cache = option_bag.config_bag.context._impl_values_cache is_cache, cache_value, validated = cache.getcache(option_bag.path, None, option_bag.index, option_bag.config_bag.properties, option_bag.properties, 'value') if not is_cache or cache_value == value: # calculation return same value as previous value, # so do not invalidate cache return # calculated value is a new value, so reset cache option_bag.config_bag.context.cfgimpl_reset_cache(option_bag) # and manage force_store_value self._set_force_value_suffix(option_bag) def isempty(self, opt, value, force_allow_empty_list=False, index=None): "convenience method to know if an option is empty" empty = opt._empty if index in [None, undefined] and opt.impl_is_multi(): isempty = value is None or (isinstance(value, list) and not force_allow_empty_list and value == []) or \ (isinstance(value, list) and None in value) or empty in value else: isempty = value is None or value == empty or (opt.impl_is_submulti() and value == []) return isempty #______________________________________________________________________ # set value def setvalue(self, value, option_bag, ): context = option_bag.config_bag.context owner = self.get_context_owner() if 'validator' in option_bag.config_bag.properties: self.setvalue_validation(value, option_bag) if isinstance(value, list): # copy value = value.copy() self._setvalue(option_bag, value, owner, ) setting_properties = option_bag.config_bag.properties validator = 'validator' in setting_properties and 'demoting_error_warning' not in setting_properties if validator: cache = option_bag.config_bag.context._impl_values_cache cache.setcache(option_bag.path, option_bag.index, value, option_bag.properties, setting_properties, validator) if 'force_store_value' in setting_properties and option_bag.option.impl_is_leader(): option_bag.option.impl_get_leadership().follower_force_store_value(self, value, option_bag, owners.forced, ) def setvalue_validation(self, value, option_bag): settings = option_bag.config_bag.context.cfgimpl_get_settings() # First validate properties with this value opt = option_bag.option settings.validate_frozen(option_bag) val = self.calc_value(option_bag, value, False) settings.validate_mandatory(val, option_bag) # Value must be valid for option opt.impl_validate(val, option_bag, check_error=True) if 'warnings' in option_bag.config_bag.properties: # No error found so emit warnings opt.impl_validate(value, option_bag, check_error=False) def _setvalue(self, option_bag: OptionBag, value: Any, owner: str, ) -> None: option_bag.config_bag.context.cfgimpl_reset_cache(option_bag) self.set_storage_value(option_bag.path, option_bag.index, value, owner, ) self._set_force_value_suffix(option_bag) def reduce_index(self, path, index): self._values[path][index - 1] = self._values[path].pop(index) def set_storage_value(self, path, index, value, owner, ): self._values.setdefault(path, {})[index] = [value, owner] def _set_force_value_suffix(self, option_bag: OptionBag, ) -> None: if 'force_store_value' not in option_bag.config_bag.properties: return for woption in option_bag.option._get_suffixes_dependencies(): option = woption() force_store_options = [] for coption in option.get_children_recursively(None, None, option_bag.config_bag, ): if 'force_store_value' in coption.impl_getproperties(): force_store_options.append(coption) if not force_store_options: continue rootpath = option.impl_getpath() settings = option_bag.config_bag.context.cfgimpl_get_settings() for suffix in option.get_suffixes(option_bag.config_bag): for coption in force_store_options: subpaths = [rootpath] + coption.impl_getpath()[len(rootpath) + 1:].split('.')[:-1] path_suffix = option.convert_suffix_to_path(suffix) subpath = '.'.join([subp + path_suffix for subp in subpaths]) doption = coption.to_dynoption(subpath, suffix, option, ) if coption.impl_is_follower(): leader = coption.impl_get_leadership().get_leader() loption_bag = OptionBag() loption_bag.set_option(leader, None, option_bag.config_bag, ) loption_bag.properties = frozenset() indexes = range(len(self.getvalue(loption_bag))) else: indexes = [None] for index in indexes: coption_bag = OptionBag() coption_bag.set_option(doption, index, option_bag.config_bag, ) coption_bag.properties = settings.getproperties(coption_bag) self._values.setdefault(coption_bag.path, {})[index] = [self.getvalue(coption_bag), owners.forced] def _get_modified_parent(self, option_bag: OptionBag) -> Optional[OptionBag]: """ Search in differents parents a Config with a modified value If not found, return None For follower option, return the Config where leader is modified """ def build_option_bag(option_bag, parent): doption_bag = option_bag.copy() config_bag = option_bag.config_bag.copy() config_bag.context = parent config_bag.unrestraint() doption_bag.config_bag = config_bag return doption_bag for parent in option_bag.config_bag.context.get_parents(): doption_bag = build_option_bag(option_bag, parent) if 'force_metaconfig_on_freeze' in option_bag.properties: # remove force_metaconfig_on_freeze only if option in metaconfig # hasn't force_metaconfig_on_freeze properties ori_properties = doption_bag.properties doption_bag.properties = doption_bag.config_bag.context.cfgimpl_get_settings().getproperties(doption_bag) if not self.force_to_metaconfig(doption_bag): doption_bag.properties = ori_properties - {'force_metaconfig_on_freeze'} else: doption_bag.properties = ori_properties parent_owner = parent.cfgimpl_get_values().getowner(doption_bag, only_default=True) if parent_owner != owners.default: return doption_bag return None #______________________________________________________________________ # owner def is_default_owner(self, option_bag, validate_meta=True): return self.getowner(option_bag, validate_meta=validate_meta, only_default=True) == owners.default def hasvalue(self, path, index=None, ): """if path has a value return: boolean """ has_path = path in self._values if index is None: return has_path elif has_path: return index in self._values[path] return False def getowner(self, option_bag, validate_meta=True, only_default=False): """ retrieves the option's owner :param opt: the `option.Option` object :param force_permissive: behaves as if the permissive property was present :returns: a `setting.owners.Owner` object """ context = option_bag.config_bag.context opt = option_bag.option if opt.impl_is_symlinkoption(): option_bag.ori_option = opt opt = opt.impl_getopt() option_bag.option = opt option_bag.path = opt.impl_getpath() settings = context.cfgimpl_get_settings() settings.validate_properties(option_bag) if 'frozen' in option_bag.properties and \ 'force_default_on_freeze' in option_bag.properties: return owners.default if only_default: if self.hasvalue(option_bag.path, option_bag.index): owner = 'not_default' else: owner = owners.default else: owner = self._values.get(option_bag.path, {}).get(option_bag.index, [undefined, owners.default])[1] if validate_meta is not False and (owner is owners.default or \ 'frozen' in option_bag.properties and 'force_metaconfig_on_freeze' in option_bag.properties): moption_bag = self._get_modified_parent(option_bag) if moption_bag is not None: owner = moption_bag.config_bag.context.cfgimpl_get_values().getowner(moption_bag, only_default=only_default) elif 'force_metaconfig_on_freeze' in option_bag.properties: return owners.default return owner def setowner(self, owner, option_bag, ): """ sets a owner to an option :param opt: the `option.Option` object :param owner: a valid owner, that is a `setting.owners.Owner` object """ opt = option_bag.option if opt.impl_is_symlinkoption(): raise ConfigError(_("can't set owner for the symlinkoption \"{}\"" "").format(opt.impl_get_display_name())) if owner in forbidden_owners: raise ValueError(_('set owner "{0}" is forbidden').format(str(owner))) if not self.hasvalue(option_bag.path, option_bag.index): raise ConfigError(_('no value for {0} cannot change owner to {1}' '').format(option_bag.path, owner)) option_bag.config_bag.context.cfgimpl_get_settings().validate_frozen(option_bag) self._values[option_bag.path][option_bag.index][1] = owner #______________________________________________________________________ # reset def reset(self, option_bag): context = option_bag.config_bag.context hasvalue = self.hasvalue(option_bag.path) setting_properties = option_bag.config_bag.properties if hasvalue and 'validator' in option_bag.config_bag.properties: fake_context = context._gen_fake_values() config_bag = option_bag.config_bag.copy() config_bag.remove_validation() config_bag.context = fake_context soption_bag = option_bag.copy() soption_bag.config_bag = config_bag fake_value = fake_context.cfgimpl_get_values() fake_value.reset(soption_bag) soption_bag.config_bag.properties = option_bag.config_bag.properties value = fake_value.getdefaultvalue(soption_bag) fake_value.setvalue_validation(value, soption_bag) opt = option_bag.option if opt.impl_is_leader(): opt.impl_get_leadership().reset(self, option_bag) if hasvalue: if 'force_store_value' in option_bag.config_bag.properties and 'force_store_value' in option_bag.properties: value = self.getdefaultvalue(option_bag) self._setvalue(option_bag, value, owners.forced, ) else: # for leader only value = None if option_bag.path in self._values: del self._values[option_bag.path] context.cfgimpl_reset_cache(option_bag) if 'force_store_value' in setting_properties and option_bag.option.impl_is_leader(): if value is None: value = self.getdefaultvalue(option_bag) option_bag.option.impl_get_leadership().follower_force_store_value(self, value, option_bag, owners.forced) def get_max_length(self, path): values = self._values.get(path, {}) if values: return max(values) + 1 return 0 def reset_follower(self, option_bag): if self.hasvalue(option_bag.path, index=option_bag.index): context = option_bag.config_bag.context setting_properties = option_bag.config_bag.properties if 'validator' in setting_properties: fake_context = context._gen_fake_values() fake_value = fake_context.cfgimpl_get_values() config_bag = option_bag.config_bag.copy() config_bag.remove_validation() config_bag.context = fake_context soption_bag = option_bag.copy() soption_bag.config_bag = config_bag fake_value.reset_follower(soption_bag) value = fake_value.getdefaultvalue(soption_bag) fake_value.setvalue_validation(value, soption_bag) if 'force_store_value' in setting_properties and 'force_store_value' in option_bag.properties: value = self.getdefaultvalue(option_bag) self._setvalue(option_bag, value, owners.forced, ) else: self.resetvalue_index(option_bag.path, option_bag.index, ) context.cfgimpl_reset_cache(option_bag) def resetvalue_index(self, path, index, ): if path in self._values and index in self._values[path]: del self._values[path][index] def reset_leadership(self, index, option_bag, subconfig): current_value = self.get_cached_value(option_bag) length = len(current_value) if index >= length: raise IndexError(_('index {} is greater than the length {} ' 'for option "{}"').format(index, length, option_bag.option.impl_get_display_name())) current_value.pop(index) subconfig.cfgimpl_get_description().pop(self, index, option_bag) self.setvalue(current_value, option_bag, ) #______________________________________________________________________ # information def set_information(self, config_bag, option_bag, key, value, ): """updates the information's attribute :param key: information's key (ex: "help", "doc" :param value: information's value (ex: "the help string") """ if option_bag is None: path = None else: path = option_bag.path self._informations.setdefault(path, {})[key] = value if path is not None: for option in option_bag.option.get_dependencies_information(itself=True): config_bag.context.cfgimpl_reset_cache(option_bag) def get_information(self, config_bag, option_bag, key, default, ): """retrieves one information's item :param key: the item string (ex: "help") """ if option_bag is None: path = None else: path = option_bag.path try: return self._informations[path][key] except KeyError as err: if option_bag: return option_bag.option.impl_get_information(key, default) if default is not undefined: return default raise ValueError(_("information's item not found: {0}").format(key)) def del_information(self, key, raises=True, path=None, ): if path in self._informations and key in self._informations[path]: del self._informations[path][key] elif raises: raise ValueError(_(f"information's item not found \"{key}\"")) def list_information(self, path=None, ): return list(self._informations.get(path, {}).keys()) #______________________________________________________________________ # mandatory warnings def _mandatory_warnings(self, context, config_bag, description, currpath, subconfig, od_config_bag): settings = context.cfgimpl_get_settings() for option in description.get_children(config_bag): name = option.impl_getname() if option.impl_is_optiondescription(): try: option_bag = OptionBag() option_bag.set_option(option, None, od_config_bag) option_bag.properties = settings.getproperties(option_bag) subsubconfig = subconfig.get_subconfig(option_bag) except PropertiesOptionError as err: pass else: for option in self._mandatory_warnings(context, config_bag, option, currpath + [name], subsubconfig, od_config_bag): yield option elif not option.impl_is_symlinkoption(): # don't verifying symlink try: if not option.impl_is_follower(): option_bag = OptionBag() option_bag.set_option(option, None, config_bag) option_bag.properties = settings.getproperties(option_bag) if 'mandatory' in option_bag.properties or 'empty' in option_bag.properties: subconfig.getattr(name, option_bag) else: for index in range(subconfig.cfgimpl_get_length()): option_bag = OptionBag() option_bag.set_option(option, index, config_bag) option_bag.properties = settings.getproperties(option_bag) if 'mandatory' in option_bag.properties or 'empty' in option_bag.properties: subconfig.getattr(name, option_bag) except PropertiesOptionError as err: if err.proptype in (['mandatory'], ['empty']): yield option.impl_getpath() except ConfigError: pass def mandatory_warnings(self, config_bag): """convenience function to trace Options that are mandatory and where no value has been set :returns: generator of mandatory Option's path """ context = config_bag.context # copy od_setting_properties = config_bag.properties - {'mandatory', 'empty'} setting_properties = set(config_bag.properties) - {'warnings'} setting_properties.update(['mandatory', 'empty']) nconfig_bag = ConfigBag(context=config_bag.context, properties=frozenset(setting_properties), permissives=config_bag.permissives) nconfig_bag.set_permissive() od_config_bag = ConfigBag(context=nconfig_bag.context, properties=frozenset(od_setting_properties), permissives=nconfig_bag.permissives) od_config_bag.set_permissive() descr = context.cfgimpl_get_description() for option in self._mandatory_warnings(context, nconfig_bag, descr, [], context, od_config_bag): yield option #____________________________________________________________ # default owner methods def set_context_owner(self, owner): ":param owner: sets the default value for owner at the Config level" if owner in forbidden_owners: raise ValueError(_('set owner "{0}" is forbidden').format(str(owner))) self._values[None][None][1] = owner def get_context_owner(self): return self._values[None][None][1]