rougail-output-formatter/src/rougail/output_formatter/__init__.py

463 lines
19 KiB
Python

"""
Silique (https://www.silique.fr)
Copyright (C) 2024-2025
This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from io import BytesIO
from pathlib import Path
from typing import Optional
from ruamel.yaml import YAML, CommentedMap
from ruamel.yaml.representer import RoundTripRepresenter
from ruamel.yaml.tokens import CommentToken
from ruamel.yaml.error import CommentMark
from ruamel.yaml.comments import CommentedSeq
from ruamel.yaml.scalarstring import LiteralScalarString, FoldedScalarString, ScalarString
from djlint.settings import Config
from djlint.reformat import formatter
from tiramisu import undefined
from tiramisu.config import get_common_path
from rougail.convert import RougailConvert
from rougail.object_model import Variable, Family, Calculation, JinjaCalculation, IdentifierCalculation, IdentifierPropertyCalculation, NamespaceCalculation, IdentifierParam, IndexCalculation, IndexParam, NamespaceParam, Param
from rougail.utils import normalize_family
from .upgrade import RougailUpgrade
from .__version__ import __version__
def _(text):
    """Return *text* unchanged.

    The function is wrapped around user-facing messages in this module;
    presumably it stands in for a gettext-style translation hook (TODO confirm).
    """
    return text
# XXX explicit null
def represent_none(self, data):
    """Represent ``None`` as the explicit YAML scalar ``null``.

    ``self`` is the representer the function is registered on; ``data`` is
    ignored because the emitted text is always the same.
    """
    null_tag = 'tag:yaml.org,2002:null'
    return self.represent_scalar(null_tag, 'null')
def represent_str(self, data):
    """Represent a string, emitting the empty string as an empty null scalar.

    Non-empty strings are passed through with the regular ``str`` tag; the
    empty string is emitted with the ``null`` tag and no text at all.
    """
    if data != '':
        return self.represent_scalar('tag:yaml.org,2002:str', data)
    return self.represent_scalar('tag:yaml.org,2002:null', "")
# Register the handlers on ruamel's round-trip representer class itself, so the
# registration is module-wide and takes effect as soon as this module is imported.
RoundTripRepresenter.add_representer(type(None), represent_none)
RoundTripRepresenter.add_representer(str, represent_str)
# XXX
class RougailOutputFormatter:
    """Rewrite one Rougail structure file as normalised YAML text.

    The constructor loads and parses the single file listed in
    ``rougailconfig["main_dictionaries"]``.  :meth:`run` then walks the parsed
    objects, rebuilds a ruamel ``CommentedMap`` tree that keeps only the
    attributes differing from their schema default, and dumps that tree.
    """

    output_name = 'formatter'

    # Object attributes that are never written back to the formatted file.
    _SKIPPED_ATTRIBUTES = ("name", "path", "namespace", "version", "xmlfiles")
    # Marker appended to dynamic family names in object paths.
    _IDENTIFIER_SUFFIX = '{{ identifier }}'

    def __init__(
        self,
        config: "Config",
        *,
        rougailconfig: "RougailConfig" = None,
        user_data_errors: Optional[list] = None,
        user_data_warnings: Optional[list] = None,
    ) -> None:
        """Load, upgrade and parse the file to format.

        Args:
            config: accepted but not read here; ``self.config`` is a freshly
                built djlint ``Config`` used to format Jinja templates.
            rougailconfig: Rougail configuration; when ``None`` the global
                ``RougailConfig`` is used and its ``step.output`` is set to
                :attr:`output_name`.
            user_data_errors: accepted but not used by this class.
            user_data_warnings: accepted but not used by this class.

        Raises:
            ExtentionError: ``step.output`` is not :attr:`output_name`.
            Exception: ``main_dictionaries`` does not hold exactly one entry,
                or that entry is not a regular file.
        """
        self.basic_types = {
            str: "string",
            int: "number",
            bool: "boolean",
            float: "float",
        }
        if rougailconfig is None:
            from rougail import RougailConfig

            rougailconfig = RougailConfig
            rougailconfig["step.output"] = self.output_name
        if rougailconfig["step.output"] != self.output_name:
            # Imported locally: the module header never brings this name into
            # scope, so raising it used to fail with NameError instead.
            from rougail.error import ExtentionError

            raise ExtentionError(_('the "step.output" is not set to "{0}"').format(self.output_name))
        self.main_namespace = rougailconfig["main_namespace"]
        filenames = rougailconfig["main_dictionaries"]
        # An empty list is rejected here too rather than failing on filenames[0].
        if len(filenames) != 1:
            raise Exception(_('only one file is allowed'))
        filename = Path(filenames[0])
        if not filename.is_file():
            raise Exception(_('only a file is allowed'))
        # djlint settings used by object_to_yaml() to reformat Jinja templates.
        self.config = Config()
        self.config.profile = 'jinja'
        self.config.line_break_after_multiline_tag = True
        self.config.indent = " "
        # The file is loaded twice on purpose: ``original_yaml`` is kept for
        # parse_yaml() lookups while ``datas`` is handed to the converter
        # (presumably the converter may modify what it receives - TODO confirm).
        self.original_yaml = RougailUpgrade(rougailconfig).run(filename)
        datas = RougailUpgrade(rougailconfig).run(filename)
        self.rougail = RougailConvert(rougailconfig)
        self.rougail.load_config()
        self.rougail.init()
        self.filename_str = str(filename)
        if self.main_namespace is None:
            self.rougail.namespace = None
        else:
            self.rougail.namespace = normalize_family(self.main_namespace)
            self.rougail.create_namespace(self.main_namespace)
        self.rougail.validate_file_version(
            datas,
            self.filename_str,
        )
        self.rougail.parse_root_file(
            self.filename_str,
            self.rougail.namespace,
            "1.1",
            datas,
        )
        self.yaml = YAML()

    def run(self):
        """Build the formatted document and return ``(True, yaml_text)``."""
        self.families_attributes = self._schema_defaults(self.rougail.family)
        self.dynamics_attributes = self._schema_defaults(self.rougail.dynamic)
        self.variables_attributes = self._schema_defaults(self.rougail.variable)
        # ``None`` is the key of the document root.
        self.families = {None: CommentedMap()}
        self.parse()
        self.yaml.indent(mapping=2, sequence=4, offset=2)
        self.yaml.version = '1.2'
        self.yaml.explicit_start = True
        self.yaml.explicit_end = True
        # Set on the YAML instance: an attribute on the formatter is never
        # read by the dumper.
        self.yaml.default_flow_style = False
        with BytesIO() as ymlfh:
            self.yaml.dump(self.families[None], ymlfh)
            ret = ymlfh.getvalue().decode("utf-8").strip()
        return True, ret

    def print(self):
        """Run the formatter, print the YAML text and return the status flag."""
        ret, data = self.run()
        print(data)
        return ret

    def parse(self):
        """Fill ``self.families`` from every object known to the converter.

        A version key is written first; it is named ``_version`` when the file
        defines an object whose own path is ``version``.  Its value is the
        first non-empty ``version`` attribute found on the parsed objects.

        Raises:
            Exception: no object provided a version.
        """
        # FIXME path to relative !
        root = self.families[None]
        paths = self.rougail.paths._data
        if self.rougail.namespace:
            version_path = f'{self.rougail.namespace}.version'
        else:
            version_path = 'version'
        version_name = '_version' if version_path in paths else 'version'
        # Reserve the key now so it stays first; the value is set at the end.
        root[version_name] = None
        root.yaml_value_comment_extend(version_name, self._blank_line_comment('\n\n'))
        version = None
        # Number of objects still to process; helpers use it to decide whether
        # a trailing blank line must be emitted.
        self.remaining = len(paths)
        for path, obj in paths.items():
            self.remaining -= 1
            if version is None or version == '':
                version = obj.version
            if path == self.rougail.namespace:
                # The namespace itself is not emitted: its children are
                # attached directly to the document root.
                self.families[path] = root
                continue
            if isinstance(obj, Family):
                self.parse_family(path, obj)
            if isinstance(obj, Variable):
                self.parse_variable(path, obj)
        if not version:
            raise Exception(_('no variables in file {0}').format(self.filename_str))
        root[version_name] = float(version)

    def parse_family(self, path, obj):
        """Add the family ``obj`` located at ``path`` to its parent mapping."""
        children = [p.rsplit('.', 1)[-1] for p in self.rougail.parents[path]]
        parent, name = self.get_parent_name(path)
        ret = self.families[parent]
        family = CommentedMap()
        force_keys = self._copy_redefine_exists(path, family)
        type_ = obj.type
        if type_ == "dynamic":
            attributes = self.dynamics_attributes
        else:
            attributes = self.families_attributes
        for attr, default_value in attributes.items():
            if attr in self._SKIPPED_ATTRIBUTES:
                continue
            try:
                value = getattr(obj, attr)
            except AttributeError:
                continue
            # "type" is always collected; it is removed below when implicit.
            if attr != "type" and attr not in force_keys and value == default_value:
                continue
            if attr in children:
                # An attribute sharing its name with a child gets a "_" prefix.
                attr = f'_{attr}'
            family[attr] = self.object_to_yaml(attr, type_, value, False, path)
        if type_ == "dynamic" or (children and type_ == 'family'):
            if "_type" in family:
                del family["_type"]
            else:
                del family["type"]
        keys = set(family)
        if not keys:
            ret[name] = CommentedMap()
            ret.yaml_value_comment_extend(name, self._blank_line_comment('\n\n'))
        elif not keys - {'description'}:
            # Only a description is left: emit it as an end-of-line comment.
            ret[name] = CommentedMap()
            description = family["description"]
            if self.remaining:
                description = description + '\n\n'
            column = self._comment_column(path, name, 3)
            ret.yaml_add_eol_comment(description, name, column=column)
        else:
            self.add_space(family)
            ret[name] = family
        self.families[path] = ret[name]

    def parse_variable(self, path, obj):
        """Add the variable ``obj`` located at ``path`` to its parent mapping.

        The shorthand form ``name: value  # description`` is used when only
        ``default`` and/or ``description`` remain and the default is not a
        mapping; otherwise the full mapping is emitted.
        """
        parent, name = self.get_parent_name(path)
        ret = self.families[parent]
        variable = CommentedMap()
        force_keys = self._copy_redefine_exists(path, variable)
        multi = obj.multi or isinstance(obj.default, list)
        type_ = obj.type
        for attr, default_value in self.variables_attributes.items():
            if attr in self._SKIPPED_ATTRIBUTES:
                continue
            try:
                value = getattr(obj, attr)
            except AttributeError:
                continue
            if attr not in force_keys and value == default_value:
                continue
            variable[attr] = self.object_to_yaml(attr, type_, value, multi, path)
        self._drop_implicit_variable_keys(variable, multi)
        if not isinstance(variable.get("default"), dict) and not set(variable) - {'default', 'description'}:
            self._write_shorthand_variable(ret, path, name, variable, multi)
        else:
            if "default" in variable and variable["default"] is None:
                variable["default"] = ""
            ret[name] = variable
            self.add_space(variable)

    def add_space(self, obj):
        """Append a blank-line comment after the last leaf reachable from ``obj``.

        The last entry of ``obj`` is followed through nested non-empty
        ``CommentedMap``/``CommentedSeq`` values.  Nothing is added when no
        object remains to be written or when ``obj`` is empty.
        """
        if not self.remaining or not obj:
            return
        parent, param, typ = obj, list(obj)[-1], 'map'
        node = obj[param]
        # A single newline is used after a ScalarString value, two otherwise.
        enter = '\n' if isinstance(node, ScalarString) else '\n\n'
        while True:
            # Empty containers are treated as leaves: descending into them
            # used to raise IndexError.
            if isinstance(node, CommentedMap) and node:
                parent, param, typ = node, list(node)[-1], 'map'
            elif isinstance(node, CommentedSeq) and node:
                parent, param, typ = node, len(node) - 1, 'seq'
            else:
                break
            node = parent[param]
        if typ == 'seq':
            func = parent.yaml_key_comment_extend
        else:
            func = parent.yaml_value_comment_extend
        func(param, self._blank_line_comment(enter))

    def object_to_yaml(self, key, type_, value, multi, object_path):
        """Convert an attribute value into the object to dump.

        Args:
            key: attribute name the value belongs to.
            type_: ``type`` of the owning family or variable.
            value: raw value; lists, calculations and params are converted,
                anything else is returned unchanged apart from the two
                special cases below.
            multi: whether the owning variable is multi-valued.
            object_path: path of the owning object, used to make variable
                references relative.
        """
        if isinstance(value, list):
            return self._list_to_yaml(key, type_, value, multi, object_path)
        if isinstance(value, JinjaCalculation):
            return self._jinja_to_yaml(key, type_, value, multi, object_path)
        if isinstance(value, Calculation):
            return self._calculation_to_yaml(value, object_path)
        if isinstance(value, Param):
            return self._param_to_yaml(value, object_path)
        if type_ == 'port' and isinstance(value, str) and value.isnumeric():
            return int(value)
        # isinstance guard: a forced "help" key may carry a non-string value.
        if key == 'help' and isinstance(value, str) and '\n' in value:
            return LiteralScalarString(value)
        return value

    def calc_variable_path(self, object_path, variable_path):
        """Return ``variable_path`` relative to ``object_path`` when possible.

        Paths already starting with ``_`` are returned unchanged.  Otherwise
        the common prefix is removed and replaced by one ``_`` per remaining
        level of ``object_path``, provided there is no namespace or a common
        prefix exists.
        """
        if not variable_path.startswith("_"):
            common_path = get_common_path(object_path, variable_path)
            if not self.rougail.namespace or common_path:
                # +1 skips the dot that follows the common prefix.
                len_common_path = len(common_path) + 1 if common_path else 0
                relative_object_path = object_path[len_common_path:]
                prefix = "_" * (relative_object_path.count(".") + 1)
                return prefix + '.' + variable_path[len_common_path:]
        return variable_path

    def get_object_informations(self, value, excludes=()):
        """Map each schema property of ``value``'s class to its default.

        Properties listed in ``excludes`` are left out; a property without a
        declared default maps to ``None``.
        """
        return self._schema_defaults(value.__class__, excludes)

    def get_parent_name(self, path):
        """Split ``path`` into ``(parent_path, name)``; parent is ``None`` at root."""
        if "." in path:
            parent, name = path.rsplit(".", 1)
            return parent, name
        return None, path

    def parse_yaml(self, path: str) -> dict:
        """Return the node of the original YAML document found at ``path``.

        The first path component is skipped when a main namespace is set.  A
        component ending with ``{{ identifier }}`` falls back to the name
        without that suffix when the full name is not a key.  The result is
        whatever the document holds there, so it is not a mapping for
        shorthand entries; callers check with ``isinstance``.
        """
        subpath = path.split('.')
        if self.main_namespace:
            subpath = subpath[1:]
        node = self.original_yaml
        suffix = self._IDENTIFIER_SUFFIX
        for name in subpath:
            if name not in node and name.endswith(suffix):
                name = name[:-len(suffix)]
            node = node[name]
        return node

    @staticmethod
    def _schema_defaults(model, excludes=()):
        """Return ``{property: default}`` from ``model.model_json_schema()``."""
        properties = model.model_json_schema()["properties"]
        return {attr: spec.get("default") for attr, spec in properties.items() if attr not in excludes}

    @staticmethod
    def _blank_line_comment(text):
        """Build the comment-token list that makes ruamel emit ``text`` verbatim."""
        return [CommentToken(text, CommentMark(0)), None]

    def _comment_column(self, path, name, add_column):
        """Compute the column of an end-of-line comment for ``name``."""
        path_len = path.count('.')
        if self.rougail.namespace:
            # The namespace level is not written, so it does not indent.
            path_len -= 1
        return path_len * 2 + len(name) + add_column

    def _copy_redefine_exists(self, path, target):
        """Copy ``redefine``/``exists`` from the source YAML into ``target``.

        Returns the keys of the source entry when either flag is present (those
        attributes are then kept even if equal to their default), else ``[]``.
        """
        yaml_data = self.parse_yaml(path)
        force_keys = []
        if isinstance(yaml_data, dict):
            if yaml_data.get("redefine", False):
                target["redefine"] = True
                force_keys = list(yaml_data)
            if yaml_data.get("exists") is not None:
                target["exists"] = yaml_data["exists"]
                force_keys = list(yaml_data)
        return force_keys

    def _drop_implicit_variable_keys(self, variable, multi):
        """Remove from ``variable`` the keys that the other keys already imply."""
        if variable.get("mandatory") is True and None not in variable.get("choices", []):
            del variable["mandatory"]
        if "default" in variable:
            if "type" in variable and variable["type"] in ["string", "boolean", "number", "float"]:
                default = variable["default"]
                if default and isinstance(default, list):
                    tested_value = default[0]
                else:
                    tested_value = default
                # The type is implicit when the default already has it.
                if variable["type"] == self.basic_types.get(type(tested_value), None):
                    del variable["type"]
            if "multi" in variable and variable["multi"] is True and isinstance(variable["default"], list):
                del variable["multi"]
        elif variable.get("type") == "choice" and "choices" in variable:
            del variable["type"]
        elif variable.get("type") == "string":
            # default type is string
            del variable["type"]
            if set(variable) in [{"multi"}, {'multi', 'description'}]:
                variable["default"] = []
                variable.pop("multi")
        elif variable.get("type") == "boolean" and not multi:
            # if boolean, the default value is True
            del variable["type"]
            variable["default"] = True
        if "default" not in variable and variable.get("multi") is True and not set(variable) - {'default', 'description', "multi"}:
            # A bare multi variable is written as an empty list.
            variable["default"] = []
            del variable['multi']

    def _write_shorthand_variable(self, ret, path, name, variable, multi):
        """Write ``variable`` into ``ret`` as ``name: default  # description``."""
        default = variable.get('default')
        add_column = 3
        if isinstance(default, list):
            sequence = CommentedSeq()
            ret[name] = sequence
            if not default:
                add_column += 3
            for item in default:
                sequence.append(item)
        elif default is None:
            ret[name] = ""
        else:
            ret[name] = default
            add_column += len(str(default)) + 1
        if "description" in variable:
            description = variable["description"]
            if self.remaining and (not multi or not default):
                description += "\n\n"
            column = self._comment_column(path, name, add_column)
            ret.yaml_add_eol_comment(description, name, column=column)
            if multi and default:
                # The blank line goes after the list items, not the comment.
                self.add_space(ret)
        else:
            self.add_space(ret)

    def _list_to_yaml(self, key, type_, value, multi, object_path):
        """Convert a list; ``params`` lists are merged into a single mapping."""
        if key == 'params':
            params = CommentedMap()
            for item in value:
                params.update(self.object_to_yaml(key, type_, item, multi, object_path))
            return params
        items = CommentedSeq()
        for item in value:
            items.append(self.object_to_yaml(key, type_, item, multi, object_path))
        return items

    def _jinja_to_yaml(self, key, type_, value, multi, object_path):
        """Convert a Jinja calculation into its mapping form."""
        jinja = CommentedMap()
        # The last character of the formatter output is dropped (presumably a
        # trailing newline - TODO confirm).
        jinja_values = formatter(self.config, value.jinja.strip())[:-1]
        if key == 'default' and not multi:
            jinja["jinja"] = FoldedScalarString(jinja_values)
            # NOTE(review): the first newline is recorded at its own index and
            # later ones one character earlier; kept as in the original.
            fold_pos = []
            old_i = 0
            for i, ltr in enumerate(jinja_values):
                if ltr == '\n':
                    fold_pos.append(i - old_i)
                    old_i = 1
            jinja["jinja"].fold_pos = fold_pos
        elif key == 'secret_manager':
            # Only the parameters are emitted for a secret manager.
            return self.object_to_yaml("params", type_, value.params, multi, object_path)
        else:
            jinja["jinja"] = LiteralScalarString(jinja_values)
        if value.return_type:
            jinja["return_type"] = value.return_type
        if value.description:
            jinja["description"] = value.description
        if value.params:
            jinja["params"] = self.object_to_yaml("params", type_, value.params, multi, object_path)
        return jinja

    def _calculation_to_yaml(self, value, object_path):
        """Convert a non-Jinja calculation into its mapping form."""
        attributes = self.get_object_informations(
            value,
            ['path', 'inside_list', 'version', 'xmlfiles', 'attribute_name', 'namespace'],
        )
        variable = CommentedMap()
        if isinstance(value, (IdentifierCalculation, IdentifierPropertyCalculation)):
            variable["type"] = "identifier"
        elif isinstance(value, IndexCalculation):
            variable["type"] = "index"
        elif isinstance(value, NamespaceCalculation):
            variable["type"] = "namespace"
        for attr, default in attributes.items():
            val = getattr(value, attr)
            if val != default and val is not undefined:
                variable[attr] = val
        if "variable" in variable:
            variable["variable"] = self.calc_variable_path(object_path, variable["variable"])
        if variable.get('type') == 'identifier' and 'identifier' in variable:
            # The "identifier" key already implies the type.
            del variable["type"]
        return variable

    def _param_to_yaml(self, value, object_path):
        """Convert a calculation parameter into ``{key: converted_value}``."""
        attributes = self.get_object_informations(value, ["type", "key", "namespace"])
        if list(attributes) == ['value']:
            # Plain parameter: emit the bare value.
            variable = value.value
        else:
            variable = CommentedMap()
            if isinstance(value, IdentifierParam):
                variable["type"] = "identifier"
            elif isinstance(value, IndexParam):
                variable["type"] = "index"
            elif isinstance(value, NamespaceParam):
                variable["type"] = "namespace"
            for attr, default in attributes.items():
                val = getattr(value, attr)
                if val != default and val is not undefined:
                    variable[attr] = val
            if variable.get('type') == 'identifier' and 'identifier' in variable:
                del variable["type"]
            if "variable" in variable:
                variable["variable"] = self.calc_variable_path(object_path, variable["variable"])
        return {value.key: variable}
# Alias of the formatter class under the generic name ``RougailOutput``;
# presumably the name Rougail's output-plugin loader looks up - TODO confirm.
RougailOutput = RougailOutputFormatter
# Only the formatter class is exported by ``from <module> import *``.
__all__ = ("RougailOutputFormatter",)