# -*- coding: utf-8 -*- # Copyright (C) 2012-2026 Team tiramisu (see AUTHORS for all contributors) # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published by the # Free Software Foundation, either version 3 of the License, or (at your # option) any later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . # # The original `Config` design model is unproudly borrowed from # the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/ # the whole pypy projet is under MIT licence # ____________________________________________________________ """options handler global entry point """ import weakref from copy import copy, deepcopy from typing import Optional, List, Any, Union from os.path import commonprefix from .error import ( PropertiesOptionError, ConfigError, ConflictError, LeadershipError, AttributeOptionError, ) from .option import DynOptionDescription, Leadership, Option from .setting import ConfigBag, Settings, undefined, groups from .value import Values, owners from .i18n import _ from .cacheobj import Cache from .autolib import Calculation from . import autolib def get_common_path(path1, path2): if None in (path1, path2): return None common_path = commonprefix([path1, path2]) all_paths = [path1, path2] if common_path in all_paths: # od.st is not the common_path of od.st_in all_paths.remove(common_path) if all_paths[0].startswith(common_path + '.'): return common_path if common_path.endswith("."): return common_path[:-1] elif "." in common_path: return common_path.rsplit(".", 1)[0] return None class CCache: __slots__ = tuple() # ============================================================================= # CACHE def reset_cache( self, subconfig, resetted_opts=None, ): """reset all settings in cache""" if resetted_opts is None: resetted_opts = [] if subconfig is not None: if "cache" not in subconfig.config_bag.properties: return subconfig.config_bag.properties = subconfig.config_bag.properties - { "cache" } self.reset_one_option_cache( subconfig, resetted_opts, False, ) subconfig.config_bag.properties = subconfig.config_bag.properties | { "cache" } else: self._impl_values_cache.reset_all_cache() # pylint: disable=no-member self.properties_cache.reset_all_cache() # pylint: disable=no-member def reset_one_option_cache( self, subconfig, resetted_opts, is_default, *, force=False, ): """reset cache for one option""" if not force and subconfig.path in resetted_opts: return resetted_opts.append(subconfig.path) config_bag = subconfig.config_bag if not force: # if is_default and config_bag.context.get_owner(subconfig) != owners.default: # return for is_default, woption in subconfig.option.get_dependencies(subconfig.option): option = woption() if option.issubdyn(): # it's an option in dynoptiondescription, remove cache for all generated option if option.impl_getpath() == subconfig.option.impl_getpath(): force = True subconfig = subconfig.parent.parent self.reset_cache_dyn_option( subconfig, option, resetted_opts, is_default, force, ) elif option.impl_is_dynoptiondescription(): self.reset_cache_dyn_optiondescription( option, config_bag, resetted_opts, is_default, ) else: option_subconfig = self.get_sub_config( config_bag, option.impl_getpath(), None, properties=None, validate_properties=False, ) self.reset_one_option_cache( option_subconfig, resetted_opts, is_default, ) del option subconfig.option.reset_cache( subconfig.path, config_bag, resetted_opts, ) def get_dynamic_from_dyn_optiondescription(self, config_bag, option): path = option.impl_getpath() if "." in path: parent_path = path.rsplit(".", 1)[0] parent_subconfig = self.get_sub_config( config_bag, parent_path, None, properties=None, validate_properties=False, ) else: parent_subconfig = self.get_root(config_bag) return parent_subconfig.dyn_to_subconfig( option, False, ) def reset_cache_dyn_optiondescription( self, option, config_bag, resetted_opts, is_default, ): # reset cache for all chidren for subconfig in self.get_dynamic_from_dyn_optiondescription( config_bag, option, ): self.reset_one_option_cache( subconfig, resetted_opts, is_default, ) for walk_subconfig in self.walk( subconfig, no_value=True, validate_properties=False, ): self.reset_one_option_cache( walk_subconfig, resetted_opts, is_default, ) def get_dynamic_from_dyn_option(self, subconfig, option): config_bag = subconfig.config_bag sub_paths = option.impl_getpath() if not subconfig.path: current_paths = [] current_paths_max_index = 0 else: current_paths = subconfig.path.split(".") current_paths_max_index = len(current_paths) - 1 current_subconfigs = [] parent = subconfig while True: current_subconfigs.insert(0, parent) parent = parent.parent if not parent or parent.path is None: break currents = [self.get_root(config_bag)] for idx, sub_path in enumerate(sub_paths.split(".")): new_currents = [] for current in currents: sub_option = current.option.get_child( sub_path, config_bag, current, allow_dynoption=True, ) if sub_option.impl_is_dynoptiondescription(): if ( idx <= current_paths_max_index and sub_option == current_subconfigs[idx].option ): new_currents.append(current_subconfigs[idx]) else: new_currents.extend( list( current.dyn_to_subconfig( sub_option, False, ) ) ) else: new_currents.append( current.get_child( sub_option, None, False, properties=None, ), ) currents = new_currents return currents def reset_cache_dyn_option( self, subconfig, option, resetted_opts, is_default, force, ): for dyn_option_subconfig in self.get_dynamic_from_dyn_option(subconfig, option): self.reset_one_option_cache( dyn_option_subconfig, resetted_opts, is_default, force=force, ) class SubConfig: __slots__ = ( "config_bag", "option", "parent", "index", "path", "true_path", "_properties", "apply_requires", "transitive_properties", "is_dynamic", "is_dynamic_without_identifiers", "identifiers", "_length", ) def __init__( self, option: Option, index: Optional[int], path: str, config_bag: ConfigBag, parent: Optional["SubConfig"], identifiers: Optional[list[str]], *, true_path: Optional[str] = None, # for python 3.9 properties: Union[list[str], undefined] = undefined, properties=undefined, validate_properties: bool = True, check_dynamic_without_identifiers: bool = True, ) -> None: self.index = index self.identifiers = identifiers self.option = option self.config_bag = config_bag self.parent = parent self._length = None self.path = path if true_path is None: true_path = path is_follower = ( not option.impl_is_optiondescription() and option.impl_is_follower() ) self.apply_requires = not is_follower or index is not None self.true_path = true_path if self.option.impl_is_dynoptiondescription(): self.is_dynamic = True self.is_dynamic_without_identifiers = identifiers is None or ( parent and identifiers == parent.identifiers ) if ( check_dynamic_without_identifiers and parent and parent.is_dynamic and parent.is_dynamic_without_identifiers and self.is_dynamic_without_identifiers != parent.is_dynamic_without_identifiers ): raise AttributeOptionError(true_path, "option-dynamic") elif parent: self.is_dynamic = parent.is_dynamic self.is_dynamic_without_identifiers = parent.is_dynamic_without_identifiers else: self.is_dynamic = False self.is_dynamic_without_identifiers = False self._properties = properties if validate_properties: if self.path and self._properties is undefined: settings = config_bag.context.get_settings() self._properties = settings.getproperties( self, apply_requires=False, ) self.config_bag.context.get_settings().validate_properties(self) self._properties = undefined self.config_bag.context.get_settings().validate_properties(self) if self.apply_requires and self.option.impl_is_optiondescription(): if self.path and self.properties is not None: settings = config_bag.context.get_settings() self.transitive_properties = settings.calc_transitive_properties( self, self.properties, ) else: self.transitive_properties = frozenset() @property def properties(self): if self._properties is undefined: if self.path is None: self._properties = frozenset() else: settings = self.config_bag.context.get_settings() self._properties = frozenset() self._properties = settings.getproperties( self, apply_requires=self.apply_requires, ) return self._properties @properties.setter def properties(self, properties): self._properties = properties def __repr__(self): return f"" def dyn_to_subconfig( self, child: Option, validate_properties: bool, *, true_path: Optional[str] = None, ) -> List["SubConfig"]: config_bag = self.config_bag for identifier in child.get_identifiers(self): try: name = child.impl_getname(identifier) if not validate_properties: properties = None else: properties = undefined yield self.get_child( child, None, validate_properties, identifier=identifier, name=name, properties=properties, true_path=true_path, ) except PropertiesOptionError as err: if err.proptype in (["mandatory"], ["empty"]): raise err def get_leadership_children( self, validate_properties, ): # it's a leadership so walk to leader and follower # followers has specific length leader, *followers = self.option.get_children() yield self.get_child( leader, None, validate_properties, ) for idx in range(self.get_length_leadership()): for follower in followers: try: yield self.get_child( follower, idx, validate_properties, ) except PropertiesOptionError as err: if err.proptype in (["mandatory"], ["empty"]): raise err from err def get_children( self, validate_properties, *, uncalculated: bool = False, with_index: bool = True, check_dynamic_without_identifiers: bool = True, ): if self.option.impl_is_leadership() and not uncalculated and with_index: yield from self.get_leadership_children(validate_properties) else: children_name = [] for child in self.option.get_children(): if child.impl_is_dynoptiondescription() and not uncalculated: for dyn_child in self.dyn_to_subconfig( child, validate_properties, ): yield dyn_child if child.could_conflict: name = dyn_child.path if name in children_name: raise ConflictError( _('option name "{0}" is not unique in {1}').format( name, self.option.impl_get_display_name( self, with_quote=True ), ) ) children_name.append(name) else: try: yield self.get_child( child, None, validate_properties, check_dynamic_without_identifiers=check_dynamic_without_identifiers, ) except PropertiesOptionError as err: if err.proptype in (["mandatory"], ["empty"]): raise err if child.could_conflict: name = child.impl_getpath() if name in children_name: raise ConflictError( _('option name "{0}" is not unique in {1}').format( name, self.option.impl_get_display_name( self, with_quote=True ), ) ) children_name.append(name) def get_child( self, option: Option, index: Optional[int], validate_properties: bool, *, properties=undefined, identifier: Optional[str] = None, name: Optional[str] = None, check_index: bool = True, config_bag: ConfigBag = None, true_path: Optional[str] = None, check_dynamic_without_identifiers: bool = True, ) -> "SubConfig": # pylint: disable=too-many-branches,too-many-locals,too-many-arguments if config_bag is None: config_bag = self.config_bag if not self.option.impl_is_optiondescription(): raise TypeError(f'"{self.path}" is not an optiondescription') path = self.get_path( name, option, ) if identifier is None: identifiers = self.identifiers else: if self.identifiers: identifiers = self.identifiers + [identifier] else: identifiers = [identifier] subsubconfig = SubConfig( option, index, path, config_bag, self, identifiers, properties=properties, validate_properties=validate_properties, true_path=true_path, check_dynamic_without_identifiers=check_dynamic_without_identifiers, ) if check_index and index is not None: if option.impl_is_optiondescription() or not option.impl_is_follower(): raise ConfigError("index must be set only with a follower option", subconfig=subsubconfig,) length = self.get_length_leadership() if index >= length: raise LeadershipError( subsubconfig, "leadership-greater", index=index, length=length ) return subsubconfig def get_path( self, name: str, option: Option, ) -> str: if name is None: name = option.impl_getname() if self.path is None: path = name else: path = self.path + "." + name return path def get_length_leadership(self): """Get the length of leader option (useful to know follower's length)""" if self._length is None: cconfig_bag = self.config_bag.copy() cconfig_bag.remove_validation() leader = self.option.get_leader() path = self.get_path( None, leader, ) subconfig = SubConfig( leader, None, path, cconfig_bag, self, self.identifiers, validate_properties=False, ) #FIXME #self._length = len(cconfig_bag.context.get_value(subconfig)) length = len(cconfig_bag.context.get_value(subconfig)) return length def get_common_child( self, search_option: "BaseOption", true_path: Optional[str] = None, validate_properties: bool = True, check_dynamic_without_identifiers: bool = True, ): current_option_path = self.option.impl_getpath() search_option_path = search_option.impl_getpath() common_path = get_common_path(current_option_path, search_option_path) config_bag = self.config_bag index = None if ( not self.option.impl_is_optiondescription() and self.option.impl_is_follower() and search_option.impl_is_follower() and self.parent.option == search_option.impl_get_leadership() ): index = self.index search_child_number = 0 parents = [self.parent] else: if common_path: common_parent_number = common_path.count(".") + 1 parent_count = current_option_path.count(".") - common_parent_number if parent_count >= 0: parent = self.parent for idx in range(parent_count): parent = parent.parent elif parent_count == 0: parent = self.parent else: # so -1 parent = self parents = [parent] else: common_parent_number = 0 parents = [config_bag.context.get_root(config_bag)] search_child_number = search_option_path.count(".") - common_parent_number subconfigs_is_a_list = False if search_child_number: if common_parent_number: parent_paths = search_option_path.rsplit(".", search_child_number + 1)[ 1:-1 ] else: parent_paths = search_option_path.split(".")[:-1] for parent_path in parent_paths: new_parents = [] for parent in parents: sub_option = parent.option.get_child( parent_path, config_bag, parent, allow_dynoption=True, ) if sub_option.impl_is_dynoptiondescription(): new_parents.extend( parent.dyn_to_subconfig( sub_option, True, true_path=true_path, ) ) subconfigs_is_a_list = True else: new_parents.append( parent.get_child( sub_option, None, validate_properties, true_path=true_path, check_dynamic_without_identifiers=check_dynamic_without_identifiers, config_bag=config_bag, ) ) parents = new_parents subconfigs = [] for parent in parents: try: ret = parent.get_child( search_option, index, validate_properties, check_dynamic_without_identifiers=check_dynamic_without_identifiers, config_bag=config_bag, ) except PropertiesOptionError as err: ret = err subconfigs.append(ret) if subconfigs_is_a_list: return subconfigs return subconfigs[0] def change_context(self, context) -> "SubConfig": config_bag = self.config_bag.copy() config_bag.context = context return SubConfig( self.option, self.index, self.path, config_bag, self.parent, self.identifiers, true_path=self.true_path, validate_properties=False, ) class _Config(CCache): """Sub configuration management entry. Tree if OptionDescription's responsability. SubConfig are generated on-demand. A Config is also a SubConfig. Root Config is call context below """ __slots__ = ( "_impl_context", "_impl_descr", "_impl_path", ) def __init__( self, descr, context, subpath=None, ): """Configuration option management class :param descr: describes the configuration schema :type descr: an instance of ``option.OptionDescription`` :param context: the current root config :type context: `Config` :type subpath: `str` with the path name """ # main option description self._impl_descr = descr self._impl_context = context self._impl_path = subpath def get_description(self): """get root description""" assert self._impl_descr is not None, _( "there is no option description for this config" " (may be GroupConfig)" ) return self._impl_descr def get_settings(self): """get settings object""" return self._impl_settings # pylint: disable=no-member def get_values(self): """get values object""" return self._impl_values # pylint: disable=no-member def get_values_cache(self): """get cache for values""" return self._impl_values_cache # pylint: disable=no-member # # ============================================================================= # # WALK def walk_valid_value( self, subconfig, only_mandatory, ): value = self.get_value( subconfig, need_help=False, ) ori_config_bag = subconfig.config_bag config_bag = ori_config_bag.copy() if only_mandatory: config_bag.properties |= {"mandatory", "empty"} subconfig.config_bag = config_bag self.get_settings().validate_mandatory( subconfig, value, ) subconfig.config_bag = ori_config_bag return value def get_root( self, config_bag: ConfigBag, ) -> SubConfig: return SubConfig( config_bag.context.get_description(), None, None, config_bag, None, None, ) def get_sub_config( self, config_bag, path, index, *, validate_properties: bool = True, properties=undefined, true_path: Optional[str] = None, allow_dynoption: bool = False, valid_conflict: bool = True, ): subconfig = self.get_root(config_bag) if path is None: paths = [] len_path = 0 else: if "." in path: paths = path.split(".") else: paths = [path] len_path = len(paths) - 1 for idx, name in enumerate(paths): if idx != len_path: index_ = None true_path_ = None else: index_ = index true_path_ = true_path if not subconfig.option.impl_is_optiondescription(): raise TypeError(f'"{subconfig.true_path}" is not an optiondescription') option = subconfig.option.get_child( name, config_bag, subconfig, with_identifier=True, allow_dynoption=allow_dynoption, ) if isinstance(option, tuple): identifier, option = option else: identifier = None if valid_conflict and option.could_conflict: for ref in option.could_conflict: child = ref() if child.impl_is_dynoptiondescription(): for dyn_child in subconfig.dyn_to_subconfig( child, validate_properties, ): if path == dyn_child.path: raise ConflictError( _('option name "{0}" is not unique in {1}').format( name, option.impl_get_display_name( subconfig, with_quote=True ), ) ) elif child.impl_getname() == name: raise ConflictError( _('option name "{0}" is not unique in {1}').format( name, self.option.impl_get_display_name( self, with_quote=True ), ) ) subconfig = subconfig.get_child( option, index_, validate_properties, properties=properties, name=name, identifier=identifier, true_path=true_path_, ) return subconfig def walk( self, root_subconfig: SubConfig, *, no_value: bool = False, only_mandatory: bool = False, validate_properties: bool = True, ): if only_mandatory or no_value: ret = [] else: ret = {} for subconfig in root_subconfig.get_children(validate_properties): # pylint: disable=too-many-branches,too-many-locals,too-many-arguments, if only_mandatory and subconfig.option.impl_is_symlinkoption(): continue if subconfig.option.impl_is_optiondescription(): values = self.walk( subconfig, no_value=no_value, only_mandatory=only_mandatory, validate_properties=validate_properties, ) if only_mandatory or no_value: ret.extend(values) else: ret[subconfig] = values else: if no_value: ret.append(subconfig) else: option = self.walk_option( subconfig, only_mandatory, ) if only_mandatory: if option: ret.append(subconfig) elif option[0]: ret[subconfig] = option[1] return ret def walk_option( self, subconfig: SubConfig, only_mandatory: bool, ): try: value = self.walk_valid_value( subconfig, only_mandatory, ) except PropertiesOptionError as err: if err.proptype in (["mandatory"], ["empty"]): if only_mandatory: return True else: raise err from err else: if not only_mandatory: return True, value if only_mandatory: return False return False, None # ============================================================================= # Manage value def set_value( self, subconfig, value: Any, ) -> Any: """set value""" self.get_settings().validate_properties(subconfig) return self.get_values().set_value(subconfig, value) def get_value( self, subconfig, need_help=True, ): """ :return: option's value if name is an option name, OptionDescription otherwise """ original_index = subconfig.index subconfig = self._get( subconfig, need_help, ) if isinstance(subconfig, list): value = [] follower_subconfig = None is_follower = not subconfig or subconfig[0].option.impl_is_follower() for sconfig in subconfig: if not is_follower or follower_subconfig is None: follower_subconfig = self.get_sub_config( sconfig.config_bag, sconfig.path, sconfig.index, ) else: follower_subconfig = follower_subconfig.parent.get_child( sconfig.option, sconfig.index, False, ) value.append( self.get_value( follower_subconfig, need_help=need_help, ) ) else: value = self.get_values().get_cached_value(subconfig) if subconfig.option.impl_is_follower(): length = subconfig.parent.get_length_leadership() follower_len = self.get_values().get_max_length(subconfig.path) if follower_len > length: option_name = subconfig.option.impl_get_display_name( subconfig, with_quote=True ) raise LeadershipError( subconfig, "leadership-follower-greater", index=follower_len, length=length, ) self.get_settings().validate_mandatory( subconfig, value, ) if original_index != subconfig.index: value = value[original_index] return value def _get( self, subconfig: "SubConfig", need_help: bool, validate_properties: bool = True, ) -> "OptionBag": # pylint: disable=too-many-locals option = subconfig.option if not option.impl_is_symlinkoption(): return subconfig suboption = option.impl_getopt() if suboption.issubdyn(): dynopt = suboption.getsubdyn() return subconfig.get_common_child( suboption, true_path=subconfig.path, validate_properties=validate_properties, ) if suboption.impl_is_follower() and subconfig.index is None: subconfig = self.get_sub_config( subconfig.config_bag, # pylint: disable=no-member suboption.impl_getpath(), None, validate_properties=validate_properties, true_path=subconfig.path, ) leadership_length = subconfig.parent.get_length_leadership() ret = [] follower = subconfig.option parent = subconfig.parent for idx in range(leadership_length): ret.append( parent.get_child( follower, idx, True, ) ) return ret if suboption.impl_is_leader(): index = None else: index = subconfig.index s_subconfig = self.get_sub_config( subconfig.config_bag, # pylint: disable=no-member suboption.impl_getpath(), index, validate_properties=validate_properties, true_path=subconfig.path, ) return self._get( s_subconfig, need_help, ) def get_owner( self, subconfig: "SubConfig", *, validate_meta=True, ): """get owner""" subconfigs = self._get( subconfig, need_help=True, ) if isinstance(subconfigs, list): for sc in subconfigs: owner = self.get_owner( sc, validate_meta=validate_meta, ) if owner != owners.default: break else: owner = owners.default else: owner = self.get_values().getowner(subconfigs, validate_meta=validate_meta) return owner class _CommonConfig(_Config): "abstract base class for the Config, KernelGroupConfig and the KernelMetaConfig" __slots__ = ( "_impl_values", "_impl_values_cache", "_impl_settings", "properties_cache", "_impl_permissives_cache", "parents", "impl_type", ) def _impl_build_all_caches(self, descr): if not descr.impl_already_build_caches(): descr._group_type = groups.root # pylint: disable=protected-access descr._build_cache( self._display_name ) # pylint: disable=no-member,protected-access if not hasattr(descr, "_cache_force_store_values"): raise ConfigError( _("option description seems to be part of an other " "config") ) def get_parents(self): """get parents""" for parent in self.parents: # pylint: disable=no-member yield parent() # information def impl_set_information( self, config_bag, key, value, ): """updates the information's attribute :param key: information's key (ex: "help", "doc" :param value: information's value (ex: "the help string") """ self._impl_values.set_information( None, # pylint: disable=no-member key, value, ) for option in self.get_description()._cache_dependencies_information.get( key, [] ): # pylint: disable=protected-access # option_bag = OptionBag(option, # None, # config_bag, # properties=None, # ) option_bag = None self.reset_cache(option_bag) def impl_get_information( self, subconfig, key, default, ): """retrieves one information's item :param key: the item string (ex: "help") """ return self._impl_values.get_information( None, # pylint: disable=no-member key, default, ) def impl_del_information( self, key, raises=True, ): """delete an information""" self._impl_values.del_information( key, # pylint: disable=no-member raises, ) def impl_list_information(self): """list information keys for context""" return self._impl_values.list_information() # pylint: disable=no-member def gen_fake_context(self) -> "KernelConfig": """generate a fake values to improve validation when assign a new value""" export = deepcopy(self.get_values()._values) # pylint: disable=protected-access fake_context = KernelConfig( self._impl_descr, force_values=export, force_settings=self.get_settings(), name=self._impl_name, # pylint: disable=no-member ) fake_context.parents = self.parents # pylint: disable=no-member return fake_context def duplicate( self, force_values=None, force_settings=None, metaconfig_prefix=None, child=None, deep=None, name=None, ): """duplication config""" # pylint: disable=too-many-arguments if name is None: name = self._impl_name # pylint: disable=no-member if isinstance(self, KernelConfig): duplicated_config = KernelConfig( self._impl_descr, _duplicate=True, force_values=force_values, force_settings=force_settings, name=name, ) else: duplicated_config = KernelMetaConfig( [], _duplicate=True, optiondescription=self._impl_descr, name=name, ) duplicated_values = duplicated_config.get_values() duplicated_settings = duplicated_config.get_settings() duplicated_values._values = deepcopy( self.get_values()._values ) # pylint: disable=protected-access duplicated_values._informations = deepcopy( self.get_values()._informations ) # pylint: disable=protected-access duplicated_settings._properties = deepcopy( self.get_settings()._properties ) # pylint: disable=protected-access duplicated_settings._permissives = deepcopy( self.get_settings()._permissives ) # pylint: disable=protected-access duplicated_settings.ro_append = self.get_settings().ro_append duplicated_settings.rw_append = self.get_settings().rw_append duplicated_settings.ro_remove = self.get_settings().ro_remove duplicated_settings.rw_remove = self.get_settings().rw_remove # duplicated_settings.default_properties = self.get_settings().default_properties duplicated_config.reset_cache(None, None) if child is not None: duplicated_config._impl_children.append( child ) # pylint: disable=protected-access child.parents.append(weakref.ref(duplicated_config)) if self.parents: # pylint: disable=no-member if deep is not None: for parent in self.parents: # pylint: disable=no-member wparent = parent() if wparent not in deep: deep.append(wparent) subname = wparent.impl_getname() if metaconfig_prefix: subname = metaconfig_prefix + subname duplicated_config = wparent.duplicate( deep=deep, metaconfig_prefix=metaconfig_prefix, child=duplicated_config, name=subname, ) else: duplicated_config.parents = self.parents # pylint: disable=no-member for parent in self.parents: # pylint: disable=no-member parent()._impl_children.append( duplicated_config ) # pylint: disable=protected-access return duplicated_config def get_config_path(self): """get config path""" path = self.impl_getname() for parent in self.parents: # pylint: disable=no-member wparent = parent() if wparent is None: # pragma: no cover raise ConfigError( _("parent of {0} not already exists").format(self._impl_name) ) # pylint: disable=no-member path = parent().get_config_path() + "." + path return path def impl_getname(self): """get config name""" return self._impl_name # pylint: disable=no-member # ____________________________________________________________ class KernelConfig(_CommonConfig): """main configuration management entry""" # pylint: disable=too-many-instance-attributes __slots__ = ( "__weakref__", "_impl_name", "_display_name", "_impl_symlink", "_storage", ) impl_type = "config" def __init__( self, descr, force_values=None, force_settings=None, name=None, display_name=None, _duplicate=False, ): """Configuration option management class :param descr: describes the configuration schema :type descr: an instance of ``option.OptionDescription`` :param context: the current root config :type context: `Config` """ # pylint: disable=too-many-arguments,too-many-arguments self._display_name = display_name self.parents = [] self._impl_symlink = [] self._impl_name = name if isinstance(descr, Leadership): raise ConfigError( _("cannot set leadership object has root optiondescription") ) if isinstance(descr, DynOptionDescription): msg = _("cannot set dynoptiondescription object has root optiondescription") raise ConfigError(msg) if force_settings is not None and force_values is not None: self._impl_settings = force_settings self._impl_permissives_cache = Cache() self.properties_cache = Cache() self._impl_values = Values(force_values) self._impl_values_cache = Cache() else: self._impl_settings = Settings() self._impl_permissives_cache = Cache() self.properties_cache = Cache() self._impl_values = Values() self._impl_values_cache = Cache() self._impl_context = weakref.ref(self) if None in [force_settings, force_values]: self._impl_build_all_caches(descr) super().__init__( descr, self._impl_context, None, ) class KernelGroupConfig(_CommonConfig): """Group a config with same optiondescription tree""" __slots__ = ( "__weakref__", "_impl_children", "_impl_name", "_display_name", ) impl_type = "group" def __init__( self, children, display_name=None, name=None, _descr=None, ): # pylint: disable=super-init-not-called names = [] for child in children: if not isinstance(child, (KernelConfig, KernelGroupConfig)): raise TypeError( _("child must be a Config, GroupConfig, MixConfig or MetaConfig") ) name_ = child._impl_name names.append(name_) if len(names) != len(set(names)): while range(1, len(names) + 1): name = names.pop(0) if name in names: raise ConflictError( _( "config name must be uniq in " 'groupconfig for "{0}"' ).format(name) ) self._impl_children = children self.parents = [] self._display_name = display_name if name: self._impl_name = name self._impl_context = weakref.ref(self) self._impl_descr = _descr self._impl_path = None def get_children(self): """get all children""" return self._impl_children def reset_cache( self, subconfig, resetted_opts=None, ): if resetted_opts is None: resetted_opts = [] if isinstance(self, KernelMixConfig): super().reset_cache( subconfig, resetted_opts=copy(resetted_opts), ) for child in self._impl_children: if subconfig is not None: parent_subconfig = subconfig.change_context(child) else: parent_subconfig = None child.reset_cache( parent_subconfig, resetted_opts=copy(resetted_opts), ) def set_value( self, subconfig, value, only_config=False, ): """Setattr not in current KernelGroupConfig, but in each children""" ret = [] for child in self._impl_children: cconfig_bag = subconfig.config_bag.copy() cconfig_bag.context = child if isinstance(child, KernelGroupConfig): ret.extend( child.set_value( subconfig, value, only_config=only_config, ) ) else: settings = child.get_settings() properties = settings.get_context_properties() permissives = settings.get_context_permissives() cconfig_bag.properties = properties cconfig_bag.permissives = permissives try: # GROUP coption_bag = child.get_sub_config( cconfig_bag, subconfig.path, subconfig.index, validate_properties=False, ) child.set_value( coption_bag, value, ) except PropertiesOptionError as err: # pylint: disable=protected-access ret.append( PropertiesOptionError( err.subconfig, err.proptype, err._settings, err._opt_type, err._name, err._orig_opt, ) ) except (ValueError, LeadershipError, AttributeError) as err: ret.append(err) return ret def find_group( self, config_bag, byname=None, bypath=undefined, byoption=undefined, byvalue=undefined, raise_if_not_found=True, _sub=False, ): """Find first not in current KernelGroupConfig, but in each children""" # pylint: disable=too-many-arguments # if KernelMetaConfig, all children have same OptionDescription in # context so search only one time the option for all children if bypath is undefined and byname is not None and self.impl_type == "meta": root_option_bag = OptionBag( self.get_description(), None, config_bag, ) next( self.find( root_option_bag, bytype=None, byname=byname, byvalue=undefined, raise_if_not_found=raise_if_not_found, with_option=True, ) ) byname = None ret = [] for child in self._impl_children: if isinstance(child, KernelGroupConfig): ret.extend( child.find_group( byname=byname, bypath=bypath, byoption=byoption, byvalue=byvalue, config_bag=config_bag, raise_if_not_found=False, _sub=True, ) ) else: cconfig_bag = config_bag.copy() cconfig_bag.context = child if cconfig_bag.properties is None: settings = child.get_settings() properties = settings.get_context_properties() permissives = settings.get_context_permissives() cconfig_bag.properties = properties cconfig_bag.permissives = permissives root_option_bag = OptionBag( child.get_description(), None, cconfig_bag, ) try: next( child.find( root_option_bag, None, byname, byvalue, raise_if_not_found=False, only_path=bypath, only_option=byoption, ) ) ret.append(child) except StopIteration: pass if not _sub: self._find_return_results( ret != [], # pylint: disable=use-implicit-booleaness-not-comparison raise_if_not_found, ) return ret def reset( self, path: str, only_children: bool, config_bag: ConfigBag, ) -> None: """reset value for specified path""" for child in self._impl_children: settings = child.get_settings() cconfig_bag = config_bag.copy() cconfig_bag.context = child settings = child.get_settings() properties = settings.get_context_properties() permissives = settings.get_context_permissives() cconfig_bag.properties = properties cconfig_bag.permissives = permissives cconfig_bag.remove_validation() # GROUP subconfig = child.get_sub_config( cconfig_bag, path, None, validate_properties=False, ) child.get_values().reset(subconfig) def getconfig( self, name: str, ) -> KernelConfig: """get a child from a config name""" for child in self._impl_children: if name == child.impl_getname(): return child raise ConfigError(_('unknown config "{}"').format(name)) class KernelMixConfig(KernelGroupConfig): """Kernel mixconfig: this config can have differents optiondescription tree""" # pylint: disable=too-many-instance-attributes __slots__ = ( "_impl_symlink", "_storage", ) impl_type = "mix" def __init__( self, optiondescription, children, name=None, display_name=None, _duplicate=False, ): self._impl_name = name self._impl_symlink = [] for child in children: if not isinstance(child, (KernelConfig, KernelMixConfig)): raise TypeError(_("child must be a Config, MixConfig or MetaConfig")) child.parents.append(weakref.ref(self)) self._impl_settings = Settings() self._impl_settings._properties = deepcopy(self._impl_settings._properties) self._impl_settings._permissives = deepcopy(self._impl_settings._permissives) self._impl_permissives_cache = Cache() self.properties_cache = Cache() self._impl_values = Values() self._impl_values._values = deepcopy(self._impl_values._values) self._impl_values_cache = Cache() self._display_name = display_name self._impl_build_all_caches(optiondescription) super().__init__( children, _descr=optiondescription, display_name=display_name, ) def set_value( self, subconfig, value, only_config=False, force_default=False, force_dont_change_value=False, force_default_if_same=False, ): """only_config: could be set if you want modify value in all Config included in this KernelMetaConfig """ # pylint: disable=too-many-branches,too-many-nested-blocks,too-many-locals,too-many-arguments ret = [] if only_config: if force_default or force_default_if_same or force_dont_change_value: raise ValueError( _( "force_default, force_default_if_same or " "force_dont_change_value cannot be set with" " only_config" ) ) else: if force_default or force_default_if_same or force_dont_change_value: if force_default and force_dont_change_value: raise ValueError( _( "force_default and force_dont_change_value" " cannot be set together" ) ) for child in self._impl_children: cconfig_bag = subconfig.config_bag.copy() cconfig_bag.context = child settings = child.get_settings() properties = settings.get_context_properties() cconfig_bag.properties = properties cconfig_bag.permissives = settings.get_context_permissives() try: if self.impl_type == "meta": obj = self else: obj = child validate_properties = ( not force_default and not force_default_if_same ) # MIX moption_bag = obj.get_sub_config( cconfig_bag, subconfig.path, subconfig.index, validate_properties=validate_properties, ) if force_default_if_same: if not child.get_values().hasvalue( subconfig.path, index=subconfig.index ): child_value = undefined else: child_value = child.get_value(moption_bag) if force_default or ( force_default_if_same and value == child_value ): child.get_values().reset(moption_bag) continue if force_dont_change_value: child_value = child.get_value(moption_bag) if value != child_value: child.set_value( moption_bag, child_value, ) except PropertiesOptionError as err: # pylint: disable=protected-access ret.append( PropertiesOptionError( err.subconfig, err.proptype, err._settings, err._opt_type, err._name, err._orig_opt, ) ) except (ValueError, LeadershipError, AttributeError) as err: ret.append(err) try: # MIX moption_bag = self.get_sub_config( subconfig.config_bag, subconfig.path, subconfig.index, validate_properties=not only_config, ) if only_config: ret = super().set_value( moption_bag, value, only_config=only_config, ) else: _CommonConfig.set_value( self, moption_bag, value, ) except (PropertiesOptionError, ValueError, LeadershipError) as err: ret.append(err) return ret def reset( self, path: str, only_children: bool, config_bag: ConfigBag, ) -> None: """reset value for a specified path""" # pylint: disable=arguments-differ rconfig_bag = config_bag.copy() rconfig_bag.remove_validation() if self.impl_type == "meta": # MIX subconfig = self.get_sub_config( config_bag, path, None, validate_properties=True, ) elif not only_children: try: # MIX subconfig = self.get_sub_config( rconfig_bag, path, None, validate_properties=True, ) except AttributeError: only_children = True for child in self._impl_children: rconfig_bag.context = child try: if self.impl_type == "meta": moption_bag = subconfig moption_bag.config_bag = rconfig_bag else: # MIX moption_bag = child.get_sub_config( rconfig_bag, path, None, validate_properties=True, ) child.get_values().reset(moption_bag) except AttributeError: pass if isinstance(child, KernelMixConfig): child.reset( path, False, rconfig_bag, ) if not only_children: subconfig.config_bag = config_bag self.get_values().reset(subconfig) def new_config( self, name=None, type_="config", ): """Create a new config/metaconfig/mixconfig and add it to this MixConfig""" if name: for child in self._impl_children: if child.impl_getname() == name: raise ConflictError( _("config name must be uniq in " "groupconfig for {0}").format( child ) ) assert type_ in ("config", "metaconfig", "mixconfig"), _( "unknown type {}" ).format(type_) if type_ == "config": config = KernelConfig(self._impl_descr, name=name) elif type_ == "metaconfig": config = KernelMetaConfig( [], optiondescription=self._impl_descr, name=name, ) elif type_ == "mixconfig": config = KernelMixConfig( children=[], optiondescription=self._impl_descr, name=name, ) # Copy context properties/permissives settings = config.get_settings() properties = settings.get_context_properties() settings.set_context_properties( properties, config, ) settings.set_context_permissives(settings.get_context_permissives()) settings.ro_append = settings.ro_append settings.rw_append = settings.rw_append settings.ro_remove = settings.ro_remove settings.rw_remove = settings.rw_remove # settings.default_properties = settings.default_properties config.parents.append(weakref.ref(self)) self._impl_children.append(config) return config def add_config( self, config, ): """Add a child config to a mix config""" if not config.impl_getname(): raise ConfigError(_("config added has no name, the name is mandatory")) if config.impl_getname() in [ child.impl_getname() for child in self._impl_children ]: raise ConflictError( _('config name "{0}" is not uniq in ' 'groupconfig "{1}"').format( config.impl_getname(), self.impl_getname() ), ) config.parents.append(weakref.ref(self)) self._impl_children.append(config) config.reset_cache(None, None) def remove_config( self, name, ): """Remove a child config to a mix config by it's name""" for current_index, child in enumerate(self._impl_children): if name == child.impl_getname(): child.reset_cache(None, None) break else: raise ConfigError(_("cannot find the config {0}").format(name)) for child_index, parent in enumerate(child.parents): if parent() == self: break else: # pragma: no cover raise ConfigError( _("cannot find the config {0}").format(self.impl_getname()) ) self._impl_children.pop(current_index) child.parents.pop(child_index) return child class KernelMetaConfig(KernelMixConfig): """Meta config""" __slots__ = tuple() impl_type = "meta" def __init__( self, children, optiondescription=None, name=None, display_name=None, _duplicate=False, ): descr = None if optiondescription is not None: if not _duplicate: new_children = [] for child_name in children: assert isinstance(child_name, str), _( "MetaConfig with optiondescription" " must have string has child, " "not {}" ).format(child_name) new_children.append( KernelConfig(optiondescription, name=child_name) ) children = new_children descr = optiondescription for child in children: if __debug__ and not isinstance(child, (KernelConfig, KernelMetaConfig)): raise TypeError(_("child must be a Config or MetaConfig")) if descr is None: descr = child.get_description() if child.impl_getname() is None: raise ConfigError( _("children in MetaConfig must have name") ) elif descr is not child.get_description(): raise ValueError( _( "all config in metaconfig must " "have the same optiondescription" ) ) super().__init__( descr, children, name=name, display_name=display_name, ) def add_config( self, config, ): if self._impl_descr is not config.get_description(): raise ValueError(_("metaconfig must have the same optiondescription")) super().add_config(config)