# rougail-output-display/src/rougail/output_display/display.py
#
# 332 lines
# 12 KiB
# Python

"""
Silique (https://www.silique.fr)
Copyright (C) 2022-2026
This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from typing import Optional, Any
from ruamel.yaml import YAML
from io import BytesIO
from tiramisu.error import PropertiesOptionError, LeadershipError, ConfigError
from tiramisu import owners, groups
from .config import OutPuts
from .i18n import _
class RougailOutputDisplay:
    """Render a configuration, plus any collected errors and warnings.

    The concrete renderer is looked up in ``OutPuts`` with the
    ``display.output_format`` setting of ``rougailconfig``.
    """

    def __init__(
        self,
        config: "Config",
        *,
        rougailconfig: Optional["RougailConfig"] = None,
        root_config=None,
        user_data_errors: Optional[list] = None,
        user_data_warnings: Optional[list] = None,
        config_owner_is_path: bool = False,
        layer_datas=None,
        true_config=None,
        **kwargs,
    ) -> None:
        """Store the display context.

        :param config: configuration whose values are displayed.
        :param rougailconfig: settings object; falls back to the global
            ``rougail.RougailConfig`` when omitted.
        :param root_config: forwarded to ``Node`` (used there when looking
            for the config that holds a default value).
        :param user_data_errors: errors collected before display.
        :param user_data_warnings: warnings collected before display.
        :param config_owner_is_path: forwarded to ``Node``.
        :param layer_datas: forwarded to the output backend's ``run``.
        :param true_config: stored as ``self.true_config``; defaults to
            ``config``.
        :param kwargs: accepted and ignored.
        """
        if rougailconfig is None:
            # Imported lazily, only when the caller did not supply one.
            from rougail import RougailConfig

            rougailconfig = RougailConfig
        self.rougailconfig = rougailconfig
        self.config = config
        self.true_config = config if true_config is None else true_config
        self.root_config = root_config
        self.layer_datas = layer_datas
        self.config_owner_is_path = config_owner_is_path
        self.errors = []
        self.warnings = []
        self.user_data_errors = [] if user_data_errors is None else user_data_errors
        self.user_data_warnings = (
            [] if user_data_warnings is None else user_data_warnings
        )
        # Display tree, built on the first successful ``run``.
        self.nodes = None
        self.yaml = YAML()
        self.yaml.indent(mapping=2, sequence=4, offset=2)

    def run(self) -> tuple[bool, str]:
        """Build the output text.

        :return: ``(status, text)``. When errors exist the status is
            ``False`` and the text only contains the error/warning report;
            otherwise the status is the one returned by the output backend
            and the text is the optional warning report followed by the
            rendered tree.
        """
        output_format = self.rougailconfig["display.output_format"]
        output = OutPuts().get()[output_format](self.rougailconfig)
        warnings = self.user_data_warnings + self.warnings
        errors = self.user_data_errors + self.errors
        level = None
        errors_warnings_dict = {}
        if not errors and self.nodes is None:
            show_secrets = self.rougailconfig["display.show_secrets"]
            # Node appends the errors it meets to the list it is given.
            # Hand it ``self.errors`` (not a temporary list) so those errors
            # are still reported by later calls, when the tree is cached.
            self.nodes = Node(
                self.yaml,
                show_secrets,
                self.config,
                self.root_config,
                self.config_owner_is_path,
                self.errors,
            )
            errors = self.user_data_errors + self.errors
        if warnings:
            level = "warnings"
            for warning in warnings:
                output.error_warn_to_dict(warning, errors_warnings_dict, 'warning')
        if errors:
            # Errors take precedence over warnings for the report header.
            level = "errors"
            for error in errors:
                output.error_warn_to_dict(error, errors_warnings_dict, 'error')
        if level:
            if level == "errors":
                tree = output.error_header()
            else:
                tree = output.warning_header()
            ret = output.parse_error_warning(
                tree, errors_warnings_dict, output.display_error, level
            )
            if errors:
                return False, ret
            ret += "\n"
        else:
            ret = ''
        code, rendered = output.run(self.nodes, self.layer_datas)
        return code, ret + rendered

    def print(self) -> bool:
        """Print the text produced by ``run`` and return its status."""
        status, ret = self.run()
        print(ret)
        return status
class Node:
    """One level of the display tree.

    A node carries a description, a ``hidden`` flag, a nesting ``level`` and
    a ``children`` list. Each child is a dict: either
    ``{"type": "node", "node": Node}`` or a ``"leaf"`` entry with
    ``description``, ``values``, ``icon`` and ``hidden`` keys.
    """

    def __init__(
        self,
        yaml,
        show_secrets,
        config,
        root_config,
        config_owner_is_path,
        errors,
        *,
        node=None,
        values: Optional[dict] = None,
        level: int = 0,
        leader_index: Optional[int] = None,
    ) -> None:
        """Build this node and, recursively, its children.

        :param yaml: object whose ``dump(value, stream)`` serialises
            non-string values (see ``convert_value``).
        :param show_secrets: when falsy, password values are masked.
        :param config: configuration being displayed.
        :param root_config: candidate config for default values (see
            ``get_metaconfig_with_default_value``).
        :param config_owner_is_path: selects how the config holding a
            default value is located.
        :param errors: list to which errors met while reading values are
            appended.
        :param node: option represented by this node; ``None`` for the root.
        :param values: mapping option -> value for this node; for the root
            it is read from ``config.value.get()``.
        :param level: nesting depth (0 for the root).
        :param leader_index: index forwarded to the leader leaf of a
            leadership row.
        """
        self.yaml = yaml
        self.show_secrets = show_secrets
        self.config = config
        self.root_config = root_config
        self.config_owner_is_path = config_owner_is_path
        self.errors = errors
        if node is None:
            values = self.config.value.get()
        if node:
            self.hidden = "hidden" in node.property.get()
            self.description = node.description()
        else:
            self.hidden = None
            self.description = _("Variables:")
        self.level = level
        self.children = []
        if node and node.isoptiondescription() and node.isleadership():
            # Leadership: the first item is the leader and its list of
            # values; regroup leader and followers into one dict per index
            # and create one sub-node per index.
            values_iter = iter(values.items())
            leader, leader_values = next(values_iter)
            followers_values = {
                idx: {leader: value} for idx, value in enumerate(leader_values)
            }
            for follower, follower_value in values_iter:
                followers_values[follower.index()][follower] = follower_value
            for idx, fvalues in enumerate(followers_values.values()):
                self.add_node(leader, fvalues, leader_index=idx)
        else:
            for option, value in values.items():
                if option.isoptiondescription():
                    if option.group_type() == groups.namespace and not value:
                        # Empty namespaces are not displayed.
                        continue
                    if option.isleadership():
                        leader, leader_values = next(iter(value.items()))
                        if not leader_values:
                            # Leadership without any row: show it as a leaf
                            # holding an empty list, under the family name.
                            self.add_leaf(
                                leader, [], description=option.description()
                            )
                        else:
                            self.add_node(option, value)
                    else:
                        if value == {}:
                            self.add_leaf(option, "{}")
                        else:
                            self.add_node(option, value)
                else:
                    # Only the leader leaf receives the row index.
                    if option.isleader():
                        index = leader_index
                    else:
                        index = None
                    self.add_leaf(option, value, leader_index=index)

    def add_node(
        self,
        option,
        values,
        *,
        leader_index: Optional[int] = None,
    ) -> None:
        """Append a child ``Node`` for ``option``, one level deeper."""
        self.children.append(
            {
                "type": "node",
                "node": Node(
                    self.yaml,
                    self.show_secrets,
                    self.config,
                    self.root_config,
                    self.config_owner_is_path,
                    self.errors,
                    node=option,
                    values=values,
                    level=self.level + 1,
                    leader_index=leader_index,
                ),
            }
        )

    def add_leaf(
        self,
        option,
        value: Any,
        *,
        leader_index: Optional[int] = None,
        description: Optional[str] = None,
    ):
        """Append a leaf entry for ``option``.

        When ``description`` is given it replaces the option's own
        description and the leaf is drawn with the ``"node"`` icon instead
        of the ``"leaf"`` one.
        """
        properties = option.property.get()
        if description is None:
            description = option.description()
            icon = "leaf"
        else:
            icon = "node"
        self.children.append(
            {
                "type": "leaf",
                "description": description,
                "values": self.get_values(option, value, leader_index, properties),
                "icon": icon,
                "hidden": "hidden" in properties,
            }
        )

    def get_values(
        self,
        option: "Option",
        value: Any,
        leader_index: Optional[int],
        properties: list[str],
    ) -> list[dict]:
        """Return the successive values of ``option`` to display.

        Each item is a dict with ``value``, ``is_default`` and
        ``loaded_from`` keys. The first item comes from the current config;
        further items come from the configs returned by
        ``get_metaconfig_with_default_value`` until a real default is
        reached or no further config is found.

        Errors raised while reading a non-default value are appended to
        ``self.errors``.
        """
        if option.isoptiondescription():
            return [{"is_default": True, "value": value, "loaded_from": None}]
        values = []
        meta_config = self.config
        meta_option = option
        index = option.index()
        if index is not None:
            loaded_from_key = f"loaded_from_{index}"
        else:
            loaded_from_key = "loaded_from"
        force_store_value = "force_store_value" in properties
        is_default = option.owner.isdefault()
        option_path = option.path()
        default_owner = owners.default
        if self.root_config == meta_config:
            added = True
            true_default = is_default
        else:
            true_default = is_default and meta_option.owner.get() == default_owner
            added = not is_default or true_default
        secret_manager = option.information.get('secret_manager', False)
        while True:
            # Once something has been recorded, an empty (or secret-manager)
            # default adds nothing worth showing.
            if values and true_default and (value in [None, []] or secret_manager):
                break
            if isinstance(value, list):
                value = [self.convert_value(meta_option, val) for val in value]
            else:
                value = self.convert_value(meta_option, value)
            if true_default:
                loaded_from = None
            else:
                loaded_from = meta_option.information.get(loaded_from_key, None)
            if added or force_store_value:
                values.append(
                    {
                        "value": value,
                        "is_default": true_default,
                        "loaded_from": loaded_from,
                    }
                )
            if true_default or force_store_value:
                break
            meta_config_path = meta_config.path()
            if is_default and (not meta_config_path or "." not in meta_config_path):
                # we are in root metaconfig and we have default value
                break
            new_meta_config = self.get_metaconfig_with_default_value(
                meta_config, meta_option
            )
            if not new_meta_config:
                break
            meta_option = new_meta_config.option(option_path, index)
            if new_meta_config == meta_config:
                # we already have current value, so we search default's one
                is_default = True
                true_default = True
                value = meta_option.value.default()
            else:
                try:
                    is_default = meta_option.owner.isdefault()
                except LeadershipError:
                    break
                try:
                    value = meta_option.value.get()
                except ValueError as err:
                    if not is_default:
                        meta_option._set_subconfig()
                        self.errors.append({str(err): meta_option._subconfig})
                    # NOTE(review): nesting recovered from flattened source;
                    # the loop is assumed to stop on any ValueError, not only
                    # when the value is non-default -- confirm.
                    break
                except Exception:
                    # Best effort: any other failure just ends the walk.
                    break
                true_default = is_default and meta_option.owner.get() == default_owner
            # NOTE(review): the statements below are assumed to run for both
            # branches above (loop level) -- confirm against upstream.
            added = True
            meta_config = new_meta_config
            if leader_index is not None:
                # The freshly read leader value is a sequence: keep only the
                # item of the current row.
                if len(value) > leader_index:
                    value = value[leader_index]
                else:
                    value = None
        return values

    def convert_value(
        self,
        option,
        value: Any,
    ) -> str:
        """Dump variable, means transform bool, ... to yaml string"""
        if (
            value is not None
            and not self.show_secrets
            and option.type() == "password"
        ):
            # Mask secrets with a fixed-width placeholder.
            return "*" * 10
        if isinstance(value, str):
            return value
        with BytesIO() as ymlfh:
            self.yaml.dump(value, ymlfh)
            ret = ymlfh.getvalue().decode("utf-8").strip()
        # Drop a trailing "..." marker left at the end of the dumped text.
        return ret.removesuffix("...").strip()

    def get_metaconfig_with_default_value(self, meta_config, option):
        """Return the config from which ``option`` gets its default value.

        Returns ``None`` when the option's owner is already the default
        owner, or when walking up from ``meta_config`` meets something that
        is not a ``'metaconfig'``.
        """
        default_owner = option.owner.default()
        if default_owner == owners.default:
            if option.owner.get() == default_owner:
                return None
            if self.root_config:
                return self.root_config
            return self.config
        if not self.config_owner_is_path:
            # Climb the parents until one whose owner is not the default.
            while True:
                if meta_config.type() != 'metaconfig':
                    return None
                meta_config = meta_config.parent()
                if not meta_config.owner.isdefault():
                    break
        else:
            # The owner is a dotted path: skip its first component and
            # descend from the root config through the remaining names.
            meta_config = self.root_config
            for child in default_owner.split(".")[1:]:
                meta_config = meta_config.config(child)
        return meta_config