tiramisu/tiramisu/api.py

2353 lines
79 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# Copyright (C) 2017-2026 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# ____________________________________________________________
from inspect import getdoc
from typing import List, Set, Any, Optional, Callable, Dict
from warnings import catch_warnings, simplefilter
from functools import wraps
from copy import deepcopy
from .error import (
ConfigError,
LeadershipError,
ValueErrorWarning,
PropertiesOptionError,
AttributeOptionError,
)
from .i18n import _
from .setting import (
ConfigBag,
owners,
groups,
undefined,
FORBIDDEN_SET_PROPERTIES,
SPECIAL_PROPERTIES,
DEFAULT_PROPERTIES,
)
from .config import (
KernelConfig,
KernelGroupConfig,
KernelMetaConfig,
KernelMixConfig,
SubConfig,
)
from .option import RegexpOption, OptionDescription, ChoiceOption, Leadership
from .todict import TiramisuDict
from .autolib import Calculation
TIRAMISU_VERSION = 5
class Fake_SubConfig:
def __init__(self, config_bag, path, index):
self.config_bag = config_bag
self.path = path
self.index = index
class TiramisuHelp:
_tmpl_help = " {0}\t{1}"
def help(self, _display: bool = True) -> List[str]:
def display(doc=""):
if _display: # pragma: no cover
print(doc)
all_modules = dir(self.__class__)
modules = []
max_len = 0
force = False
for module_name in all_modules:
if module_name in ["forcepermissive", "unrestraint", "nowarnings"]:
force = True
max_len = max(max_len, len("forcepermissive"))
elif module_name != "help" and not module_name.startswith("_"):
modules.append(module_name)
max_len = max(max_len, len(module_name))
modules.sort()
display(_(getdoc(self)))
display()
if force:
display(_("Settings:"))
display(
self._tmpl_help.format(
"forcepermissive",
_("Access to option without verifying permissive " "properties"),
).expandtabs(max_len + 10)
)
display(
self._tmpl_help.format(
"unrestraint", _("Access to option without property restriction")
).expandtabs(max_len + 10)
)
display(
self._tmpl_help.format(
"nowarnings", _("Do not warnings during validation")
).expandtabs(max_len + 10)
)
display()
display(_("Commands:"))
for module_name in modules:
module = getattr(self, module_name)
doc = _(getdoc(module))
display(self._tmpl_help.format(module_name, doc).expandtabs(max_len + 10))
display()
def __dir__(self):
if "_registers" in super().__dir__():
return list(self._registers.keys())
return super().__dir__()
def __getattr__(self, subfunc):
raise ConfigError(
_("please specify a valid sub function ({0}.{1})").format(
self.__class__.__name__, subfunc
)
)
class CommonTiramisu(TiramisuHelp):
_validate_properties = True
def _set_subconfig(self) -> None:
if self._subconfig:
if id(self._subconfig.config_bag) != id(self._config_bag):
subconfig = self._subconfig
self._subconfig = subconfig.__class__(option=subconfig.option,
index=subconfig.index,
path=subconfig.path,
config_bag=self._config_bag,
parent=subconfig.parent,
identifiers=subconfig.identifiers,
identifier=None,
true_path=subconfig.true_path,
properties=subconfig.properties,
validate_properties=False,
check_dynamic_without_identifiers=False,
)
self._subconfig.is_self_dynamic_without_identifiers = subconfig.is_self_dynamic_without_identifiers
else:
self._subconfig._length = None
if not self._subconfig:
try:
self._subconfig = self._config_bag.context.get_sub_config(
self._config_bag,
self._path,
self._index,
validate_properties=self._validate_properties,
allow_dynoption=True,
)
except AssertionError as err:
raise ConfigError(str(err))
def option_type(types):
def wrapper(func):
@wraps(func)
def wrapped(*args, **kwargs):
self = args[0]
config_bag = self._config_bag
if self._config_bag.context.impl_type == "group" and "group" in types:
options_bag = [
OptionBag(
None,
None,
self._config_bag,
path=self._path,
)
]
kwargs["is_group"] = True
return func(self, options_bag, *args[1:], **kwargs)
self._set_subconfig()
if (
"allow_dynoption" not in types and not ("dynoption_or_uncalculated" in types and kwargs.get("uncalculated", False) is True)
) and self._subconfig.is_dynamic_without_identifiers:
raise AttributeOptionError(self._subconfig.path, "option-dynamic")
option = self._subconfig.option
error_type = None
if "dynamic" in types:
if not self._subconfig.is_dynamic:
error_type = "only available for dynamic option"
elif option.impl_is_optiondescription():
if "optiondescription" not in types:
if option.impl_is_leadership():
if "leadership" not in types:
error_type = "not available for a Leadership"
else:
error_type = "not available for an OptionDescription"
elif option.impl_is_symlinkoption():
if "symlink" not in types:
error_type = "this function is not available for a SymLinkOption"
elif "option" not in types:
if "choice" in types:
if not isinstance(option, ChoiceOption):
error_type = "only available for ChoiceOption"
elif option.impl_is_leader():
if "leader" not in types:
error_type = "not available for a Leader"
elif option.impl_is_follower():
if "follower" not in types:
error_type = "not available for a Follower"
else:
error_type = "not available for an Option"
if not error_type:
if (
not option.impl_is_optiondescription()
and not option.impl_is_symlinkoption()
and option.impl_is_follower()
):
# default is "without_index"
if (
self._index is not None
and "with_index" not in types
and "with_or_without_index" not in types
and "with_index_or_uncalculated" not in types
):
msg = _("please do not specify index ({0}.{1})").format(
self.__class__.__name__, func.__name__
)
raise ConfigError(msg)
if self._index is None and ("with_index" in types or ("with_index_or_uncalculated" in types and kwargs.get("uncalculated", False) is False)):
msg = _(
"please specify index with a follower option ({0}.{1})"
).format(self.__class__.__name__, func.__name__)
raise ConfigError(msg)
if "validate_properties" in types or (
self._validate_properties and "dont_validate_property" not in types
):
settings = self._config_bag.context.get_settings()
parent = self._subconfig.parent
if parent and parent.transitive_properties:
while parent:
if not parent.parent.transitive_properties:
settings.validate_properties(
parent,
need_help=True,
)
break
parent = parent.parent
settings.validate_properties(
self._subconfig,
need_help=True,
)
return func(self, *args[1:], **kwargs)
msg = _("please specify a valid sub function ({0}.{1}): {2}").format(
self.__class__.__name__, func.__name__, error_type
)
raise ConfigError(msg)
wrapped.func = func
return wrapped
return wrapper
class CommonTiramisuOption(CommonTiramisu):
_validate_properties = False
def __init__(
self,
path: str,
index: Optional[int],
config_bag: ConfigBag,
subconfig: Optional[SubConfig]=None,
) -> None:
self._path = path
self._index = index
self._config_bag = config_bag
self._subconfig = subconfig
self._set_subconfig()
class _TiramisuOptionWalk:
def _list(
self,
subconfig: SubConfig,
validate_properties: bool,
*,
uncalculated: bool = False,
with_index: bool = True,
):
options = []
for sub_subconfig in subconfig.get_children(
validate_properties,
uncalculated=uncalculated,
with_index=with_index,
check_dynamic_without_identifiers=False,
):
options.append(
TiramisuOption(
sub_subconfig.path,
sub_subconfig.index,
self._config_bag,
subconfig=sub_subconfig,
)
)
return options
class _TiramisuOptionOptionDescription:
"""Manage option"""
_validate_properties = False
@option_type(
[
"optiondescription",
"option",
"with_or_without_index",
"symlink",
"allow_dynoption",
]
)
def get(self):
"""Get Tiramisu option"""
return self._subconfig.option
@option_type(
[
"optiondescription",
"option",
"with_or_without_index",
"symlink",
"allow_dynoption",
]
)
def isoptiondescription(self):
"""Test if option is an optiondescription"""
return self._subconfig.option.impl_is_optiondescription()
@option_type(["optiondescription", "allow_dynoption"])
def isleadership(self):
"""Test if option is a leader or a follower"""
return self._subconfig.option.impl_is_leadership()
@option_type(
[
"optiondescription",
"option",
"with_or_without_index",
"symlink",
"allow_dynoption",
]
)
def description(
self,
with_quote: bool = False,
uncalculated: bool = False,
):
"""Get option description"""
if not uncalculated:
return self._subconfig.option.impl_get_display_name(
self._subconfig, with_quote=with_quote
)
return self._subconfig.option._get_information(
self._subconfig,
"doc",
None,
)
@option_type(
[
"optiondescription",
"option",
"symlink",
"with_or_without_index",
"allow_dynoption",
]
)
def name(
self,
*,
uncalculated: bool = False,
) -> str:
"""Get option name"""
if uncalculated:
return self._subconfig.option.impl_getname()
if self._subconfig.is_dynamic_without_identifiers:
raise AttributeOptionError(self._subconfig.path, "option-dynamic")
return self._subconfig.true_path.rsplit(".", 1)[-1]
@option_type(
[
"optiondescription",
"option",
"with_or_without_index",
"symlink",
"allow_dynoption",
]
)
def path(
self,
*,
uncalculated: bool = False,
) -> str:
"""Get option path"""
if uncalculated:
return self._subconfig.option.impl_getpath()
if self._subconfig.is_dynamic_without_identifiers:
raise AttributeOptionError(self._subconfig.path, "option-dynamic")
return self._subconfig.true_path
def parent(self):
parent = self._subconfig.parent
return TiramisuOption(
parent.path,
None,
self._config_bag,
subconfig=parent,
)
@option_type(["optiondescription", "option", "symlink", "with_or_without_index"])
def has_dependency(
self,
self_is_dep=True,
) -> bool:
"""Test if option has dependency"""
return self._subconfig.option.impl_has_dependency(self_is_dep)
@option_type(["optiondescription", "option", "symlink", "with_or_without_index"])
def dependencies(
self,
*,
uncalculated: bool = False,
):
"""Get dependencies from this option"""
options = []
context = self._config_bag.context
index = self._index
parent = self._subconfig.parent
parent_option = parent.option
for is_default, woption in self._subconfig.option.get_dependencies(
self._config_bag.context
):
option = woption()
if not uncalculated and option.issubdyn():
for subconfig in context.get_dynamic_from_dyn_option(
self._subconfig, option
):
if subconfig.properties is None:
subconfig.properties = undefined
if is_default and subconfig.config_bag.context.get_owner(subconfig) != owners.default:
continue
options.append(
TiramisuOption(
subconfig.path,
None,
self._config_bag,
)
)
elif not uncalculated and option.impl_is_dynoptiondescription():
for subconfig in context.get_dynamic_from_dyn_option(
self._subconfig, option
):
if is_default and subconfig.config_bag.context.get_owner(subconfig) != owners.default:
continue
options.append(
TiramisuOption(
subconfig.path,
None,
self._config_bag,
)
)
else:
if (
not option.impl_is_optiondescription()
and option.impl_is_follower()
and parent_option.impl_is_leadership()
and parent_option.in_same_leadership(option)
):
if index is not None:
current_indexes = [index]
else:
current_indexes = range(parent.get_length_leadership())
else:
current_indexes = [None]
for current_index in current_indexes:
t_option = TiramisuOption(
option.impl_getpath(),
current_index,
self._config_bag,
allow_dynoption=uncalculated,
)
t_option._set_subconfig()
subconfig = t_option._subconfig
if is_default and subconfig.config_bag.context.get_owner(subconfig) != owners.default:
continue
options.append(
t_option,
)
return options
@option_type(
[
"option",
"optiondescription",
"symlink",
"with_or_without_index",
"allow_dynoption",
]
)
def type(self, only_self=False, translation=False):
"""Get de option type"""
option = self._subconfig.option
if option.impl_is_optiondescription():
if translation:
type_ = _("optiondescription")
else:
type_ = "optiondescription"
elif only_self and option.impl_is_symlinkoption():
if translation:
type_ = _("symlink")
else:
type_ = "symlink"
else:
type_ = option.get_type(translation=translation)
return type_
@option_type(["option", "symlink", "with_or_without_index", "allow_dynoption"])
def extra(self, extra):
"""Get de option extra"""
return self._subconfig.option.impl_get_extra(extra)
@option_type(
[
"option",
"optiondescription",
"symlink",
"with_or_without_index",
"allow_dynoption",
]
)
def isdynamic(self, *, only_self: bool = False):
"""Test if option is a dynamic optiondescription"""
if not only_self:
return self._subconfig.is_dynamic
return (
self._subconfig.option.impl_is_optiondescription()
and self._subconfig.option.impl_is_dynoptiondescription()
)
@option_type(["option", "leadership", "allow_dynoption"])
def leader(self):
"""Get the leader option for a leadership or a follower option"""
option = self._subconfig.option
if isinstance(option, Leadership):
leadership = self._subconfig
else:
leadership = self._subconfig.parent
leader_subconfig = leadership.get_child(
leadership.option.get_leader(),
None,
False,
config_bag=self._config_bag,
)
return TiramisuOption(
leader_subconfig.path,
None,
self._config_bag,
subconfig=leader_subconfig,
)
@option_type(["leadership"])
def followers(self):
"""Get the followers option for a leadership"""
option = self._subconfig.option
if isinstance(option, Leadership):
leadership = self._subconfig
else:
leadership = self._subconfig.parent
ret = []
for follower in leadership.option.get_followers():
follower_subconfig = leadership.get_child(
follower,
None,
False,
config_bag=self._config_bag,
)
ret.append(
TiramisuOption(
follower_subconfig.path,
None,
self._config_bag,
subconfig=follower_subconfig,
)
)
return ret
def has_identifiers(self):
return self._subconfig.is_dynamic_without_identifiers
@option_type(["dynamic", "with_or_without_index", "allow_dynoption"])
def identifiers(
self,
*,
only_self: bool = False,
uncalculated: bool = False,
convert: bool = False,
):
"""Get identifiers for dynamic option"""
subconfig = self._subconfig
if only_self:
func = self._identifiers_only_self
elif not subconfig.is_dynamic_without_identifiers:
func = self._identifiers_all
else:
func = self._identifiers_all_no_identifiers
return func(subconfig, uncalculated, convert)
def _identifiers_all(self, subconfig, uncalculated, convert):
dynconfig = None
_subconfig = subconfig
while not dynconfig:
if _subconfig.option.impl_is_optiondescription() and _subconfig.option.impl_is_dynoptiondescription():
dynconfig = _subconfig
else:
_subconfig = _subconfig.parent
identifiers = []
for identifier in _subconfig.identifiers:
if convert:
identifier = dynconfig.option.convert_identifier_to_path(identifier)
identifiers.append(identifier)
return identifiers
def _identifiers_all_no_identifiers(self, subconfig, uncalculated, convert):
"""dyn{{ identifier }}.var{{ identifier }}.var
=> dyn["val1", "val2"]
=> var["val3", "val4"]
returns [["val1", "val3"], ["val1", "val4"], ["val2", "val3"], ["val2", "val4"]]
"""
identifiers = []
while True:
if subconfig.option.impl_is_optiondescription() and subconfig.option.impl_is_dynoptiondescription():
if not subconfig.is_self_dynamic_without_identifiers:
new_identifiers = [subconfig.identifiers[-1]]
else:
new_identifiers = subconfig.option.get_identifiers(subconfig.parent, uncalculated=uncalculated, convert=convert)
if isinstance(new_identifiers, Calculation):
if identifiers:
identifiers = [[new_identifiers] + old_identifiers for old_identifiers in identifiers]
else:
identifiers = [new_identifiers]
elif identifiers:
identifiers = [[identifier] + old_identifiers for identifier in new_identifiers for old_identifiers in identifiers]
else:
identifiers = [[identifier] for identifier in new_identifiers]
subconfig = subconfig.parent
if subconfig is None:
break
return identifiers
def _identifiers_only_self(self, subconfig, uncalculated, convert):
if (
not subconfig.option.impl_is_optiondescription()
or not subconfig.option.impl_is_dynoptiondescription()
):
raise ConfigError(
_(
"the option {0} is not a dynamic option, cannot get identifiers with only_self parameter to True"
).format(subconfig.path),
subconfig=subconfig,
)
dynconfig = None
_subconfig = subconfig
while not dynconfig:
if _subconfig.option.impl_is_optiondescription() and _subconfig.option.impl_is_dynoptiondescription():
dynconfig = _subconfig
_subconfig = _subconfig.parent
return dynconfig.option.get_identifiers(
dynconfig.parent,
uncalculated=uncalculated,
convert=convert,
)
class _TiramisuOptionOption(_TiramisuOptionOptionDescription):
"""Manage option"""
@option_type(["option", "symlink", "with_or_without_index", "allow_dynoption"])
def ismulti(self):
"""Test if option could have multi value"""
return self._subconfig.option.impl_is_multi()
@option_type(["option", "symlink", "with_or_without_index", "allow_dynoption"])
def issubmulti(self):
"""Test if option could have submulti value"""
return self._subconfig.option.impl_is_submulti()
@option_type(["option", "with_or_without_index", "symlink", "allow_dynoption"])
def isleader(self):
"""Test if option is a leader"""
return self._subconfig.option.impl_is_leader()
@option_type(["option", "with_or_without_index", "symlink", "allow_dynoption"])
def isfollower(self):
"""Test if option is a follower"""
return self._subconfig.option.impl_is_follower()
@option_type(["option", "symlink", "with_or_without_index", "allow_dynoption"])
def issymlinkoption(self) -> bool:
"""Test if option is a symlink option"""
return self._subconfig.option.impl_is_symlinkoption()
@option_type(["option", "with_or_without_index"])
def pattern(self) -> str:
"""Get the option pattern"""
option = self._subconfig.option
type = option.get_type()
if isinstance(option, RegexpOption):
return option._regexp.pattern
if type == "integer":
# FIXME negative too!
return r"^[0-9]+$"
if type == "domain name":
return option.impl_get_extra("_domain_re").pattern
if type in ["ip", "network", "netmask"]:
# FIXME only from 0.0.0.0 to 255.255.255.255
return r"^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
@option_type(["option", "with_or_without_index", "symlink"])
def index(self, index=undefined):
"""Get index of option"""
if index is undefined:
return self._subconfig.index
subconfig = self._subconfig.parent.get_child(
self._subconfig.option,
index,
True,
config_bag=self._config_bag,
)
return TiramisuOption(self._path, index, self._config_bag, subconfig=subconfig)
@option_type(["symlink", "optiondescription", "allow_dynoption"])
def option(self, *args, **kwargs):
"""For OptionDescription get sub option, for symlinkoption get the linked option"""
if self._subconfig.option.impl_is_optiondescription():
return self._option_description(*args, **kwargs)
return self._option_symlink(*args, **kwargs)
def _option_description(
self,
path,
index=None,
):
sub_path = self._path + "." + path
return TiramisuOption(
sub_path,
index,
self._config_bag,
)
def _option_symlink(self):
subconfig = self._subconfig.config_bag.context._get(
self._subconfig,
need_help=True,
validate_properties=self._validate_properties,
)
if isinstance(subconfig, list):
raise ConfigError(
_("cannot get option from a follower symlink without index")
)
subconfig.true_path = subconfig.path
return TiramisuOption(
subconfig.path,
subconfig.index,
self._config_bag,
subconfig=subconfig,
)
class TiramisuOptionOwner(CommonTiramisuOption):
"""Manage option's owner"""
_validate_properties = True
@option_type(["symlink", "option", "with_index"])
def get(self, only_self=False):
"""Get owner for a specified option"""
return self._config_bag.context.get_owner(
self._subconfig, validate_meta=not only_self
)
@option_type(["symlink", "option", "with_index"])
def default(self):
values = self._config_bag.context.get_values()
return values.get_default_owner(self._subconfig)
@option_type(["symlink", "option", "with_index"])
def isdefault(self):
"""Is option has defaut value"""
subconfig = self._subconfig
s_properties = subconfig.properties
if (
"frozen" in s_properties
and "force_default_on_freeze" in s_properties
):
return True
context = self._config_bag.context
subconfig = context._get(
subconfig,
True,
)
return not context.get_values().hasvalue(subconfig.path, index=subconfig.index)
@option_type(["option", "with_index"])
def set(
self,
owner: str,
) -> None:
"""Get owner for a specified option"""
try:
obj_owner = getattr(owners, owner)
except AttributeError:
owners.addowner(owner)
obj_owner = getattr(owners, owner)
self._config_bag.context.get_values().set_owner(
self._subconfig,
obj_owner,
)
class TiramisuOptionProperty(CommonTiramisuOption):
"""Manage option's property"""
_validate_properties = False
@option_type(
["option", "optiondescription", "with_index_or_uncalculated", "symlink", "allow_dynoption"]
)
def get(
self,
*,
only_raises: bool = False,
apply_requires: bool = True,
uncalculated: bool = False,
):
"""Get properties for an option"""
if self._subconfig.is_dynamic_without_identifiers and not uncalculated:
raise AttributeOptionError(self._subconfig.path, "option-dynamic")
settings = self._config_bag.context.get_settings()
if not only_raises:
return settings.getproperties(
self._subconfig,
uncalculated=uncalculated,
apply_requires=apply_requires,
)
return settings.calc_raises_properties(
self._subconfig,
uncalculated=uncalculated,
apply_requires=apply_requires,
not_unrestraint=True,
)
@option_type(["option", "optiondescription", "with_or_without_index"])
def add(self, prop):
"""Add new property for an option"""
if prop in FORBIDDEN_SET_PROPERTIES:
raise ConfigError(
_('cannot add this property: "{0}"').format(" ".join(prop))
)
settings = self._config_bag.context.get_settings()
props = settings.get_personalize_properties(
self._path,
self._index,
)
settings.setproperties(
self._subconfig,
props | {prop},
)
@option_type(["option", "optiondescription", "with_or_without_index"])
def remove(
self,
prop,
):
"""Remove new property for an option"""
settings = self._config_bag.context.get_settings()
props = settings.get_personalize_properties(
self._path,
self._index,
)
if prop not in props:
if self._index is None:
if prop in settings.getproperties(self._subconfig):
msg = _(
'cannot remove option\'s property "{0}", use permissive instead in option "{1}"'
).format(prop, self._path)
else:
msg = _('cannot find "{0}" in option "{1}"').format(
prop, self._path
)
else:
if prop in settings.getproperties(self._subconfig):
msg = _(
'cannot remove option\'s property "{0}", use permissive instead in option "{1}" at index "{2}"'
).format(prop, self._path, self._index)
else:
msg = _('cannot find "{0}" in option "{1}" at index "{2}"').format(
prop, self._path, self._index
)
raise ConfigError(msg)
settings.setproperties(
self._subconfig,
props - {prop},
)
@option_type(["option", "optiondescription", "with_or_without_index"])
def reset(self):
"""Reset all personalised properties"""
self._config_bag.context.get_settings().reset(self._subconfig)
class TiramisuOptionPermissive(CommonTiramisuOption):
"""Manage option's permissive"""
_validate_properties = False
@option_type(["option", "optiondescription", "symlink", "with_or_without_index"])
def get(self):
"""Get permissives value"""
return self._config_bag.context.get_settings().getpermissives(self._subconfig)
@option_type(["option", "optiondescription", "with_or_without_index"])
def add(
self,
permissive,
):
"""Set permissives value"""
permissives = self._config_bag.context.get_settings().getpermissives(
self._subconfig
)
self._config_bag.context.get_settings().setpermissives(
self._subconfig,
frozenset(permissives | {permissive}),
)
@option_type(["option", "optiondescription", "with_or_without_index"])
def remove(self, permissive):
"""Remove a config property"""
permissives = set(self.get())
if permissive not in permissives:
msg = _('cannot find "{0}"').format(permissive)
raise ConfigError(msg)
self._config_bag.context.get_settings().setpermissives(
self._subconfig,
frozenset(permissives - {permissive}),
)
@option_type(["option", "optiondescription", "with_or_without_index"])
def reset(self):
"""Reset all personalised permissive"""
self._config_bag.context.get_settings().reset_permissives(self._subconfig)
class TiramisuOptionInformation(CommonTiramisuOption):
"""Manage option's informations"""
_validate_properties = False
@option_type(
[
"option",
"optiondescription",
"with_or_without_index",
"symlink",
"allow_dynoption",
]
)
def get(
self,
name: str,
default=undefined,
) -> Any:
"""Get information"""
return self._config_bag.context.get_values().get_information(
self._subconfig,
name,
default,
)
@option_type(["option", "optiondescription", "allow_dynoption"])
def set(self, key: str, value: Any) -> None:
"""Set information"""
self._config_bag.context.get_values().set_information(
self._subconfig,
key,
value,
)
@option_type(["option", "optiondescription", "allow_dynoption"])
def remove(
self,
key: str,
) -> None:
"""Remove information"""
self._config_bag.context.get_values().del_information(
key,
path=self._path,
)
@option_type(
[
"option",
"optiondescription",
"with_or_without_index",
"symlink",
"allow_dynoption",
]
)
def list(self) -> list:
"""List information's keys"""
lst1 = set(self._subconfig.option._list_information())
lst2 = set(self._config_bag.context.get_values().list_information(self._path))
return lst1 | lst2
class _TiramisuODGet:
def _od_get(
self,
root_subconfig: SubConfig,
) -> dict:
"""exports the whole config into a `dict`
:returns: dict of Option's name (or path) and values
"""
def parse_od_get(values):
ret_ = {}
for subconfig, value in values.items():
option = TiramisuOption(
subconfig.path,
subconfig.index,
self._config_bag,
subconfig=subconfig,
)
if option.isoptiondescription():
value = parse_od_get(value)
ret_[option] = value
return ret_
return parse_od_get(self._config_bag.context.walk(root_subconfig))
class TiramisuOptionValue(CommonTiramisuOption, _TiramisuODGet):
"""Manage option's value"""
_validate_properties = True
@option_type(["option", "symlink", "with_index", "optiondescription", "dynoption_or_uncalculated"])
def get(
self,
*,
uncalculated: bool = False,
):
"""Get value for an option or option and sub option with values with optiondescription"""
if self._subconfig.option.impl_is_optiondescription():
if uncalculated:
raise ConfigError("uncalculated is not allowed for optiondescription")
return self._od_get(self._subconfig)
if uncalculated:
value = self._subconfig.option.impl_getdefault()
index = self._subconfig.index
if not isinstance(value, list) or index is None:
return value
if index >= len(value):
return self._subconfig.option.impl_getdefault_multi()
return value[index]
return self._get(uncalculated)
def _get(
self,
need_help: bool = True,
):
"""Get option's value"""
return self._config_bag.context.get_value(self._subconfig, need_help)
@option_type(["option", "with_index"])
def set(
self,
value,
):
"""Change option's value"""
option = self._subconfig.option
if (
not isinstance(value, Calculation)
and isinstance(value, list)
and option.impl_is_leader()
and len(value) < self._subconfig.parent.get_length_leadership()
):
raise LeadershipError(self._subconfig, "leadership-reduce")
values = self._config_bag.context.get_values()
return values.set_value(self._subconfig, value)
@option_type(["group", "option", "with_index"])
def reset(
self,
is_group: bool = False,
) -> None:
"""Reset value for an option"""
if is_group:
self._config_bag.context.reset(
self._subconfig.path,
self._config_bag,
)
else:
values = self._config_bag.context.get_values()
if self._subconfig.index is not None:
values.reset_follower(self._subconfig)
else:
values.reset(self._subconfig)
@option_type(
["option", "with_or_without_index", "symlink", "dont_validate_property", "dynoption_or_uncalculated"]
)
def default(
self,
uncalculated: bool = False,
) -> Any:
"""Get default value (default of option or calculated value)"""
if uncalculated:
return self._subconfig.option.impl_getdefault()
if self._subconfig.option.impl_is_follower() and self._subconfig.index is None:
msg = _("please specify index with a follower option ({0})").format(
self._subconfig.path
)
raise ConfigError(msg)
return self._config_bag.context.get_values().get_default_value(self._subconfig)
@option_type(
["option", "with_or_without_index", "symlink", "dont_validate_property"]
)
def defaultmulti(self):
"""Get default value when added a value for a multi option (not for optiondescription)"""
if not self._subconfig.option.impl_is_multi():
raise ConfigError(_("only multi value has defaultmulti"))
return self._subconfig.option.impl_getdefault_multi()
@option_type(["option", "with_index"])
def valid(self):
"""The if the option's value is valid"""
try:
with catch_warnings(record=True) as warns:
simplefilter("always", ValueErrorWarning)
self._get(self._subconfig)
for warn in warns:
if isinstance(warn.message, ValueErrorWarning):
return False
except ValueError:
return False
return True
@option_type(["choice", "with_index_or_uncalculated", "allow_dynoption"])
def list(
self,
*,
uncalculated: bool = False,
):
"""All values available for a ChoiceOption"""
if self._subconfig.is_dynamic_without_identifiers and not uncalculated:
raise AttributeOptionError(self._subconfig.path, "option-dynamic")
return self._subconfig.option.impl_get_values(
self._subconfig,
uncalculated,
)
@option_type(["leader"])
def pop(
self,
index: int,
):
"""Pop a value"""
self._config_bag.context.get_values().reset_leadership(
self._subconfig,
index,
)
@option_type(["leader", "follower", "with_or_without_index"])
def len(self):
"""Length for a leadership"""
return self._subconfig.parent.get_length_leadership()
def mandatory(self, *, return_type=False):
"""Return path of options with mandatory property without any value"""
subconfig = self._subconfig
ori_config_bag = self._subconfig.config_bag
config_bag = ori_config_bag.copy()
config_bag.properties -= {"mandatory", "empty", "warnings"}
config_bag.set_permissive()
self._subconfig.config_bag = config_bag
try:
if subconfig.option.impl_is_optiondescription():
if return_type:
raise ConfigError(
_("return_type is not valid for a optiondescription")
)
options = []
for subconfig in config_bag.context.walk(
self._subconfig,
only_mandatory=True,
):
options.append(
TiramisuOption(
subconfig.path,
subconfig.index,
ori_config_bag,
subconfig=subconfig,
)
)
self._subconfig.config_bag = ori_config_bag
return options
try:
self._config_bag.context.walk_valid_value(
self._subconfig, only_mandatory=True
)
except PropertiesOptionError as err:
if return_type:
return err.proptype[0]
return err.proptype == ["mandatory"] or err.proptype == ["empty"]
self._subconfig.config_bag = ori_config_bag
return False
except Exception as err:
self._subconfig.config_bag = ori_config_bag
raise err from err
def _registers(
_registers: Dict[str, type],
prefix: str,
):
for module_name in globals().keys():
if module_name != prefix and module_name.startswith(prefix):
module = globals()[module_name]
func_name = module_name[len(prefix) :].lower()
_registers[func_name] = module
# __________________________________________________________________________________________________
#
class TiramisuConfig(TiramisuHelp, _TiramisuOptionWalk):
def __init__(
self,
config_bag: ConfigBag,
orig_config_bags: Optional[List["OptionBag"]],
) -> None:
self._config_bag = config_bag
self._orig_config_bags = orig_config_bags
def _return_config(self, config):
if isinstance(config, KernelConfig):
return Config(config)
if isinstance(config, KernelMetaConfig):
return MetaConfig(config)
if isinstance(config, KernelMixConfig):
return MixConfig([], config)
if isinstance(config, KernelGroupConfig):
return GroupConfig(config)
def type(self):
"""get the type"""
config = self._config_bag.context
if isinstance(config, KernelConfig):
return "config"
if isinstance(config, KernelMetaConfig):
return "metaconfig"
if isinstance(config, KernelMixConfig):
return "mixconfig"
if isinstance(config, KernelGroupConfig):
return "groupconfig"
def name(self):
"""get the name"""
return self._config_bag.context.impl_getname()
class TiramisuOption(
CommonTiramisu,
_TiramisuOptionOption,
TiramisuConfig,
):
"""Manage selected option"""
_validate_properties = False
_registers = {}
def __init__(
self,
path: Optional[str] = None,
index: Optional[int] = None,
config_bag: Optional[ConfigBag] = None,
*,
allow_dynoption: bool = False,
subconfig: Optional[SubConfig] = None,
) -> None:
self._path = path
self._index = index
self._config_bag = config_bag
self._subconfig = subconfig
if not self._registers:
_registers(self._registers, "TiramisuOption")
def __repr__(self):
msg = f'<TiramisuOption path="{self._path}"'
if self._index is not None:
msg += f", index={self._index}"
msg += ">"
return msg
def __getattr__(self, subfunc: str) -> Any:
if subfunc in self._registers:
return self._registers[subfunc](
self._path,
self._index,
self._config_bag,
self._subconfig,
)
if subfunc in ["forcepermissive", "unrestraint", "nowarnings"]:
if subfunc == "unrestraint" and self._config_bag.is_unrestraint:
return self
#if self._orig_config_bags:
# msg = _(
# "do not use unrestraint, nowarnings or forcepermissive together"
# )
# raise ConfigError(msg)
config_bag = self._config_bag.copy()
if subfunc == "unrestraint":
config_bag.unrestraint()
elif subfunc == "nowarnings":
config_bag.nowarnings()
else:
config_bag.set_permissive()
return self.__class__(self._path, self._index, config_bag)
raise ConfigError(
_("please specify a valid sub function ({0}.{1}) for {2}").format(
self.__class__.__name__, subfunc, self._path
)
)
#
def __iter__(self):
self._set_subconfig()
for sub_subconfig in self._subconfig.get_children(True):
yield TiramisuOption(
sub_subconfig.path,
sub_subconfig.index,
self._config_bag,
subconfig=sub_subconfig,
)
@option_type(["optiondescription", "allow_dynoption"])
def group_type(self):
"""Get type for an optiondescription (only for optiondescription)"""
self._set_subconfig()
return self._subconfig.option.impl_get_group_type()
@option_type(["optiondescription", "validate_properties", "allow_dynoption"])
def list(
self,
*,
validate_properties: bool = True,
uncalculated: bool = False,
with_index: bool = True,
):
"""List options inside an option description (by default list only option)"""
self._set_subconfig()
if self._subconfig.is_dynamic_without_identifiers and not uncalculated:
raise AttributeOptionError(self._subconfig.path, "option-dynamic")
return self._list(
self._subconfig,
validate_properties,
uncalculated=uncalculated,
with_index=with_index,
)
def _load_dict(
self,
clearable: str = "all",
remotable: str = "minimum",
):
config = self._config_bag.context
self._tiramisu_dict = TiramisuDict(
self._return_config(config),
root=self._path,
clearable=clearable,
remotable=remotable,
)
@option_type(["optiondescription"])
def dict(
self,
clearable: str = "all",
remotable: str = "minimum",
form: List = [],
force: bool = False,
) -> Dict:
"""Convert config and option to tiramisu format"""
if force or self._tiramisu_dict is None:
self._load_dict(clearable, remotable)
return self._tiramisu_dict.todict(form)
@option_type(["optiondescription"])
def updates(
self,
body: List,
) -> Dict:
"""Updates value with tiramisu format"""
if self._tiramisu_dict is None: # pragma: no cover
self._load_dict()
return self._tiramisu_dict.set_updates(body)
#
# def forcepermissive(self):
# self._set_subconfig()
# self._subconfig.config_bag.set_permissive()
class TiramisuContextInformation(TiramisuConfig):
"""Manage config informations"""
def get(
self,
name,
default=undefined,
):
"""Get an information"""
context = self._config_bag.context
values = context.get_values()
subconfig = context.get_root(self._config_bag)
return values.get_information(
subconfig,
name,
default,
)
def set(
self,
name,
value,
):
"""Set an information"""
self._config_bag.context.impl_set_information(
self._config_bag,
name,
value,
)
def remove(
self,
name,
):
"""Remove an information"""
self._config_bag.context.impl_del_information(name)
def list(self):
"""List information's keys"""
lst1 = set(self._config_bag.context.get_description()._list_information())
lst2 = set(self._config_bag.context.impl_list_information())
return lst1 | lst2
def exportation(self):
"""Export all informations"""
return deepcopy(self._config_bag.context.get_values()._informations)
def importation(self, informations):
"""Import informations"""
self._config_bag.context.get_values()._informations = deepcopy(informations)
class TiramisuContextValue(TiramisuConfig, _TiramisuODGet):
"""Manage config value"""
def mandatory(self):
"""Return path of options with mandatory property without any value"""
config_bag = self._config_bag.copy()
config_bag.properties -= {"mandatory", "empty", "warnings"}
config_bag.set_permissive()
root = self._config_bag.context.get_root(config_bag)
options = []
for subconfig in self._config_bag.context.walk(
root,
only_mandatory=True,
):
if id(subconfig.config_bag) != id(config_bag):
old_is_dynamic_without_identifiers = subconfig.is_self_dynamic_without_identifiers
subconfig = subconfig.__class__(option=subconfig.option,
index=subconfig.index,
path=subconfig.path,
config_bag=config_bag,
parent=subconfig.parent,
identifiers=subconfig.identifiers,
identifier=None,
true_path=subconfig.true_path,
properties=subconfig.properties,
validate_properties=False,
check_dynamic_without_identifiers=False,
)
subconfig.is_self_dynamic_without_identifiers = old_is_dynamic_without_identifiers
else:
subconfig._length = None
options.append(
TiramisuOption(
subconfig.path,
subconfig.index,
self._config_bag,
subconfig=subconfig,
)
)
return options
# FIXME should be only for group/meta
def set(
self,
path: str,
value: Any,
*,
index: Optional[int] = None,
only_config=undefined,
force_default=undefined,
force_default_if_same=undefined,
force_dont_change_value=undefined,
):
"""Set a value in config or children for a path"""
kwargs = {}
if only_config is not undefined:
kwargs["only_config"] = only_config
if force_default is not undefined:
kwargs["force_default"] = force_default
if force_default_if_same is not undefined:
kwargs["force_default_if_same"] = force_default_if_same
if force_dont_change_value is not undefined:
kwargs["force_dont_change_value"] = force_dont_change_value
context = self._config_bag.context
if isinstance(context, KernelGroupConfig):
subconfig = Fake_SubConfig(
self._config_bag,
path,
index,
)
else:
subconfig = context.get_sub_config(
self._config_bag,
path,
index,
validate_properties=False,
)
return self._config_bag.context.set_value(
subconfig,
value,
**kwargs,
)
# FIXME should be only for group/meta
def reset(self, path: str, only_children: bool = False):
"""Reset value"""
self._config_bag.context.reset(
path,
only_children,
self._config_bag,
)
def get(self):
"""Get option and sub option with values"""
root = self._config_bag.context.get_root(self._config_bag)
return self._od_get(root)
def exportation(
self,
with_default_owner: bool = False,
):
"""Export all values"""
self._force_store_value()
exportation = deepcopy(self._config_bag.context.get_values()._values)
if not with_default_owner:
del exportation[None]
return exportation
def importation(self, values):
"""Import values"""
cvalues = self._config_bag.context.get_values()
if None not in values:
current_owner = cvalues.get_context_owner()
cvalues._values = deepcopy(values)
self._config_bag.context.reset_cache(None, None)
if None not in values:
cvalues._values[None] = {None: [None, current_owner]}
def _force_store_value(self):
descr = self._config_bag.context.get_description()
descr.impl_build_force_store_values(self._config_bag)
class TiramisuContextOwner(TiramisuConfig):
"""Global owner"""
def get(self):
"""Get owner"""
return self._config_bag.context.get_values().get_context_owner()
def set(self, owner):
"""Set owner"""
try:
obj_owner = getattr(owners, owner)
except AttributeError:
owners.addowner(owner)
obj_owner = getattr(owners, owner)
values = self._config_bag.context.get_values()
values.set_context_owner(obj_owner)
class PropertyPermissive:
def _set_default_permissive(
self,
settings,
):
or_properties = settings.rw_append - settings.ro_append - SPECIAL_PROPERTIES
permissives = frozenset(settings.get_context_permissives() | or_properties)
settings.set_context_permissives(permissives)
def _reset_config_properties(
self,
settings,
):
properties = settings.get_context_properties()
permissives = settings.get_context_permissives()
self._config_bag.properties = properties
self._config_bag.permissives = permissives
if self._orig_config_bags:
for config_bag in self._orig_config_bags:
config_bag.properties = properties
config_bag.permissives = permissives
class TiramisuContextProperty(TiramisuConfig, PropertyPermissive):
"""Manage config properties"""
def read_only(self):
"""Set config to read only mode"""
if self._config_bag.is_unrestraint:
raise ConfigError("cannot change context property in unrestraint mode")
old_props = self._config_bag.properties
settings = self._config_bag.context.get_settings()
settings.read_only(self._config_bag)
self._set_default_permissive(settings)
self._reset_config_properties(settings)
if (
"force_store_value" not in old_props
and "force_store_value" in self._config_bag.properties
):
self._force_store_value()
def read_write(self):
"""Set config to read and write mode"""
if self._config_bag.is_unrestraint:
raise ConfigError("cannot change context property in unrestraint mode")
old_props = self._config_bag.properties
settings = self._config_bag.context.get_settings()
settings.read_write(self._config_bag)
self._set_default_permissive(settings)
self._reset_config_properties(settings)
if (
"force_store_value" not in old_props
and "force_store_value" in self._config_bag.properties
):
self._force_store_value()
def add(self, prop):
"""Add a config property"""
props = set(self.get())
if prop not in props:
props.add(prop)
self._set(frozenset(props))
def remove(self, prop):
"""Remove a config property"""
props = set(self.get())
if prop not in props:
msg = f'cannot find "{prop}"'
raise ConfigError(msg)
props.remove(prop)
self._set(frozenset(props))
def get(
self,
*,
only_raises: bool = False,
apply_requires: bool = True,
uncalculated: bool = False,
) -> Set:
"""Get all config properties"""
if only_raises:
return set()
return self._config_bag.properties
def _set(
self,
props,
):
"""Personalise config properties"""
if self._config_bag.is_unrestraint:
raise ConfigError("cannot change context property in unrestraint mode")
if "force_store_value" in props:
force_store_value = "force_store_value" not in self._config_bag.properties
else:
force_store_value = False
settings = self._config_bag.context.get_settings()
settings.set_context_properties(
props,
self._config_bag.context,
)
self._reset_config_properties(settings)
if force_store_value:
self._force_store_value()
def reset(self):
"""Remove config properties"""
if self._config_bag.is_unrestraint:
raise ConfigError("cannot change context property in unrestraint mode")
settings = self._config_bag.context.get_settings()
settings.reset(self._config_bag)
self._reset_config_properties(settings)
def exportation(self):
"""Export config properties"""
settings = self._config_bag.context.get_settings()
return {
"properties": deepcopy(settings._properties),
"ro_append": settings.ro_append.copy(),
"ro_remove": settings.ro_remove.copy(),
"rw_append": settings.rw_append.copy(),
"rw_remove": settings.rw_remove.copy(),
}
def importation(self, data):
"""Import config properties"""
if self._config_bag.is_unrestraint:
raise ConfigError("cannot change context property in unrestraint mode")
properties = data["properties"]
if "force_store_value" in properties.get(None, {}).get(None, []):
force_store_value = "force_store_value" not in self._config_bag.properties
else:
force_store_value = False
context = self._config_bag.context
settings = context.get_settings()
settings._properties = deepcopy(properties)
settings.ro_append = data["ro_append"].copy()
settings.ro_remove = data["ro_remove"].copy()
settings.rw_append = data["rw_append"].copy()
settings.rw_remove = data["rw_remove"].copy()
context.reset_cache(None, None)
self._reset_config_properties(settings)
if force_store_value:
self._force_store_value()
def _force_store_value(self):
descr = self._config_bag.context.get_description()
descr.impl_build_force_store_values(self._config_bag)
def setdefault(
self, properties: Set[str], type: Optional[str], when: Optional[str] = None
) -> None:
if not isinstance(properties, frozenset):
raise TypeError(_("properties must be a frozenset"))
setting = self._config_bag.context.get_settings()
if when not in ["append", "remove"]:
raise ValueError(
_("unknown when {} (must be in append or remove)").format(when)
)
if type == "read_only":
if when == "append":
setting.ro_append = properties
else:
setting.ro_remove = properties
elif type == "read_write":
if when == "append":
setting.rw_append = properties
else:
setting.rw_remove = properties
else:
raise ValueError(_("unknown type {}").format(type))
def default(
self,
type: Optional[str] = None,
when: Optional[str] = None,
) -> Set[str]:
setting = self._config_bag.context.get_settings()
if type is None and when is None:
return DEFAULT_PROPERTIES
if type == "current":
return setting.get_context_properties()
if when not in ["append", "remove"]:
raise ValueError(
_("unknown when {} (must be in append or remove)").format(when)
)
if type == "read_only":
if when == "append":
return setting.ro_append
return setting.ro_remove
if type == "read_write":
if when == "append":
return setting.rw_append
return setting.rw_remove
raise ValueError(_("unknown type {}").format(type))
class TiramisuContextPermissive(TiramisuConfig, PropertyPermissive):
"""Manage config permissives"""
def get(self):
"""Get config permissives"""
return self._get()
def _get(self):
return self._config_bag.context.get_settings().get_context_permissives()
def _set(
self,
permissives,
):
"""Set config permissives"""
if self._config_bag.is_unrestraint:
raise ConfigError("cannot change context permissive in unrestraint mode")
settings = self._config_bag.context.get_settings()
settings.set_context_permissives(permissives)
self._reset_config_properties(settings)
def exportation(self):
"""Export config permissives"""
return deepcopy(self._config_bag.context.get_settings()._permissives)
def importation(self, permissives):
"""Import config permissives"""
if self._config_bag.is_unrestraint:
raise ConfigError("cannot change context permissive in unrestraint mode")
context = self._config_bag.context
settings = context.get_settings()
settings._permissives = deepcopy(permissives)
context.reset_cache(
None,
None,
)
self._reset_config_properties(settings)
def reset(self):
"""Remove config permissives"""
if self._config_bag.is_unrestraint:
raise ConfigError("cannot change context permissive in unrestraint mode")
settings = self._config_bag.context.get_settings()
settings.reset_permissives(self._config_bag)
self._set_default_permissive(settings)
self._reset_config_properties(settings)
def add(self, permissive):
"""Add a config permissive"""
permissives = set(self._get())
permissives.add(permissive)
self._set(frozenset(permissives))
def remove(self, permissive):
"""Remove a config permissive"""
permissives = set(self._get())
if permissive not in permissives:
msg = f'cannot find "{permissive}"'
raise ConfigError(msg)
permissives.remove(permissive)
self._set(frozenset(permissives))
class TiramisuContextOption(TiramisuConfig, _TiramisuOptionWalk):
def __init__(self) -> None:
self._tiramisu_dict = None
def __iter__(self):
root = self._config_bag.context.get_root(self._config_bag)
for sub_subconfig in root.get_children(True):
yield TiramisuOption(
sub_subconfig.path,
sub_subconfig.index,
self._config_bag,
subconfig=sub_subconfig,
)
def get(self):
"""Get Tiramisu option"""
return self._config_bag.context.get_description()
def isleadership(self):
"""Test if option is a leader or a follower"""
return False
def description(
self,
uncalculated: bool = False,
) -> str:
"""Get option description"""
if not uncalculated:
return self._config_bag.context.get_description().impl_get_display_name(
None
)
return self._config_bag.context.get_description()._get_information(
None,
"doc",
None,
)
def path(
self,
):
"""Get option path"""
return self._config_bag.context.get_config_path()
def has_dependency(
self,
self_is_dep=True,
) -> bool:
"""Test if option has dependency"""
return False
def isoptiondescription(self):
"""Test if option is an optiondescription"""
return True
def isdynamic(self):
"""Test if option is a dynamic optiondescription"""
return False
def list(
self,
*,
validate_properties: bool = True,
uncalculated: bool = False,
with_index: bool = True,
):
"""List options (by default list only option)"""
root = self._config_bag.context.get_root(self._config_bag)
return self._list(
root,
validate_properties,
uncalculated=uncalculated,
with_index=with_index,
)
def _load_dict(self, clearable="all", remotable="minimum"):
self._tiramisu_dict = TiramisuDict(
self._return_config(self._config_bag.context),
root=None,
clearable=clearable,
remotable=remotable,
)
def dict(
self,
clearable="all",
remotable="minimum",
form=None,
force=False,
):
"""Convert config and option to tiramisu format"""
if form is None:
form = []
if force or self._tiramisu_dict is None:
self._load_dict(clearable, remotable)
return self._tiramisu_dict.todict(form)
def updates(self, body: List) -> Dict:
"""Updates value with tiramisu format"""
if self._tiramisu_dict is None: # pragma: no cover
self._load_dict()
return self._tiramisu_dict.set_updates(body)
def __repr__(self):
return f'<{self.__class__.__name__} path="{self.path()}">'
class _TiramisuContextConfigReset:
def reset(self):
"""Remove all datas to current config (informations, values, properties, ...)"""
# Option's values
context_owner = self._config_bag.context.get_values().get_context_owner()
self._config_bag.context.get_values()._values = {
None: {None: [None, context_owner]}
}
# Option's informations
self._config_bag.context.get_values()._informations = {}
# Option's properties
self._config_bag.context.get_settings()._properties = {}
# Option's permissives
self._config_bag.context.get_settings()._permissives = {}
# Remove cache
self._config_bag.context.reset_cache(None, None)
class _TiramisuContextConfig(TiramisuConfig, _TiramisuContextConfigReset):
"""Actions to Config"""
def type(self):
"""Type a Config"""
return "config"
def copy(self, name=None):
"""Copy current config"""
config = self._config_bag.context.duplicate(name=name)
return self._return_config(config)
def deepcopy(self, metaconfig_prefix=None, name=None):
"""Copy current config with all parents"""
config = self._config_bag.context.duplicate(
metaconfig_prefix=metaconfig_prefix,
deep=[],
name=name,
)
return self._return_config(config)
def parents(self):
"""Get all parents of current config"""
ret = []
for parent in self._config_bag.context.get_parents():
ret.append(self._return_config(parent))
return ret
def path(self):
"""Get path from config (all parents name)"""
return self._config_bag.context.get_config_path()
class _TiramisuContextGroupConfig(TiramisuConfig):
"""Actions to GroupConfig"""
def type(self):
"""Type a Config"""
return "groupconfig"
def list(self):
"""List children's config"""
return [
self._return_config(child)
for child in self._config_bag.context.get_children()
]
#
# def find(self,
# name: str,
# value=undefined,
# ):
# """Find an or a list of config with finding option"""
# return GroupConfig(self._config_bag.context.find_group(byname=name,
# byvalue=value,
# config_bag=self._config_bag,
# ))
def __call__(self, path: Optional[str]):
"""Select a child Tiramisu config"""
spaths = path.split(".")
config = self._config_bag.context
for spath in spaths:
config = config.getconfig(spath)
if isinstance(config, KernelGroupConfig):
return self._return_config(config)
return self._return_config(config)
def copy(self, name=None):
config = self._config_bag.context.duplicate(name=name)
return self._return_config(config)
def deepcopy(self, name=None, metaconfig_prefix=None):
config = self._config_bag.context.duplicate(
metaconfig_prefix=metaconfig_prefix,
deep=[],
name=name,
)
return self._return_config(config)
def path(self):
return self._config_bag.context.get_config_path()
class _TiramisuContextMixConfig(
_TiramisuContextGroupConfig, _TiramisuContextConfigReset
):
"""Actions to MixConfig"""
def type(self):
"""Type a Config"""
return "mixconfig"
def new(self, name=None, type="config"):
"""Create and add a new config"""
config = self._config_bag.context
new_config = config.new_config(type_=type, name=name)
return self._return_config(new_config)
def remove(self, name):
"""Remove config from MetaConfig"""
config = self._config_bag.context.remove_config(name)
return self._return_config(config)
def add(self, config):
"""Add config from MetaConfig"""
# pylint: disable=protected-access
self._config_bag.context.add_config(config._config_bag.context)
def parents(self):
"""Get all parents of current config"""
ret = []
for parent in self._config_bag.context.get_parents():
ret.append(self._return_config(parent))
return ret
class _TiramisuContextMetaConfig(_TiramisuContextMixConfig):
"""Actions to MetaConfig"""
def type(self):
"""Type a Config"""
return "metaconfig"
class TiramisuContextCache(TiramisuConfig):
"""Manage config cache"""
def reset(self):
"""Reset cache"""
self._config_bag.context.reset_cache(None, None)
def set_expiration_time(
self,
time: int,
) -> None:
"""Change expiration time value"""
self._config_bag.expiration_time = time
def get_expiration_time(self) -> int:
"""Get expiration time value"""
return self._config_bag.expiration_time
class TiramisuAPI(TiramisuHelp):
"""TiramisuAPI common class"""
_registers = {}
def __init__(self, config_bag, orig_config_bags=None) -> None:
self._config_bag = config_bag
self._orig_config_bags = orig_config_bags
if not self._registers:
_registers(self._registers, "TiramisuContext")
super().__init__()
def option(
self,
path: str,
index: Optional[int] = None,
) -> TiramisuOption:
"""Select an option by path"""
return TiramisuOption(
path,
index,
self._config_bag,
)
def __getattr__(self, subfunc: str) -> Any:
if subfunc in ["forcepermissive", "unrestraint", "nowarnings"]:
if subfunc == "unrestraint" and self._config_bag.is_unrestraint:
return self
if self._orig_config_bags:
msg = _(
"do not use unrestraint, nowarnings or forcepermissive together"
)
raise ConfigError(msg)
config_bag = self._config_bag.copy()
if subfunc == "unrestraint":
config_bag.unrestraint()
elif subfunc == "nowarnings":
config_bag.nowarnings()
else:
config_bag.set_permissive()
return ConfigProp(config_bag, [self._config_bag])
if subfunc == "config":
config_type = self._config_bag.context.impl_type
if config_type == "group":
config = _TiramisuContextGroupConfig
elif config_type == "meta":
config = _TiramisuContextMetaConfig
elif config_type == "mix":
config = _TiramisuContextMixConfig
else:
config = _TiramisuContextConfig
return config(self._config_bag, self._orig_config_bags)
if subfunc in self._registers:
config_bag = self._config_bag
# del config_bag.permissives
return self._registers[subfunc](config_bag, self._orig_config_bags)
raise ConfigError(
_("please specify a valid sub function ({0}.{1})").format(
self.__class__.__name__, subfunc
)
)
def __dir__(self):
return list(self._registers.keys()) + [
"unrestraint",
"forcepermissive",
"nowarnings",
"config",
]
class ConfigProp(TiramisuAPI, TiramisuContextOption):
pass
class Config(TiramisuAPI, TiramisuContextOption):
"""Root config object that enables us to handle the configuration options"""
def __init__(
self,
descr: OptionDescription,
name=None,
display_name=None,
) -> None:
if isinstance(descr, KernelConfig):
config = descr
else:
config = KernelConfig(
descr,
name=name,
display_name=display_name,
)
settings = config.get_settings()
properties = settings.get_context_properties()
permissives = settings.get_context_permissives()
config_bag = ConfigBag(
config,
properties=properties,
permissives=permissives,
)
super().__init__(config_bag)
def __del__(self):
try:
del self._config_bag.context
del self._config_bag
del self._orig_config_bags
except ConfigError:
pass
class MetaConfig(TiramisuAPI, TiramisuContextOption):
"""MetaConfig object that enables us to handle the sub configuration's options
with common root optiondescription
"""
# pylint: disable=too-few-public-methods
def __init__(
self,
children: "Config" = None,
name=None,
optiondescription: Optional[OptionDescription] = None,
display_name=None,
) -> None:
if children is None:
children = []
if isinstance(children, KernelMetaConfig):
config = children
else:
_children = []
for child in children:
if isinstance(child, TiramisuAPI):
_children.append(child._config_bag.context)
else:
_children.append(child)
config = KernelMetaConfig(
_children,
optiondescription=optiondescription,
name=name,
display_name=display_name,
)
settings = config.get_settings()
properties = settings.get_context_properties()
permissives = settings.get_context_permissives()
config_bag = ConfigBag(config, properties=properties, permissives=permissives)
super().__init__(config_bag)
class MixConfig(TiramisuAPI):
"""MixConfig object that enables us to handle the sub configuration's options
with differents root optiondescription
"""
# pylint: disable=too-few-public-methods
def __init__(
self,
optiondescription: OptionDescription,
children: List[Config],
name: Callable = None,
display_name=None,
) -> None:
if isinstance(children, KernelMixConfig):
config = children
else:
_children = []
for child in children:
if isinstance(child, TiramisuAPI):
_children.append(child._config_bag.context)
else:
_children.append(child)
config = KernelMixConfig(
optiondescription,
_children,
name=name,
display_name=display_name,
)
settings = config.get_settings()
properties = settings.get_context_properties()
permissives = settings.get_context_permissives()
config_bag = ConfigBag(
config,
properties=properties,
permissives=permissives,
)
super().__init__(config_bag)
class GroupConfig(TiramisuAPI):
"""GroupConfig that enables us to access the sub configuration's options"""
# pylint: disable=too-few-public-methods
def __init__(
self,
children,
name=None,
) -> None:
if isinstance(children, KernelGroupConfig):
config = children
else:
_children = []
for child in children:
if isinstance(child, TiramisuAPI):
_children.append(child._config_bag.context)
else:
_children.append(child)
config = KernelGroupConfig(_children, name=name)
config_bag = ConfigBag(config, properties=None, permissives=None)
super().__init__(config_bag)