# rougail/src/rougail/config/__init__.py
#
# 605 lines
# 18 KiB
# Python

"""
Config file for Rougail
Created by:
EOLE (http://eole.orion.education.fr)
Copyright (C) 2005-2018
Forked by:
Cadoles (http://www.cadoles.com)
Copyright (C) 2019-2021
Silique (https://www.silique.fr)
Copyright (C) 2022-2026
This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from pathlib import Path
from tiramisu import Config
from tiramisu.error import display_list
from ruamel.yaml import YAML
from ..utils import _, load_modules
from ..tiramisu import normalize_family
from ..convert import RougailConvert
from ..convert.object_model import get_convert_option_types
# Keys that _RougailConfig.__setitem__ translates to another option name
# before looking the option up.
RENAMED = dict(
    dictionaries_dir="main_structural_directories",
    main_dictionaries="main_structural_directories",
    variable_namespace="main_namespace",
    functions_file="functions_files",
)
# Keys stored as plain attributes of the configuration object (with their
# default value) instead of being looked up as Tiramisu options.
NOT_IN_TIRAMISU = {"custom_types": {}}
# Cache of loaded sub-module configurations; filled by get_sub_modules().
SUBMODULES = None
def get_sub_modules():
    """Return the ``config`` module of every sub-package, keyed by its name.

    The parent directory of this package is scanned once: each directory
    whose name does not start with ``_`` and which contains a ``config.py``
    file is loaded with ``load_modules``. The result is cached in the
    module-level ``SUBMODULES`` and returned as-is on later calls.
    """
    global SUBMODULES
    if SUBMODULES is not None:
        return SUBMODULES
    # Bind the cache before filling it so it is populated in place.
    SUBMODULES = {}
    package_root = Path(__file__).parent.parent
    for entry in package_root.iterdir():
        if entry.name.startswith("_"):
            continue
        if not entry.is_dir():
            continue
        candidate = entry / "config.py"
        if not candidate.is_file():
            continue
        module_name = f"rougail.{entry.name}.config"
        SUBMODULES[entry.name] = load_modules(module_name, str(candidate))
    return SUBMODULES
def get_level(module):
    """Return the sort key of a module description.

    The key is ``module["level"]`` converted to float plus a fixed offset
    chosen by ``module["process"]``, so that for an equal level the order is
    "structural", then "user data", then "output".

    Raises ``TypeError`` when ``module["process"]`` is not one of those three
    values, because the offset lookup then yields ``None``.
    """
    process_offsets = {
        "structural": 0.1,
        "user data": 0.2,
        "output": 0.3,
    }
    base_level = float(module["level"])
    offset = process_offsets.get(module["process"])
    return base_level + offset
class _RougailConfig:
def __init__(self, backward_compatibility: bool, add_extra_options: bool):
self.backward_compatibility = backward_compatibility
self.add_extra_options = add_extra_options
self.root = None
def copy(self, backward_compatibility=None):
if not self.root:
self.generate_config()
config = self.config.config.copy()
config.value.importation(self.config.value.exportation())
config.property.importation(
self.config.property.exportation()
)
config.property.read_only()
if backward_compatibility is None:
backward_compatibility = self.backward_compatibility
rougailconfig = _RougailConfig(backward_compatibility, self.add_extra_options)
rougailconfig.root = self.root
rougailconfig.config = config
rougailconfig.extra_vars = self.extra_vars.copy()
rougailconfig.not_in_tiramisu = NOT_IN_TIRAMISU | rougailconfig.extra_vars
for variable in self.not_in_tiramisu:
value = getattr(self, variable)
if not isinstance(value, str):
value = value.copy()
setattr(rougailconfig, variable, value)
return rougailconfig
def generate_config(self):
self.root, extra_vars = _rougail_config(
self.backward_compatibility, self.add_extra_options
)
self.config = Config(
self.root,
)
self.extra_vars = extra_vars
self.not_in_tiramisu = NOT_IN_TIRAMISU | extra_vars
for variable, default_value in self.not_in_tiramisu.items():
if not isinstance(default_value, str):
default_value = default_value.copy()
setattr(self, variable, default_value)
self.config.property.read_only()
def __setitem__(
self,
key,
value,
) -> None:
if self.root is None:
self.generate_config()
if key in self.not_in_tiramisu:
setattr(self, key, value)
else:
self.config.property.read_write()
key = RENAMED.get(key, key)
option = self.config.option(key)
if option.isoptiondescription() and option.isleadership():
if isinstance(value, RConfigLeadership):
leader = value.leader
followers = value.followers
else:
leader = list(value)
followers = value.values()
option.leader().value.reset()
option.leader().value.set(leader)
follower = option.followers()[0]
for idx, val in enumerate(followers):
self.config.option(follower.path(), idx).value.set(val)
else:
option.value.set(value)
self.config.property.read_only()
def __getitem__(
self,
key,
) -> None:
if self.root is None:
self.generate_config()
if key in self.not_in_tiramisu:
return getattr(self, key)
option = self.config.option(key)
if option.isoptiondescription() and option.isleadership():
return self.get_leadership(option)
ret = self.config.option(key).value.get()
return ret
def __contains__(
self,
key,
) -> None:
try:
self.__getitem__(key)
except AttributeError:
return False
return True
def get_leadership(self, option) -> dict:
leader = None
followers = []
for opt, value in option.value.get().items():
if opt.issymlinkoption():
continue
if leader is None:
leader = value
else:
followers.append(value)
return RConfigLeadership(self.config, option, leader, followers)
def parse(self, config) -> str:
for option in config:
if option.isoptiondescription():
yield from self.parse(option)
elif not option.issymlinkoption():
yield f"{option.path()}: {option.value.get()}"
def __repr__(self):
if self.root is None:
self.generate_config()
self.config.property.read_write()
try:
values = "\n".join(self.parse(self.config))
except Exception as err:
values = str(err)
self.config.property.read_only()
return values
class RConfigLeadership:
    """View on a leadership of the Rougail configuration.

    ``leader`` and ``followers`` are parallel lists: the n-th follower value
    belongs to the n-th leader value.
    """

    def __init__(self, config, option, leader, followers):
        self.config = config
        self.option = option
        self.leader = leader
        self.followers = followers

    def items(self):
        """Return ``(leader value, follower value)`` pairs like ``dict.items()``."""
        return dict(zip(self.leader, self.followers)).items()

    def __setitem__(
        self,
        key,
        value,
    ) -> None:
        """Append ``key`` to the leader and set ``value`` at the new index.

        The leader is the sub-option ``names`` and the follower the sub-option
        ``directories`` of the wrapped option.
        """
        self.config.property.read_write()
        try:
            names = self.option.option("names")
            leader = names.value.get()
            leader.append(key)
            names.value.set(leader)
            # The new follower value lives at the index of the appended leader.
            directories = self.option.option("directories", len(leader) - 1)
            directories.value.set(value)
            self.leader.append(key)
            self.followers.append(value)
        finally:
            # Restore read-only mode even when setting the value fails.
            self.config.property.read_only()

    def __getitem__(self, key):
        """Return the value(s) of the sub-option ``key``.

        The leader returns its value directly; any other sub-option returns
        the list of its values, one per index.
        """
        option = self.option.option(key)
        if option.isleader():
            return option.value.get()
        return [option.index(idx).value.get() for idx in range(option.value.len())]

    def __repr__(self):
        # __repr__ must return a str; returning the dict itself makes repr()
        # raise TypeError.
        return repr(dict(zip(self.leader, self.followers)))
class StaticRougailConvert(RougailConvert):
    """``RougailConvert`` whose settings are fixed values.

    ``load_config`` assigns constants instead of reading a configuration
    object; it is used to convert Rougail's own option description.
    """

    def __init__(
        self,
        add_extra_options: bool,
        rougailconfig: "dict | None" = None,
    ) -> None:
        """Store ``add_extra_options`` and initialise the parent converter.

        ``rougailconfig`` defaults to a fresh empty dict on every call, so no
        dict is shared between instances.
        """
        self.add_extra_options = add_extra_options
        if rougailconfig is None:
            rougailconfig = {}
        super().__init__(rougailconfig)

    def load_config(self) -> None:
        """Assign the fixed settings used for this conversion."""
        self.sort_structural_files_all = False
        self.main_namespace = None
        self.suffix = ""
        self.custom_types = {}
        self.functions_files = []
        self.modes_level = []
        self.extra_annotators = []
        self.base_option_name = "baseoption"
        self.export_with_import = True
        self.internal_functions = []
        self.force_optional = False
        self.structurals = ["commandline"]
        self.user_data = []
        self.output = None
        self.tiramisu_cache = False
        self.load_unexist_redefine = False
def get_common_rougail_config(
    *,
    backward_compatibility=True,
) -> tuple[dict, list, str]:
    """Build the YAML text describing Rougail's own options.

    Every sub-module returned by ``get_sub_modules()`` is asked for its
    description with ``get_rougail_config(backward_compatibility=...)``.

    Returns a 3-tuple ``(processes, processes_empty, rougail_options)``:

    * ``processes``: for each of "structural", "user data" and "output", the
      list of module descriptions declaring that process, sorted with
      ``get_level``;
    * ``processes_empty``: the ``"options"`` entry of every module whose
      ``"process"`` is falsy;
    * ``rougail_options``: the accumulated YAML text.
    """
    # NOTE(review): in this view the YAML literals below carry no nesting
    # indentation; verify against version control that the nesting is intact
    # before relying on them.
    rougail_options = f"""default_structural_format_version:
description: {_('Default version of the structural file format')}
help: {_('This value is only used if the version is not set in the structural file')}
alternative_name: v
choices:
- '1.0'
- '1.1'
mandatory: false
types:
description: {_("File with personalize types")}
help: {_("This file contains personalize types in Rougail format for structure files")}
type: unix_filename
params:
allow_relative: true
test_existence: true
multi: true
mandatory: false
functions_files:
description: {_("File with functions")}
help: {_("This file contains filters and additional Jinja2 functions usable in structure files")}
type: unix_filename
params:
allow_relative: true
test_existence: true
types:
- file
multi: true
mandatory: false
modes_level:
description: {_("All modes level available")}
multi: true
mandatory: false
"""
    # The text above ends with the "modes_level" entry; with backward
    # compatibility a default list is appended right after it.
    if backward_compatibility:
        rougail_options += """ default:
- basic
- standard
- advanced
"""
    # Doubled braces are f-string escapes: "{{%" yields "{%" and "{{{{" yields
    # "{{" in the resulting text.
    rougail_options += f"""
default_family_mode:
description: {_("Default mode for a family")}
default:
jinja: |
{{% if modes_level %}}
{{{{ modes_level[0] }}}}
{{% endif %}}
description: {_('the first one defined in "modes_level"')}
disabled:
jinja: |
{{% if not modes_level %}}
No mode
{{% endif %}}
description: {_('when no mode is defined in "modes_level"')}
validators:
- type: jinja
jinja: |
{{% if default_family_mode not in modes_level %}}
not in modes_level ({{modes_level}})
{{% endif %}}
description: {_('this mode must be available in "modes_level"')}
commandline: false
default_variable_mode:
description: {_("Default mode for a variable")}
default:
jinja: |
{{% if modes_level %}}
{{% if modes_level | length == 1 %}}
{{{{ modes_level[0] }}}}
{{% else %}}
{{{{ modes_level[1] }}}}
{{% endif %}}
{{% endif %}}
description: {_('if the variable "modes_level" is defined, the default value is the second available element, otherwise, the first')}
disabled:
jinja: |
{{% if not modes_level %}}
No mode
{{% endif %}}
description: {_('when no mode is defined in "modes_level"')}
validators:
- type: jinja
jinja: |
{{% if default_variable_mode not in modes_level %}}
not in modes_level ({{modes_level}})
{{% endif %}}
description: {_('this mode must be available in "modes_level"')}
commandline: false
base_option_name:
description: {_("Option name for the base option")}
default: baseoption
commandline: false
export_with_import:
description: {_("In cache file, do not importation of Tiramisu and other dependencies")}
default: true
commandline: false
tiramisu_cache:
description: {_("Store Tiramisu cache filename")}
help: "{_("This file contains the Tiramisu instructions used internally to load the variables.\n\nThis file can be used for debugging")}"
alternative_name: t
type: unix_filename
mandatory: false
commandline: false
params:
allow_relative: true
types:
- file
internal_functions:
description: {_("Name of internal functions that we can use as a function")}
multi: true
mandatory: false
commandline: false
extra_annotators:
description: {_("Name of extra annotators")}
multi: true
mandatory: false
commandline: false
suffix:
description: {_("Suffix add to generated options name")}
default: ''
mandatory: false
commandline: false
force_optional:
description: {_("Every variables in calculation are optionals")}
default: False
load_unexist_redefine:
description: {_("Loads redefine variables even if there don't already exists")}
commandline: false
default: False
secret_manager: # {_("The secret manager")}
pattern:
description: {_("The secret pattern to constructing the name of the item searched for in the secret manager")}
help: {_("The pattern is in Jinja2 format")}
default: "{{{{ project }}}} - {{{{ environment }}}} - {{{{ service }}}} - {{{{ user }}}}"
"""
    # Module descriptions grouped by the process they declare.
    processes = {
        "structural": [],
        "user data": [],
        "output": [],
    }
    # Translated process labels used in the generated descriptions.
    processes_tr = {"structural": _("structural"),
                    "user data": _("user datas"),
                    "output": _("output"),
                    }
    # "options" entries of the modules that declare no process.
    processes_empty = []
    for module in get_sub_modules().values():
        data = module.get_rougail_config(backward_compatibility=backward_compatibility)
        if data["process"]:
            processes[data["process"]].append(data)
        else:
            processes_empty.append(data["options"])
    # reorder the modules of each process with get_level()
    for process in processes:
        processes[process] = list(sorted(processes[process], key=get_level))
    # The text of the "step" entry is accumulated separately and appended to
    # rougail_options at the end.
    rougail_process = "step: # Load and exporter steps"
    for process in processes:
        if processes[process]:
            # At least one module provides this process: list them as choices.
            objects = processes[process]
            process_name = normalize_family(process)
            tr_process_name = processes_tr[process]
            rougail_process += f"""
{process_name}:
description: {_('Select for {0}').format(tr_process_name)}
"""
            if process != "structural":
                # {NAME[0]} is the first character of the normalized name.
                rougail_process += """ alternative_name: {NAME[0]}
""".format(
                    NAME=normalize_family(process),
                )
            rougail_process += """ choices:
"""
            for obj in objects:
                rougail_process += f" - {obj['name']}\n"
            if process == "structural":
                rougail_process += """ commandline: false
multi: true
default:
- directory
"""
            elif process == "user data":
                rougail_process += """ multi: true
mandatory: false"""
                # Names of the outputs whose description sets
                # "allow_user_data" to a falsy value (it defaults to True).
                hidden_outputs = [
                    process["name"]
                    for process in processes["output"]
                    if not process.get("allow_user_data", True)
                ]
                if hidden_outputs:
                    rougail_process += """
disabled:
type: jinja
jinja: |
"""
                    for hidden_output in hidden_outputs:
                        # The literal NAME is replaced by the output name.
                        rougail_process += """ {% if _.output is not propertyerror and _.output == 'NAME' %}
Cannot load user data for NAME output
{% endif %}
""".replace(
                            "NAME", hidden_output
                        )
                    # The f-string inserts the translated text, whose "{0}" is
                    # then filled by str.format().
                    rougail_process += f""" description: {_('outputs {0} did not allow user data')}
""".format(display_list(hidden_outputs, add_quote=True, separator="or"))
            elif objects:
                # Any other process: default to the first module in sort order.
                rougail_process += " default: {DEFAULT}".format(
                    DEFAULT=objects[0]["name"]
                )
        else:
            # No module provides this process: emit a placeholder entry that
            # is "hidden" for the output process and "disabled" otherwise.
            if process == "output":
                prop = "hidden"
            else:
                prop = "disabled"
            rougail_process += """
{NAME}:
description: Select for {NAME}
mandatory: false
{PROP}: true
multi: true
default: ["You haven't installed \\\"{NAME}\\\" package for rougail"]
validators:
- jinja: Please install a rougail-{NAME}-* package.
""".format(
                NAME=normalize_family(process),
                PROP=prop,
            )
    rougail_process += f"""
define_default_params: false # {_('Override default parameters for option type')}
default_params:
description: {_("Default parameters for option type")}
disabled:
variable: _.define_default_params
when: false
"""
    # One entry per option type returned by get_convert_option_types(), each
    # followed by one entry per parameter of that type.
    for typ, typ_description, params in get_convert_option_types():
        rougail_process += f"""
{typ}: # {typ_description}
"""
        for key, key_type, description, multi, value, choices in params:
            rougail_process += f"""
{key}:
"""
            if description:
                rougail_process += f""" description: "{description}"
"""
            rougail_process += f""" type: {key_type}
multi: {multi}
mandatory: false
default: {value}
"""
            if choices:
                rougail_process += " choices:\n"
                for choice in choices:
                    rougail_process += f" - {choice}\n"
    rougail_options += rougail_process
    return processes, processes_empty, rougail_options
def _rougail_config(
    backward_compatibility: bool = True,
    add_extra_options: bool = True,
) -> tuple:
    """Build the option description of Rougail's own configuration.

    The YAML produced by ``get_common_rougail_config`` and by every module
    description is parsed with a ``StaticRougailConvert``; the code returned
    by its ``save()`` is then executed to obtain the root option.

    Returns a 2-tuple ``(root_option, extra_vars)`` where ``root_option`` is
    the object named ``option_0`` in the generated code and ``extra_vars``
    merges the ``"extra_vars"`` entries of all module descriptions.
    """
    processes, processes_empty, rougail_options = get_common_rougail_config(
        backward_compatibility=backward_compatibility
    )
    convert = StaticRougailConvert(add_extra_options)
    convert.init()
    convert.namespace = None

    def parse_yaml(path: str, yaml_text: str) -> None:
        # Every YAML text is parsed as a root file in format version "1.1".
        convert.parse_root_file([path], "", "1.1", YAML().load(yaml_text))

    parse_yaml("rougail.config", rougail_options)
    for process_empty in processes_empty:
        parse_yaml("rougail.config", process_empty)
    extra_vars = {}
    # Modules of all processes, in one sequence ordered by get_level().
    modules = sorted(
        (module for group in processes.values() for module in group),
        key=get_level,
    )
    for module in modules:
        if "extra_vars" in module:
            extra_vars |= module["extra_vars"]
        if "options" not in module:
            continue
        # "options" is either one YAML text or a list of them.
        options = module["options"]
        if not isinstance(options, list):
            options = [options]
        for option in options:
            parse_yaml(f'rougail.config.{module["name"]}', option)
    tiram_obj = convert.save()
    optiondescription = {}
    # NOTE(review): this executes code generated by the converter from the
    # YAML of every installed module; those modules must be trusted.
    exec(tiram_obj, {}, optiondescription)  # pylint: disable=W0122
    return optiondescription["option_0"], extra_vars
def get_rougail_config(
    *,
    backward_compatibility: bool = True,
    add_extra_options: bool = True,
) -> _RougailConfig:
    """Create a new ``_RougailConfig`` with the given flags.

    Both arguments are keyword-only and are passed through unchanged.
    """
    rougail_config = _RougailConfig(backward_compatibility, add_extra_options)
    return rougail_config
# Module-level configuration object, created at import time with the default
# arguments of get_rougail_config().
RougailConfig = get_rougail_config()