fix: reorganise user_datas

This commit is contained in:
egarette@silique.fr 2024-12-11 20:54:03 +01:00
parent 8afb787c98
commit 00e0941f6e
12 changed files with 161 additions and 158 deletions

View file

@ -92,7 +92,7 @@ class Annotator(Walk): # pylint: disable=R0903
elif variable.multi:
msg = _(
'the variable "{0}" is multi but has a non list default value'
).format(variable.name)
).format(variable.path)
raise DictConsistencyError(msg, 12, variable.xmlfiles)
elif variable.path in self.objectspace.followers:
self.objectspace.default_multi[variable.path] = variable.default

1
src/rougail/cli Symbolic link
View file

@ -0,0 +1 @@
../../../rougail-cli/src/rougail/cli

View file

@ -189,6 +189,7 @@ class FakeRougailConvert(RougailConvert):
self.force_optional = False
self.plugins = ["structural_commandline"]
self.user_datas = []
self.output = None
self.add_extra_options = self.add_extra_options

View file

@ -395,6 +395,7 @@ class ParserVariable:
"structural_commandline.add_extra_options"
]
self.user_datas = rougailconfig["step.user_data"]
self.output = rougailconfig["step.output"]
self.plugins = rougailconfig["plugins"]
def _init(self):

View file

@ -81,6 +81,10 @@ class NotFoundError(Exception):
pass
class ExtentionError(Exception):
pass
## ---- specific exceptions ----

1
src/rougail/output_ansible Symbolic link
View file

@ -0,0 +1 @@
../../../rougail-output-ansible/src/rougail/output_ansible

1
src/rougail/output_console Symbolic link
View file

@ -0,0 +1 @@
../../../rougail-output-console/src/rougail/output_console

1
src/rougail/output_doc Symbolic link
View file

@ -0,0 +1 @@
../../../rougail-output-doc/src/rougail/output_doc

1
src/rougail/output_json Symbolic link
View file

@ -0,0 +1 @@
../../../rougail-output-json/src/rougail/output_json

View file

@ -0,0 +1 @@
../../../rougail-user-data-environment/src/rougail/user_data_environment

1
src/rougail/user_data_file Symbolic link
View file

@ -0,0 +1 @@
../../../rougail-user-data-file/src/rougail/user_data_file

View file

@ -21,7 +21,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from typing import List
from re import findall
from tiramisu import undefined
from tiramisu import undefined, Calculation
from tiramisu.error import PropertiesOptionError, LeadershipError, ConfigError
from .object_model import CONVERT_OPTION
@ -32,44 +32,148 @@ class UserDatas:
self.config = config
def user_datas(self, user_datas: List[dict]):
values = {}
errors = []
warnings = []
self.values = {}
self.errors = []
self.warnings = []
self._populate_values(user_datas)
self._auto_configure_dynamics()
self._populate_config()
self._populate_error_warnings()
return {
"errors": self.errors,
"warnings": self.warnings,
}
def _populate_values(self, user_datas):
for datas in user_datas:
options = datas.get("options", {})
for name, data in datas.get("values", {}).items():
values[name] = {
self.values[name] = {
"values": data,
"options": options.copy(),
}
errors.extend(datas.get("errors", []))
warnings.extend(datas.get("warnings", []))
self._auto_configure_dynamics(values)
while values:
self.errors.extend(datas.get("errors", []))
self.warnings.extend(datas.get("warnings", []))
def _get_variable(self, config):
for subconfig in config:
if subconfig.isoptiondescription():
yield from self._get_variable(subconfig)
else:
yield subconfig
def _auto_configure_dynamics(self):
cache = {}
added = []
for path, data in list(self.values.items()):
value = data["values"]
try:
option = self.config.option(path)
option.name()
except (ConfigError, PropertiesOptionError):
pass
except AttributeError:
self._not_found_is_dynamic(self.config, path, cache, added)
def _not_found_is_dynamic(self, config, path, cache, added):
"""if path is not found, check if parent is a dynamic family
"""
current_path = ""
identifiers = []
# get parent
for name in path.split(".")[:-1]:
if current_path:
current_path += "."
current_path += name
if current_path in cache:
config, identifier = cache[current_path]
identifiers.append(identifier)
continue
tconfig = config.option(name)
try:
tconfig.group_type()
# object exists, so current config is the temporary config
config = tconfig
if config.isdynamic(only_self=True):
identifiers.append(config.identifiers()[-1])
except AttributeError:
# try to found the good dynamic family
for tconfig in config.list(uncalculated=True):
if not tconfig.isdynamic(only_self=True):
# it's not a dynamic variable
continue
identifier = self._get_identifier(
tconfig.name(), name
)
if identifier is None:
# it's a dynamic variable but doesn't match the current name
continue
dynamic_variable = tconfig.information.get(
"dynamic_variable",
None,
)
if not dynamic_variable:
# it's the good dynamic variable but it's not linked to a variable
# so cannot change the variable
continue
option_type = self.config.option(
dynamic_variable
).information.get("type")
dyn_options_values = self.config.option(dynamic_variable).get().impl_getdefault()
if "{{ identifier }}" in dynamic_variable:
for s in identifiers:
dynamic_variable = dynamic_variable.replace(
"{{ identifier }}", str(s), 1
)
if dynamic_variable not in self.values and not dyn_options_values:
self.values[dynamic_variable] = {"values": []}
added.append(dynamic_variable)
elif dynamic_variable not in added:
continue
config = tconfig
identifiers.append(identifier)
typ = CONVERT_OPTION.get(option_type, {}).get(
"func"
)
if typ:
identifier = typ(identifier)
if (
identifier
not in self.values[dynamic_variable]["values"]
):
self.values[dynamic_variable]["values"].append(
identifier
)
cache[current_path] = config, identifier
break
def _populate_config(self):
while self.values:
value_is_set = False
for option in self._get_variable(self.config):
path = option.path()
if path not in values:
if path not in self.values:
continue
options = values[path].get("options", {})
value = values[path]["values"]
options = self.values[path].get("options", {})
value = self.values[path]["values"]
needs_convert = options.get("needs_convert", False)
# converted value
if option.ismulti():
if options.get("multi_separator") and not isinstance(value, list):
value = value.split(options["multi_separator"])
values[path]["values"] = value
self.values[path]["values"] = value
if option.issubmulti():
value = [[val] for val in value]
if options.get("needs_convert"):
if needs_convert:
if option.issubmulti():
for idx, val in enumerate(value):
value[idx] = [convert_value(option, v) for v in val]
else:
value = [convert_value(option, val) for val in value]
values[path]["values"] = value
values[path]["options"]["needs_convert"] = False
elif options.get("needs_convert"):
self.values[path]["values"] = value
self.values[path]["options"]["needs_convert"] = False
elif needs_convert:
value = convert_value(option, value)
index = option.index()
if index is not None:
@ -82,18 +186,29 @@ class UserDatas:
# value is correctly set, remove variable to the set
if index is not None:
# if it's a follower waiting for all followers are sets
values[path]["values"][index] = undefined
if set(values[path]["values"]) == {undefined}:
values.pop(path)
self.values[path]["values"][index] = undefined
if set(self.values[path]["values"]) == {undefined}:
self.values.pop(path)
else:
values.pop(path)
self.values.pop(path)
except Exception:
if path != option.path():
values[option.path()] = values.pop(path)
self.values[option.path()] = self.values.pop(path)
if not value_is_set:
break
def _get_identifier(self, true_name, name) -> str:
if true_name == "{{ identifier }}":
return name
regexp = true_name.replace("{{ identifier }}", "(.*)")
finded = findall(regexp, name)
if len(finded) != 1 or not finded[0]:
return None
return finded[0]
def _populate_error_warnings(self):
# we don't find variable, apply value just to get error or warning messages
for path, data in values.items():
for path, data in self.values.items():
try:
option = self.config.option(path)
value = data["values"]
@ -105,148 +220,23 @@ class UserDatas:
else:
option.value.set(value)
except AttributeError as err:
errors.append(str(err))
self.errors.append(str(err))
except (ValueError, LeadershipError) as err:
errors.append(str(err))
self.errors.append(str(err))
except PropertiesOptionError as err:
warnings.append(str(err))
return {
"errors": errors,
"warnings": warnings,
}
def _get_variable(self, config):
for subconfig in config:
if subconfig.isoptiondescription():
yield from self._get_variable(subconfig)
else:
yield subconfig
def _auto_configure_dynamics(
self,
values,
):
cache = {}
added = []
for path, data in list(values.items()):
value = data["values"]
try:
option = self.config.option(path)
option.name()
except (ConfigError, PropertiesOptionError):
pass
except AttributeError:
config = self.config
current_path = ""
for name in path.split(".")[:-1]:
if current_path:
current_path += "."
current_path += name
if current_path in cache:
config, identifier = cache[current_path]
else:
tconfig = config.option(name)
try:
tconfig.group_type()
config = tconfig
except AttributeError:
for tconfig in config.list(uncalculated=True):
if tconfig.isdynamic(only_self=True):
identifier = self._get_identifier(
tconfig.name(), name
)
if identifier is None:
continue
dynamic_variable = tconfig.information.get(
"dynamic_variable",
None,
)
if not dynamic_variable:
continue
option_type = self.config.option(
dynamic_variable
).information.get("type")
identifiers = tconfig.identifiers()
if identifiers:
for s in identifiers:
dynamic_variable = dynamic_variable.replace(
"{{ identifier }}", str(s), 1
)
if dynamic_variable not in values and not self.config.option(dynamic_variable).get().impl_getdefault():
values[dynamic_variable] = {"values": []}
added.append(dynamic_variable)
elif dynamic_variable not in added:
continue
config = tconfig
typ = CONVERT_OPTION.get(option_type, {}).get(
"func"
)
if typ:
identifier = typ(identifier)
if (
identifier
not in values[dynamic_variable]["values"]
):
values[dynamic_variable]["values"].append(
identifier
)
cache[current_path] = config, identifier
break
else:
if option.isdynamic():
parent_option = self.config.option(path.rsplit(".", 1)[0])
identifier = self._get_identifier(
parent_option.name(uncalculated=True),
parent_option.name(),
)
dynamic_variable = None
while True:
dynamic_variable = parent_option.information.get(
"dynamic_variable",
None,
)
if dynamic_variable:
break
parent_option = self.config.option(
parent_option.path().rsplit(".", 1)[0]
)
if "." not in parent_option.path():
parent_option = None
break
if not parent_option:
continue
identifiers = parent_option.identifiers()
for identifier in identifiers:
dynamic_variable = dynamic_variable.replace(
"{{ identifier }}", str(identifier), 1
)
if dynamic_variable not in values and not self.config.option(dynamic_variable).get().impl_getdefault():
values[dynamic_variable] = {"values": []}
added.append(dynamic_variable)
elif dynamic_variable not in added:
continue
option_type = option.information.get("type")
typ = CONVERT_OPTION.get(option_type, {}).get("func")
if typ:
identifier = typ(identifier)
if identifier not in values[dynamic_variable]["values"]:
values[dynamic_variable]["values"].append(identifier)
cache[option.path()] = option, identifier
def _get_identifier(self, true_name, name) -> str:
if true_name == "{{ identifier }}":
return name
regexp = true_name.replace("{{ identifier }}", "(.*)")
finded = findall(regexp, name)
if len(finded) != 1 or not finded[0]:
return None
return finded[0]
self.warnings.append(str(err))
def convert_value(option, value):
if value == "":
return None
option_type = option.information.get("type")
if option_type == 'choice':
choices = option.value.list()
if value not in choices and isinstance(value, str):
# FIXME add other tests (boolean, float, ...)
if value.isnumeric() and int(value) in choices:
value = int(value)
func = CONVERT_OPTION.get(option_type, {}).get("func")
if func:
return func(value)