tiramisu/tiramisu/config.py

1843 lines
63 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (C) 2012-2025 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy projet is under MIT licence
# ____________________________________________________________
"""options handler global entry point
"""
import weakref
from copy import copy, deepcopy
from typing import Optional, List, Any, Union
from os.path import commonprefix
from .error import PropertiesOptionError, ConfigError, ConflictError, LeadershipError
from .option import DynOptionDescription, Leadership, Option
from .setting import ConfigBag, Settings, undefined, groups
from .value import Values, owners
from .i18n import _
from .cacheobj import Cache
from .autolib import Calculation
from . import autolib
def get_common_path(path1, path2):
common_path = commonprefix([path1, path2])
if common_path in [path1, path2]:
return common_path
if common_path.endswith("."):
return common_path[:-1]
elif "." in common_path:
return common_path.rsplit(".", 1)[0]
return None
class CCache:
__slots__ = tuple()
# =============================================================================
# CACHE
def reset_cache(
self,
subconfig,
resetted_opts=None,
):
"""reset all settings in cache"""
if resetted_opts is None:
resetted_opts = []
if subconfig is not None:
if "cache" not in subconfig.config_bag.properties:
return
subconfig.config_bag.properties = subconfig.config_bag.properties - {
"cache"
}
self.reset_one_option_cache(
subconfig,
resetted_opts,
)
subconfig.config_bag.properties = subconfig.config_bag.properties | {
"cache"
}
else:
self._impl_values_cache.reset_all_cache() # pylint: disable=no-member
self.properties_cache.reset_all_cache() # pylint: disable=no-member
def reset_one_option_cache(
self,
subconfig,
resetted_opts,
):
"""reset cache for one option"""
if subconfig.path in resetted_opts:
return
resetted_opts.append(subconfig.path)
config_bag = subconfig.config_bag
for woption in subconfig.option.get_dependencies(subconfig.option):
option = woption()
if option.issubdyn():
# it's an option in dynoptiondescription, remove cache for all generated option
self.reset_cache_dyn_option(
subconfig,
option,
resetted_opts,
)
elif option.impl_is_dynoptiondescription():
self.reset_cache_dyn_optiondescription(
option,
config_bag,
resetted_opts,
)
else:
option_subconfig = self.get_sub_config(
config_bag,
option.impl_getpath(),
None,
properties=None,
validate_properties=False,
)
self.reset_one_option_cache(
option_subconfig,
resetted_opts,
)
del option
subconfig.option.reset_cache(
subconfig.path,
config_bag,
resetted_opts,
)
def get_dynamic_from_dyn_optiondescription(self, config_bag, option):
path = option.impl_getpath()
if "." in path:
parent_path = path.rsplit(".", 1)[0]
parent_subconfig = self.get_sub_config(
config_bag,
parent_path,
None,
properties=None,
validate_properties=False,
)
else:
parent_subconfig = self.get_root(config_bag)
return parent_subconfig.dyn_to_subconfig(
option,
False,
)
def reset_cache_dyn_optiondescription(
self,
option,
config_bag,
resetted_opts,
):
# reset cache for all chidren
for subconfig in self.get_dynamic_from_dyn_optiondescription(
config_bag,
option,
):
self.reset_one_option_cache(
subconfig,
resetted_opts,
)
for walk_subconfig in self.walk(
subconfig,
no_value=True,
validate_properties=False,
):
self.reset_one_option_cache(
walk_subconfig,
resetted_opts,
)
def get_dynamic_from_dyn_option(self, subconfig, option):
config_bag = subconfig.config_bag
sub_paths = option.impl_getpath()
current_paths = subconfig.path.split(".")
current_paths_max_index = len(current_paths) - 1
current_subconfigs = []
parent = subconfig
while True:
current_subconfigs.insert(0, parent)
parent = parent.parent
if parent.path is None:
break
currents = [self.get_root(config_bag)]
for idx, sub_path in enumerate(sub_paths.split(".")):
new_currents = []
for current in currents:
sub_option = current.option.get_child(
sub_path,
config_bag,
current,
allow_dynoption=True,
)
if sub_option.impl_is_dynoptiondescription():
if (
idx <= current_paths_max_index
and sub_option == current_subconfigs[idx].option
):
new_currents.append(current_subconfigs[idx])
else:
new_currents.extend(
list(
current.dyn_to_subconfig(
sub_option,
False,
)
)
)
else:
new_currents.append(
current.get_child(
sub_option,
None,
False,
properties=None,
),
)
currents = new_currents
return currents
def reset_cache_dyn_option(
self,
subconfig,
option,
resetted_opts,
):
for dyn_option_subconfig in self.get_dynamic_from_dyn_option(subconfig, option):
self.reset_one_option_cache(
dyn_option_subconfig,
resetted_opts,
)
class SubConfig:
__slots__ = (
"config_bag",
"option",
"parent",
"index",
"path",
"true_path",
"_properties",
"apply_requires",
"transitive_properties",
"is_dynamic",
"identifiers",
"_length",
)
def __init__(
self,
option: Option,
index: Optional[int],
path: str,
config_bag: ConfigBag,
parent: Optional["SubConfig"],
identifiers: Optional[list[str]],
*,
true_path: Optional[str] = None,
# for python 3.9 properties: Union[list[str], undefined] = undefined,
properties=undefined,
validate_properties: bool = True,
) -> None:
self.index = index
self.identifiers = identifiers
self.option = option
self.config_bag = config_bag
self.parent = parent
self._length = None
self.path = path
if true_path is None:
true_path = path
is_follower = (
not option.impl_is_optiondescription() and option.impl_is_follower()
)
self.apply_requires = not is_follower or index is not None
self.true_path = true_path
if parent and parent.is_dynamic or self.option.impl_is_dynoptiondescription():
self.is_dynamic = True
else:
self.is_dynamic = False
self._properties = properties
if validate_properties:
if self.path and self._properties is undefined:
settings = config_bag.context.get_settings()
self._properties = settings.getproperties(
self,
apply_requires=False,
)
self.config_bag.context.get_settings().validate_properties(self)
self._properties = undefined
self.config_bag.context.get_settings().validate_properties(self)
if self.apply_requires and self.option.impl_is_optiondescription():
if self.path and self.properties is not None:
settings = config_bag.context.get_settings()
self.transitive_properties = settings.calc_transitive_properties(
self,
self.properties,
)
else:
self.transitive_properties = frozenset()
@property
def properties(self):
if self._properties is undefined:
if self.path is None:
self._properties = frozenset()
else:
settings = self.config_bag.context.get_settings()
self._properties = frozenset()
self._properties = settings.getproperties(
self,
apply_requires=self.apply_requires,
)
return self._properties
@properties.setter
def properties(self, properties):
self._properties = properties
def __repr__(self):
return f"<SubConfig path={self.path}, index={self.index}>"
def dyn_to_subconfig(
self,
child: Option,
validate_properties: bool,
*,
true_path: Optional[str] = None,
) -> List["SubConfig"]:
config_bag = self.config_bag
for identifier in child.get_identifiers(self):
try:
name = child.impl_getname(identifier)
if not validate_properties:
properties = None
else:
properties = undefined
yield self.get_child(
child,
None,
validate_properties,
identifier=identifier,
name=name,
properties=properties,
true_path=true_path,
)
except PropertiesOptionError as err:
if err.proptype in (["mandatory"], ["empty"]):
raise err
def get_leadership_children(
self,
validate_properties,
):
# it's a leadership so walk to leader and follower
# followers has specific length
leader, *followers = self.option.get_children()
yield self.get_child(
leader,
None,
validate_properties,
)
for idx in range(self.get_length_leadership()):
for follower in followers:
try:
yield self.get_child(
follower,
idx,
validate_properties,
)
except PropertiesOptionError as err:
if err.proptype in (["mandatory"], ["empty"]):
raise err from err
def get_children(
self,
validate_properties,
*,
uncalculated: bool = False,
with_index: bool = True,
):
if self.option.impl_is_leadership() and not uncalculated and with_index:
yield from self.get_leadership_children(validate_properties)
else:
children_name = []
for child in self.option.get_children():
if child.impl_is_dynoptiondescription() and not uncalculated:
for dyn_child in self.dyn_to_subconfig(
child,
validate_properties,
):
yield dyn_child
if child.could_conflict:
name = dyn_child.path
if name in children_name:
raise ConflictError(
_("option name \"{0}\" is not unique in {1}").format(
name,
self.option.impl_get_display_name(
self, with_quote=True
),
)
)
children_name.append(name)
else:
try:
yield self.get_child(
child,
None,
validate_properties,
)
except PropertiesOptionError as err:
if err.proptype in (["mandatory"], ["empty"]):
raise err
if child.could_conflict:
name = child.impl_getpath()
if name in children_name:
raise ConflictError(
_("option name \"{0}\" is not unique in {1}").format(
name,
self.option.impl_get_display_name(
self, with_quote=True
),
)
)
children_name.append(name)
def get_child(
self,
option: Option,
index: Optional[int],
validate_properties: bool,
*,
properties=undefined,
allow_dynoption: bool = False,
identifier: Optional[str] = None,
name: Optional[str] = None,
check_index: bool = True,
config_bag: ConfigBag = None,
true_path: Optional[str] = None,
) -> "SubConfig":
# pylint: disable=too-many-branches,too-many-locals,too-many-arguments
if config_bag is None:
config_bag = self.config_bag
if not self.option.impl_is_optiondescription():
raise TypeError(f'"{self.path}" is not an optiondescription')
path = self.get_path(
name,
option,
)
if identifier is None:
identifiers = self.identifiers
else:
if self.identifiers:
identifiers = self.identifiers + [identifier]
else:
identifiers = [identifier]
subsubconfig = SubConfig(
option,
index,
path,
self.config_bag,
self,
identifiers,
properties=properties,
validate_properties=validate_properties,
true_path=true_path,
)
if check_index and index is not None:
if option.impl_is_optiondescription() or not option.impl_is_follower():
raise ConfigError("index must be set only with a follower option")
length = self.get_length_leadership()
if index >= length:
raise LeadershipError(
subsubconfig, "leadership-greater", index=index, length=length
)
return subsubconfig
def get_path(
self,
name: str,
option: Option,
) -> str:
if name is None:
name = option.impl_getname()
if self.path is None:
path = name
else:
path = self.path + "." + name
return path
def get_length_leadership(self):
"""Get the length of leader option (useful to know follower's length)"""
if self._length is None:
cconfig_bag = self.config_bag.copy()
cconfig_bag.remove_validation()
leader = self.option.get_leader()
path = self.get_path(
None,
leader,
)
subconfig = SubConfig(
leader,
None,
path,
cconfig_bag,
self,
self.identifiers,
validate_properties=False,
)
self._length = len(cconfig_bag.context.get_value(subconfig))
return self._length
def get_common_child(
self,
search_option: "BaseOption",
true_path: Optional[str] = None,
validate_properties: bool = True,
):
current_option_path = self.option.impl_getpath()
search_option_path = search_option.impl_getpath()
common_path = get_common_path(current_option_path, search_option_path)
config_bag = self.config_bag
index = None
if (
not self.option.impl_is_optiondescription()
and self.option.impl_is_follower()
and search_option.impl_is_follower()
and self.parent.option == search_option.impl_get_leadership()
):
index = self.index
search_child_number = 0
parents = [self.parent]
else:
if common_path:
parent = self.parent
common_parent_number = common_path.count(".") + 1
for idx in range(current_option_path.count(".") - common_parent_number):
parent = parent.parent
parents = [parent]
else:
common_parent_number = 0
parents = [config_bag.context.get_root(config_bag)]
search_child_number = search_option_path.count(".") - common_parent_number
subconfigs_is_a_list = False
if search_child_number:
if common_parent_number:
parent_paths = search_option_path.rsplit(".", search_child_number + 1)[
1:-1
]
else:
parent_paths = search_option_path.split(".")[:-1]
for parent_path in parent_paths:
new_parents = []
for parent in parents:
sub_option = parent.option.get_child(
parent_path,
config_bag,
parent,
allow_dynoption=True,
)
if sub_option.impl_is_dynoptiondescription():
new_parents.extend(
parent.dyn_to_subconfig(
sub_option,
True,
true_path=true_path,
)
)
subconfigs_is_a_list = True
else:
new_parents.append(
parent.get_child(
sub_option,
None,
validate_properties,
true_path=true_path,
)
)
parents = new_parents
subconfigs = []
for parent in parents:
subconfigs.append(
parent.get_child(
search_option,
index,
validate_properties,
)
)
if subconfigs_is_a_list:
return subconfigs
return subconfigs[0]
def change_context(self, context) -> "SubConfig":
config_bag = self.config_bag.copy()
config_bag.context = context
return SubConfig(
self.option,
self.index,
self.path,
config_bag,
self.parent,
self.identifiers,
true_path=self.true_path,
validate_properties=False,
)
class _Config(CCache):
"""Sub configuration management entry.
Tree if OptionDescription's responsability. SubConfig are generated
on-demand. A Config is also a SubConfig.
Root Config is call context below
"""
__slots__ = (
"_impl_context",
"_impl_descr",
"_impl_path",
)
def __init__(
self,
descr,
context,
subpath=None,
):
"""Configuration option management class
:param descr: describes the configuration schema
:type descr: an instance of ``option.OptionDescription``
:param context: the current root config
:type context: `Config`
:type subpath: `str` with the path name
"""
# main option description
self._impl_descr = descr
self._impl_context = context
self._impl_path = subpath
def get_description(self):
"""get root description"""
assert self._impl_descr is not None, _(
"there is no option description for this config" " (may be GroupConfig)"
)
return self._impl_descr
def get_settings(self):
"""get settings object"""
return self._impl_settings # pylint: disable=no-member
def get_values(self):
"""get values object"""
return self._impl_values # pylint: disable=no-member
def get_values_cache(self):
"""get cache for values"""
return self._impl_values_cache # pylint: disable=no-member
# # =============================================================================
# # WALK
def walk_valid_value(
self,
subconfig,
only_mandatory,
):
value = self.get_value(
subconfig,
need_help=False,
)
ori_config_bag = subconfig.config_bag
config_bag = ori_config_bag.copy()
if only_mandatory:
config_bag.properties |= {"mandatory", "empty"}
subconfig.config_bag = config_bag
self.get_settings().validate_mandatory(
subconfig,
value,
)
subconfig.config_bag = ori_config_bag
return value
def get_root(
self,
config_bag: ConfigBag,
) -> SubConfig:
return SubConfig(
config_bag.context.get_description(),
None,
None,
config_bag,
None,
None,
)
def get_sub_config(
self,
config_bag,
path,
index,
*,
validate_properties: bool = True,
properties=undefined,
true_path: Optional[str] = None,
allow_dynoption: bool = False,
valid_conflict: bool = True,
):
subconfig = self.get_root(config_bag)
if path is None:
paths = []
len_path = 0
else:
if "." in path:
paths = path.split(".")
else:
paths = [path]
len_path = len(paths) - 1
for idx, name in enumerate(paths):
if idx != len_path:
index_ = None
true_path_ = None
else:
index_ = index
true_path_ = true_path
if not subconfig.option.impl_is_optiondescription():
raise TypeError(f'"{subconfig.true_path}" is not an optiondescription')
option = subconfig.option.get_child(
name,
config_bag,
subconfig,
with_identifier=True,
allow_dynoption=allow_dynoption,
)
if isinstance(option, tuple):
identifier, option = option
else:
identifier = None
if valid_conflict and option.could_conflict:
for ref in option.could_conflict:
child = ref()
if child.impl_is_dynoptiondescription():
for dyn_child in subconfig.dyn_to_subconfig(
child,
validate_properties,
):
if path == dyn_child.path:
raise ConflictError(
_("option name \"{0}\" is not unique in {1}").format(
name,
option.impl_get_display_name(
subconfig, with_quote=True
),
)
)
elif child.impl_getname() == name:
raise ConflictError(
_("option name \"{0}\" is not unique in {1}").format(
name,
self.option.impl_get_display_name(
self, with_quote=True
),
)
)
subconfig = subconfig.get_child(
option,
index_,
validate_properties,
properties=properties,
name=name,
identifier=identifier,
true_path=true_path_,
)
return subconfig
def walk(
self,
root_subconfig: SubConfig,
*,
no_value: bool = False,
only_mandatory: bool = False,
validate_properties: bool = True,
):
if only_mandatory or no_value:
ret = []
else:
ret = {}
for subconfig in root_subconfig.get_children(validate_properties):
# pylint: disable=too-many-branches,too-many-locals,too-many-arguments,
if only_mandatory and subconfig.option.impl_is_symlinkoption():
continue
if subconfig.option.impl_is_optiondescription():
values = self.walk(
subconfig,
no_value=no_value,
only_mandatory=only_mandatory,
validate_properties=validate_properties,
)
if only_mandatory or no_value:
ret.extend(values)
else:
ret[subconfig] = values
else:
if no_value:
ret.append(subconfig)
else:
option = self.walk_option(
subconfig,
only_mandatory,
)
if only_mandatory:
if option:
ret.append(subconfig)
elif option[0]:
ret[subconfig] = option[1]
return ret
def walk_option(
self,
subconfig: SubConfig,
only_mandatory: bool,
):
try:
value = self.walk_valid_value(
subconfig,
only_mandatory,
)
except PropertiesOptionError as err:
if err.proptype in (["mandatory"], ["empty"]):
if only_mandatory:
return True
else:
raise err from err
else:
if not only_mandatory:
return True, value
if only_mandatory:
return False
return False, None
# =============================================================================
# Manage value
def set_value(
self,
subconfig,
value: Any,
) -> Any:
"""set value"""
self.get_settings().validate_properties(subconfig)
return self.get_values().set_value(subconfig, value)
def get_value(
self,
subconfig,
need_help=True,
):
"""
:return: option's value if name is an option name, OptionDescription
otherwise
"""
original_index = subconfig.index
subconfig = self._get(
subconfig,
need_help,
)
if isinstance(subconfig, list):
value = []
follower_subconfig = None
is_follower = not subconfig or subconfig[0].option.impl_is_follower()
for sconfig in subconfig:
if not is_follower or follower_subconfig is None:
follower_subconfig = self.get_sub_config(
sconfig.config_bag,
sconfig.path,
sconfig.index,
)
else:
follower_subconfig = follower_subconfig.parent.get_child(
sconfig.option,
sconfig.index,
False,
)
value.append(
self.get_value(
follower_subconfig,
need_help=need_help,
)
)
else:
value = self.get_values().get_cached_value(subconfig)
if subconfig.option.impl_is_follower():
length = subconfig.parent.get_length_leadership()
follower_len = self.get_values().get_max_length(subconfig.path)
if follower_len > length:
option_name = subconfig.option.impl_get_display_name(
subconfig, with_quote=True
)
raise LeadershipError(
subconfig,
"leadership-follower-greater",
index=follower_len,
length=length,
)
self.get_settings().validate_mandatory(
subconfig,
value,
)
if original_index != subconfig.index:
value = value[original_index]
return value
def _get(
self,
subconfig: "SubConfig",
need_help: bool,
validate_properties: bool = True,
) -> "OptionBag":
# pylint: disable=too-many-locals
option = subconfig.option
if not option.impl_is_symlinkoption():
return subconfig
suboption = option.impl_getopt()
if suboption.issubdyn():
dynopt = suboption.getsubdyn()
return subconfig.get_common_child(
suboption,
true_path=subconfig.path,
validate_properties=validate_properties,
)
if suboption.impl_is_follower() and subconfig.index is None:
subconfig = self.get_sub_config(
subconfig.config_bag, # pylint: disable=no-member
suboption.impl_getpath(),
None,
validate_properties=validate_properties,
true_path=subconfig.path,
)
leadership_length = subconfig.parent.get_length_leadership()
ret = []
follower = subconfig.option
parent = subconfig.parent
for idx in range(leadership_length):
ret.append(
parent.get_child(
follower,
idx,
True,
)
)
return ret
if suboption.impl_is_leader():
index = None
else:
index = subconfig.index
s_subconfig = self.get_sub_config(
subconfig.config_bag, # pylint: disable=no-member
suboption.impl_getpath(),
index,
validate_properties=validate_properties,
true_path=subconfig.path,
)
return self._get(
s_subconfig,
need_help,
)
def get_owner(
self,
subconfig: "SubConfig",
*,
validate_meta=True,
):
"""get owner"""
subconfigs = self._get(
subconfig,
need_help=True,
)
if isinstance(subconfigs, list):
for sc in subconfigs:
owner = self.get_owner(
sc,
validate_meta=validate_meta,
)
if owner != owners.default:
break
else:
owner = owners.default
else:
owner = self.get_values().getowner(subconfigs, validate_meta=validate_meta)
return owner
class _CommonConfig(_Config):
"abstract base class for the Config, KernelGroupConfig and the KernelMetaConfig"
__slots__ = (
"_impl_values",
"_impl_values_cache",
"_impl_settings",
"properties_cache",
"_impl_permissives_cache",
"parents",
"impl_type",
)
def _impl_build_all_caches(self, descr):
if not descr.impl_already_build_caches():
descr._group_type = groups.root # pylint: disable=protected-access
descr._build_cache(
self._display_name
) # pylint: disable=no-member,protected-access
if not hasattr(descr, "_cache_force_store_values"):
raise ConfigError(
_("option description seems to be part of an other " "config")
)
def get_parents(self):
"""get parents"""
for parent in self.parents: # pylint: disable=no-member
yield parent()
# information
def impl_set_information(
self,
config_bag,
key,
value,
):
"""updates the information's attribute
:param key: information's key (ex: "help", "doc"
:param value: information's value (ex: "the help string")
"""
self._impl_values.set_information(
None, # pylint: disable=no-member
key,
value,
)
for option in self.get_description()._cache_dependencies_information.get(
key, []
): # pylint: disable=protected-access
# option_bag = OptionBag(option,
# None,
# config_bag,
# properties=None,
# )
option_bag = None
self.reset_cache(option_bag)
def impl_get_information(
self,
subconfig,
key,
default,
):
"""retrieves one information's item
:param key: the item string (ex: "help")
"""
return self._impl_values.get_information(
None, # pylint: disable=no-member
key,
default,
)
def impl_del_information(
self,
key,
raises=True,
):
"""delete an information"""
self._impl_values.del_information(
key, # pylint: disable=no-member
raises,
)
def impl_list_information(self):
"""list information keys for context"""
return self._impl_values.list_information() # pylint: disable=no-member
def gen_fake_context(self) -> "KernelConfig":
"""generate a fake values to improve validation when assign a new value"""
export = deepcopy(self.get_values()._values) # pylint: disable=protected-access
fake_context = KernelConfig(
self._impl_descr,
force_values=export,
force_settings=self.get_settings(),
name=self._impl_name, # pylint: disable=no-member
)
fake_context.parents = self.parents # pylint: disable=no-member
return fake_context
def duplicate(
self,
force_values=None,
force_settings=None,
metaconfig_prefix=None,
child=None,
deep=None,
name=None,
):
"""duplication config"""
# pylint: disable=too-many-arguments
if name is None:
name = self._impl_name # pylint: disable=no-member
if isinstance(self, KernelConfig):
duplicated_config = KernelConfig(
self._impl_descr,
_duplicate=True,
force_values=force_values,
force_settings=force_settings,
name=name,
)
else:
duplicated_config = KernelMetaConfig(
[],
_duplicate=True,
optiondescription=self._impl_descr,
name=name,
)
duplicated_values = duplicated_config.get_values()
duplicated_settings = duplicated_config.get_settings()
duplicated_values._values = deepcopy(
self.get_values()._values
) # pylint: disable=protected-access
duplicated_values._informations = deepcopy(
self.get_values()._informations
) # pylint: disable=protected-access
duplicated_settings._properties = deepcopy(
self.get_settings()._properties
) # pylint: disable=protected-access
duplicated_settings._permissives = deepcopy(
self.get_settings()._permissives
) # pylint: disable=protected-access
duplicated_settings.ro_append = self.get_settings().ro_append
duplicated_settings.rw_append = self.get_settings().rw_append
duplicated_settings.ro_remove = self.get_settings().ro_remove
duplicated_settings.rw_remove = self.get_settings().rw_remove
# duplicated_settings.default_properties = self.get_settings().default_properties
duplicated_config.reset_cache(None, None)
if child is not None:
duplicated_config._impl_children.append(
child
) # pylint: disable=protected-access
child.parents.append(weakref.ref(duplicated_config))
if self.parents: # pylint: disable=no-member
if deep is not None:
for parent in self.parents: # pylint: disable=no-member
wparent = parent()
if wparent not in deep:
deep.append(wparent)
subname = wparent.impl_getname()
if metaconfig_prefix:
subname = metaconfig_prefix + subname
duplicated_config = wparent.duplicate(
deep=deep,
metaconfig_prefix=metaconfig_prefix,
child=duplicated_config,
name=subname,
)
else:
duplicated_config.parents = self.parents # pylint: disable=no-member
for parent in self.parents: # pylint: disable=no-member
parent()._impl_children.append(
duplicated_config
) # pylint: disable=protected-access
return duplicated_config
def get_config_path(self):
"""get config path"""
path = self.impl_getname()
for parent in self.parents: # pylint: disable=no-member
wparent = parent()
if wparent is None: # pragma: no cover
raise ConfigError(
_("parent of {0} not already exists").format(self._impl_name)
) # pylint: disable=no-member
path = parent().get_config_path() + "." + path
return path
def impl_getname(self):
"""get config name"""
return self._impl_name # pylint: disable=no-member
# ____________________________________________________________
class KernelConfig(_CommonConfig):
"""main configuration management entry"""
# pylint: disable=too-many-instance-attributes
__slots__ = (
"__weakref__",
"_impl_name",
"_display_name",
"_impl_symlink",
"_storage",
)
impl_type = "config"
def __init__(
self,
descr,
force_values=None,
force_settings=None,
name=None,
display_name=None,
_duplicate=False,
):
"""Configuration option management class
:param descr: describes the configuration schema
:type descr: an instance of ``option.OptionDescription``
:param context: the current root config
:type context: `Config`
"""
# pylint: disable=too-many-arguments,too-many-arguments
self._display_name = display_name
self.parents = []
self._impl_symlink = []
self._impl_name = name
if isinstance(descr, Leadership):
raise ConfigError(
_("cannot set leadership object has root optiondescription")
)
if isinstance(descr, DynOptionDescription):
msg = _("cannot set dynoptiondescription object has root optiondescription")
raise ConfigError(msg)
if force_settings is not None and force_values is not None:
self._impl_settings = force_settings
self._impl_permissives_cache = Cache()
self.properties_cache = Cache()
self._impl_values = Values(force_values)
self._impl_values_cache = Cache()
else:
self._impl_settings = Settings()
self._impl_permissives_cache = Cache()
self.properties_cache = Cache()
self._impl_values = Values()
self._impl_values_cache = Cache()
self._impl_context = weakref.ref(self)
if None in [force_settings, force_values]:
self._impl_build_all_caches(descr)
super().__init__(
descr,
self._impl_context,
None,
)
class KernelGroupConfig(_CommonConfig):
"""Group a config with same optiondescription tree"""
__slots__ = (
"__weakref__",
"_impl_children",
"_impl_name",
"_display_name",
)
impl_type = "group"
def __init__(
self,
children,
display_name=None,
name=None,
_descr=None,
):
# pylint: disable=super-init-not-called
names = []
for child in children:
if not isinstance(child, (KernelConfig, KernelGroupConfig)):
raise TypeError(
_("child must be a Config, GroupConfig, MixConfig or MetaConfig")
)
name_ = child._impl_name
names.append(name_)
if len(names) != len(set(names)):
while range(1, len(names) + 1):
name = names.pop(0)
if name in names:
raise ConflictError(
_(
"config name must be uniq in " 'groupconfig for "{0}"'
).format(name)
)
self._impl_children = children
self.parents = []
self._display_name = display_name
if name:
self._impl_name = name
self._impl_context = weakref.ref(self)
self._impl_descr = _descr
self._impl_path = None
def get_children(self):
"""get all children"""
return self._impl_children
def reset_cache(
self,
subconfig,
resetted_opts=None,
):
if resetted_opts is None:
resetted_opts = []
if isinstance(self, KernelMixConfig):
super().reset_cache(
subconfig,
resetted_opts=copy(resetted_opts),
)
for child in self._impl_children:
if subconfig is not None:
parent_subconfig = subconfig.change_context(child)
else:
parent_subconfig = None
child.reset_cache(
parent_subconfig,
resetted_opts=copy(resetted_opts),
)
def set_value(
self,
subconfig,
value,
only_config=False,
):
"""Setattr not in current KernelGroupConfig, but in each children"""
ret = []
for child in self._impl_children:
cconfig_bag = subconfig.config_bag.copy()
cconfig_bag.context = child
if isinstance(child, KernelGroupConfig):
ret.extend(
child.set_value(
subconfig,
value,
only_config=only_config,
)
)
else:
settings = child.get_settings()
properties = settings.get_context_properties()
permissives = settings.get_context_permissives()
cconfig_bag.properties = properties
cconfig_bag.permissives = permissives
try:
# GROUP
coption_bag = child.get_sub_config(
cconfig_bag,
subconfig.path,
subconfig.index,
validate_properties=False,
)
child.set_value(
coption_bag,
value,
)
except PropertiesOptionError as err:
# pylint: disable=protected-access
ret.append(
PropertiesOptionError(
err._subconfig,
err.proptype,
err._settings,
err._opt_type,
err._name,
err._orig_opt,
)
)
except (ValueError, LeadershipError, AttributeError) as err:
ret.append(err)
return ret
def find_group(
self,
config_bag,
byname=None,
bypath=undefined,
byoption=undefined,
byvalue=undefined,
raise_if_not_found=True,
_sub=False,
):
"""Find first not in current KernelGroupConfig, but in each children"""
# pylint: disable=too-many-arguments
# if KernelMetaConfig, all children have same OptionDescription in
# context so search only one time the option for all children
if bypath is undefined and byname is not None and self.impl_type == "meta":
root_option_bag = OptionBag(
self.get_description(),
None,
config_bag,
)
next(
self.find(
root_option_bag,
bytype=None,
byname=byname,
byvalue=undefined,
raise_if_not_found=raise_if_not_found,
with_option=True,
)
)
byname = None
ret = []
for child in self._impl_children:
if isinstance(child, KernelGroupConfig):
ret.extend(
child.find_group(
byname=byname,
bypath=bypath,
byoption=byoption,
byvalue=byvalue,
config_bag=config_bag,
raise_if_not_found=False,
_sub=True,
)
)
else:
cconfig_bag = config_bag.copy()
cconfig_bag.context = child
if cconfig_bag.properties is None:
settings = child.get_settings()
properties = settings.get_context_properties()
permissives = settings.get_context_permissives()
cconfig_bag.properties = properties
cconfig_bag.permissives = permissives
root_option_bag = OptionBag(
child.get_description(),
None,
cconfig_bag,
)
try:
next(
child.find(
root_option_bag,
None,
byname,
byvalue,
raise_if_not_found=False,
only_path=bypath,
only_option=byoption,
)
)
ret.append(child)
except StopIteration:
pass
if not _sub:
self._find_return_results(
ret != [], # pylint: disable=use-implicit-booleaness-not-comparison
raise_if_not_found,
)
return ret
def reset(
self,
path: str,
only_children: bool,
config_bag: ConfigBag,
) -> None:
"""reset value for specified path"""
for child in self._impl_children:
settings = child.get_settings()
cconfig_bag = config_bag.copy()
cconfig_bag.context = child
settings = child.get_settings()
properties = settings.get_context_properties()
permissives = settings.get_context_permissives()
cconfig_bag.properties = properties
cconfig_bag.permissives = permissives
cconfig_bag.remove_validation()
# GROUP
subconfig = child.get_sub_config(
cconfig_bag,
path,
None,
validate_properties=False,
)
child.get_values().reset(subconfig)
def getconfig(
self,
name: str,
) -> KernelConfig:
"""get a child from a config name"""
for child in self._impl_children:
if name == child.impl_getname():
return child
raise ConfigError(_('unknown config "{}"').format(name))
class KernelMixConfig(KernelGroupConfig):
"""Kernel mixconfig: this config can have differents optiondescription tree"""
# pylint: disable=too-many-instance-attributes
__slots__ = (
"_impl_symlink",
"_storage",
)
impl_type = "mix"
def __init__(
self,
optiondescription,
children,
name=None,
display_name=None,
_duplicate=False,
):
self._impl_name = name
self._impl_symlink = []
for child in children:
if not isinstance(child, (KernelConfig, KernelMixConfig)):
raise TypeError(_("child must be a Config, MixConfig or MetaConfig"))
child.parents.append(weakref.ref(self))
self._impl_settings = Settings()
self._impl_settings._properties = deepcopy(self._impl_settings._properties)
self._impl_settings._permissives = deepcopy(self._impl_settings._permissives)
self._impl_permissives_cache = Cache()
self.properties_cache = Cache()
self._impl_values = Values()
self._impl_values._values = deepcopy(self._impl_values._values)
self._impl_values_cache = Cache()
self._display_name = display_name
self._impl_build_all_caches(optiondescription)
super().__init__(
children,
_descr=optiondescription,
display_name=display_name,
)
def set_value(
self,
subconfig,
value,
only_config=False,
force_default=False,
force_dont_change_value=False,
force_default_if_same=False,
):
"""only_config: could be set if you want modify value in all Config included in
this KernelMetaConfig
"""
# pylint: disable=too-many-branches,too-many-nested-blocks,too-many-locals,too-many-arguments
ret = []
if only_config:
if force_default or force_default_if_same or force_dont_change_value:
raise ValueError(
_(
"force_default, force_default_if_same or "
"force_dont_change_value cannot be set with"
" only_config"
)
)
else:
if force_default or force_default_if_same or force_dont_change_value:
if force_default and force_dont_change_value:
raise ValueError(
_(
"force_default and force_dont_change_value"
" cannot be set together"
)
)
for child in self._impl_children:
cconfig_bag = subconfig.config_bag.copy()
cconfig_bag.context = child
settings = child.get_settings()
properties = settings.get_context_properties()
cconfig_bag.properties = properties
cconfig_bag.permissives = settings.get_context_permissives()
try:
if self.impl_type == "meta":
obj = self
else:
obj = child
validate_properties = (
not force_default and not force_default_if_same
)
# MIX
moption_bag = obj.get_sub_config(
cconfig_bag,
subconfig.path,
subconfig.index,
validate_properties=validate_properties,
)
if force_default_if_same:
if not child.get_values().hasvalue(
subconfig.path, index=subconfig.index
):
child_value = undefined
else:
child_value = child.get_value(moption_bag)
if force_default or (
force_default_if_same and value == child_value
):
child.get_values().reset(moption_bag)
continue
if force_dont_change_value:
child_value = child.get_value(moption_bag)
if value != child_value:
child.set_value(
moption_bag,
child_value,
)
except PropertiesOptionError as err:
# pylint: disable=protected-access
ret.append(
PropertiesOptionError(
err._subconfig,
err.proptype,
err._settings,
err._opt_type,
err._name,
err._orig_opt,
)
)
except (ValueError, LeadershipError, AttributeError) as err:
ret.append(err)
try:
# MIX
moption_bag = self.get_sub_config(
subconfig.config_bag,
subconfig.path,
subconfig.index,
validate_properties=not only_config,
)
if only_config:
ret = super().set_value(
moption_bag,
value,
only_config=only_config,
)
else:
_CommonConfig.set_value(
self,
moption_bag,
value,
)
except (PropertiesOptionError, ValueError, LeadershipError) as err:
ret.append(err)
return ret
def reset(
self,
path: str,
only_children: bool,
config_bag: ConfigBag,
) -> None:
"""reset value for a specified path"""
# pylint: disable=arguments-differ
rconfig_bag = config_bag.copy()
rconfig_bag.remove_validation()
if self.impl_type == "meta":
# MIX
subconfig = self.get_sub_config(
config_bag,
path,
None,
validate_properties=True,
)
elif not only_children:
try:
# MIX
subconfig = self.get_sub_config(
rconfig_bag,
path,
None,
validate_properties=True,
)
except AttributeError:
only_children = True
for child in self._impl_children:
rconfig_bag.context = child
try:
if self.impl_type == "meta":
moption_bag = subconfig
moption_bag.config_bag = rconfig_bag
else:
# MIX
moption_bag = child.get_sub_config(
rconfig_bag,
path,
None,
validate_properties=True,
)
child.get_values().reset(moption_bag)
except AttributeError:
pass
if isinstance(child, KernelMixConfig):
child.reset(
path,
False,
rconfig_bag,
)
if not only_children:
subconfig.config_bag = config_bag
self.get_values().reset(subconfig)
def new_config(
self,
name=None,
type_="config",
):
"""Create a new config/metaconfig/mixconfig and add it to this MixConfig"""
if name:
for child in self._impl_children:
if child.impl_getname() == name:
raise ConflictError(
_("config name must be uniq in " "groupconfig for {0}").format(
child
)
)
assert type_ in ("config", "metaconfig", "mixconfig"), _(
"unknown type {}"
).format(type_)
if type_ == "config":
config = KernelConfig(self._impl_descr, name=name)
elif type_ == "metaconfig":
config = KernelMetaConfig(
[],
optiondescription=self._impl_descr,
name=name,
)
elif type_ == "mixconfig":
config = KernelMixConfig(
children=[],
optiondescription=self._impl_descr,
name=name,
)
# Copy context properties/permissives
settings = config.get_settings()
properties = settings.get_context_properties()
settings.set_context_properties(
properties,
config,
)
settings.set_context_permissives(settings.get_context_permissives())
settings.ro_append = settings.ro_append
settings.rw_append = settings.rw_append
settings.ro_remove = settings.ro_remove
settings.rw_remove = settings.rw_remove
# settings.default_properties = settings.default_properties
config.parents.append(weakref.ref(self))
self._impl_children.append(config)
return config
def add_config(
self,
config,
):
"""Add a child config to a mix config"""
if not config.impl_getname():
raise ConfigError(_("config added has no name, the name is mandatory"))
if config.impl_getname() in [
child.impl_getname() for child in self._impl_children
]:
raise ConflictError(
_('config name "{0}" is not uniq in ' 'groupconfig "{1}"').format(
config.impl_getname(), self.impl_getname()
),
)
config.parents.append(weakref.ref(self))
self._impl_children.append(config)
config.reset_cache(None, None)
def remove_config(
self,
name,
):
"""Remove a child config to a mix config by it's name"""
for current_index, child in enumerate(self._impl_children):
if name == child.impl_getname():
child.reset_cache(None, None)
break
else:
raise ConfigError(_("cannot find the config {0}").format(name))
for child_index, parent in enumerate(child.parents):
if parent() == self:
break
else: # pragma: no cover
raise ConfigError(
_("cannot find the config {0}").format(self.impl_getname())
)
self._impl_children.pop(current_index)
child.parents.pop(child_index)
return child
class KernelMetaConfig(KernelMixConfig):
"""Meta config"""
__slots__ = tuple()
impl_type = "meta"
def __init__(
self,
children,
optiondescription=None,
name=None,
display_name=None,
_duplicate=False,
):
descr = None
if optiondescription is not None:
if not _duplicate:
new_children = []
for child_name in children:
assert isinstance(child_name, str), _(
"MetaConfig with optiondescription"
" must have string has child, "
"not {}"
).format(child_name)
new_children.append(
KernelConfig(optiondescription, name=child_name)
)
children = new_children
descr = optiondescription
for child in children:
if __debug__ and not isinstance(child, (KernelConfig, KernelMetaConfig)):
raise TypeError(_("child must be a Config or MetaConfig"))
if descr is None:
descr = child.get_description()
elif descr is not child.get_description():
raise ValueError(
_(
"all config in metaconfig must "
"have the same optiondescription"
)
)
super().__init__(
descr,
children,
name=name,
display_name=display_name,
)
def add_config(
self,
config,
):
if self._impl_descr is not config.get_description():
raise ValueError(_("metaconfig must " "have the same optiondescription"))
super().add_config(config)