rougail-output-formatter/src/rougail/output_formatter/__init__.py

567 lines
21 KiB
Python
Raw Normal View History

2024-12-23 20:54:52 +01:00
"""
Silique (https://www.silique.fr)
2025-02-10 09:46:31 +01:00
Copyright (C) 2024-2025
2024-12-23 20:54:52 +01:00
This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from io import BytesIO
from pathlib import Path
from typing import Optional
from ruamel.yaml import YAML, CommentedMap
2025-04-27 09:27:04 +02:00
from ruamel.yaml.representer import RoundTripRepresenter
2024-12-23 20:54:52 +01:00
from ruamel.yaml.tokens import CommentToken
from ruamel.yaml.error import CommentMark
from ruamel.yaml.comments import CommentedSeq
2025-05-11 19:12:45 +02:00
from ruamel.yaml.scalarstring import (
LiteralScalarString,
FoldedScalarString,
ScalarString,
)
2024-12-23 20:54:52 +01:00
2025-03-27 08:36:49 +01:00
from djlint.settings import Config
from djlint.reformat import formatter
2024-12-23 20:54:52 +01:00
from tiramisu.config import get_common_path
from rougail.convert import RougailConvert
2025-06-18 06:39:39 +02:00
from rougail.convert.object_model import (
2025-05-11 19:12:45 +02:00
Variable,
Family,
Calculation,
JinjaCalculation,
IdentifierCalculation,
IdentifierPropertyCalculation,
NamespaceCalculation,
IdentifierParam,
IndexCalculation,
IndexParam,
NamespaceParam,
Param,
)
from rougail.tiramisu import normalize_family, RENAME_TYPE
2025-06-18 06:39:39 +02:00
from rougail.utils import undefined
2024-12-23 20:54:52 +01:00
from .upgrade import RougailUpgrade
2025-04-25 22:54:20 +02:00
from .__version__ import __version__
2024-12-23 20:54:52 +01:00
def _(text):
return text
2025-05-11 19:12:45 +02:00
2025-04-27 09:27:04 +02:00
# XXX explicit null
def represent_none(self, data):
2025-05-11 19:12:45 +02:00
return self.represent_scalar("tag:yaml.org,2002:null", "null")
2025-04-27 09:27:04 +02:00
def represent_str(self, data):
2025-05-11 19:12:45 +02:00
if data == "":
return self.represent_scalar("tag:yaml.org,2002:null", "")
return self.represent_scalar("tag:yaml.org,2002:str", data)
2025-04-27 09:27:04 +02:00
RoundTripRepresenter.add_representer(type(None), represent_none)
RoundTripRepresenter.add_representer(str, represent_str)
# XXX
2024-12-23 20:54:52 +01:00
class RougailOutputFormatter:
2025-05-11 19:12:45 +02:00
output_name = "formatter"
2024-12-23 20:54:52 +01:00
def __init__(
self,
config: "Config",
*,
rougailconfig: "RougailConfig" = None,
user_data_errors: Optional[list] = None,
user_data_warnings: Optional[list] = None,
) -> None:
self.basic_types = {
str: "string",
int: "integer",
2024-12-23 20:54:52 +01:00
bool: "boolean",
float: "float",
}
if rougailconfig is None:
from rougail import RougailConfig
2025-05-11 19:12:45 +02:00
2024-12-23 20:54:52 +01:00
rougailconfig = RougailConfig
rougailconfig["step.output"] = self.output_name
self.rougailconfig = rougailconfig
if self.rougailconfig["step.output"] != self.output_name:
2025-05-11 19:12:45 +02:00
raise ExtentionError(
_('the "step.output" is not set to "{0}"').format(self.output_name)
)
2024-12-23 20:54:52 +01:00
# yaml.top_level_colon_align = True
self.main_namespace = normalize_family(self.rougailconfig["main_namespace"])
2025-09-22 14:24:23 +02:00
self.has_default_structural_format_version = (
self.rougailconfig["default_structural_format_version"] is not None
2025-05-11 19:12:45 +02:00
)
2025-03-27 08:36:49 +01:00
self.config = Config()
2025-05-11 19:12:45 +02:00
self.config.profile = "jinja"
2025-03-27 21:45:29 +01:00
self.config.line_break_after_multiline_tag = True
self.config.indent = " "
self.attributes = {}
self.yaml = YAML()
def run(self):
self.upgrade()
self.families = {self.main_namespace: CommentedMap()}
self.parse()
self.yaml.indent(mapping=2, sequence=4, offset=2)
2025-05-11 19:12:45 +02:00
self.yaml.version = "1.2"
self.yaml.explicit_start = True
self.yaml.explicit_end = True
self.default_flow_style = False
with BytesIO() as ymlfh:
families = self.families[self.main_namespace]
if not families:
2025-05-11 19:12:45 +02:00
self.yaml.dump("", ymlfh)
else:
self.yaml.dump(families, ymlfh)
ret = ymlfh.getvalue().decode("utf-8").strip()
return True, ret
def get_attributes(self, obj, excludes=[]) -> dict:
type_name = obj.__name__
2025-05-11 19:12:45 +02:00
if type_name == "Variable" and excludes == []:
raise Exception("pff")
if type_name not in self.attributes:
2025-05-11 19:12:45 +02:00
self.attributes[type_name] = {
str(attr): o.default
for attr, o in obj.model_fields.items()
if str(attr) not in excludes
}
return self.attributes[type_name]
def upgrade(self) -> None:
2025-09-22 14:24:23 +02:00
filenames = self.rougailconfig["main_structural_directories"]
if len(filenames) > 1:
2025-05-11 19:12:45 +02:00
raise Exception(_("only one file is allowed"))
filename = Path(filenames[0])
if not filename.is_file():
2025-05-11 19:12:45 +02:00
raise Exception(_("only a file is allowed"))
2025-03-27 08:36:49 +01:00
2025-05-11 19:12:45 +02:00
self.version_name, self.original_yaml = RougailUpgrade(self.rougailconfig).run(
filename
)
self.version_name, datas = RougailUpgrade(self.rougailconfig).run(filename)
self.rougail = RougailConvert(self.rougailconfig)
2024-12-23 20:54:52 +01:00
self.rougail.load_config()
self.rougail.init()
self.filename_str = str(filename)
if self.main_namespace is None:
self.rougail.namespace = None
else:
self.rougail.namespace = normalize_family(self.main_namespace)
self.rougail.create_namespace(self.main_namespace)
self.rougail.validate_file_version(
datas,
self.filename_str,
)
self.rougail.parse_root_file(
self.filename_str,
self.rougail.namespace,
"1.1",
datas,
)
def print(self):
2025-02-10 09:46:31 +01:00
ret, data = self.run()
print(data)
return ret
2024-12-23 20:54:52 +01:00
def parse(self):
2025-05-11 19:12:45 +02:00
self.families[self.main_namespace][self.version_name] = float(
self.rougail.version
)
2025-04-27 09:27:04 +02:00
self.remaining = len(self.rougail.paths._data)
2024-12-23 20:54:52 +01:00
for path, obj in self.rougail.paths._data.items():
2025-04-27 09:27:04 +02:00
self.remaining -= 1
2024-12-23 20:54:52 +01:00
if path == self.rougail.namespace:
2025-05-11 19:12:45 +02:00
# self.families[path] = self.families[None]
2024-12-23 20:54:52 +01:00
continue
if isinstance(obj, Family):
self.parse_family(path, obj)
if isinstance(obj, Variable):
self.parse_variable(path, obj)
if list(self.families[self.main_namespace]) != [self.version_name]:
2025-05-11 19:12:45 +02:00
self.families[self.main_namespace].yaml_value_comment_extend(
self.version_name, [CommentToken("\n\n", CommentMark(0)), None]
)
2025-09-22 14:24:23 +02:00
if self.has_default_structural_format_version:
del self.families[self.main_namespace][self.version_name]
2024-12-23 20:54:52 +01:00
def parse_family(self, path, obj):
2025-05-11 19:12:45 +02:00
children = [p.rsplit(".", 1)[-1] for p in self.rougail.parents[path]]
2024-12-23 20:54:52 +01:00
parent, name = self.get_parent_name(path)
ret = self.families[parent]
family = CommentedMap()
yaml_data = self.parse_yaml(path)
force_keys = []
if isinstance(yaml_data, dict):
if yaml_data.get("redefine", False):
family["redefine"] = True
force_keys = list(yaml_data)
if yaml_data.get("exists") is not None:
family["exists"] = yaml_data["exists"]
force_keys = list(yaml_data)
type_ = obj.type
if type_ == "dynamic":
attributes = self.get_attributes(self.rougail.dynamic)
2024-12-23 20:54:52 +01:00
else:
attributes = self.get_attributes(self.rougail.family)
2024-12-23 20:54:52 +01:00
for attr, default_value in attributes.items():
2025-01-04 11:44:09 +01:00
if attr in ["name", "path", "namespace", "version", "xmlfiles"]:
2024-12-23 20:54:52 +01:00
continue
try:
value = getattr(obj, attr)
except AttributeError:
continue
if attr != "type" and attr not in force_keys and value == default_value:
continue
if attr in children:
2025-05-11 19:12:45 +02:00
attr = f"_{attr}"
2024-12-23 20:54:52 +01:00
value = self.object_to_yaml(attr, type_, value, False, path)
family[attr] = value
2025-05-11 19:12:45 +02:00
if type_ == "dynamic" or (children and type_ == "family"):
2024-12-23 20:54:52 +01:00
if "_type" in family:
del family["_type"]
else:
del family["type"]
if not set(family):
ret[name] = CommentedMap()
2025-05-11 19:12:45 +02:00
ret.yaml_value_comment_extend(
name, [CommentToken("\n\n", CommentMark(0)), None]
)
elif not set(family) - {"description"}:
#
2024-12-23 20:54:52 +01:00
ret[name] = CommentedMap()
2025-04-27 09:27:04 +02:00
add_column = 3
2025-05-11 19:12:45 +02:00
path_len = path.count(".")
2025-04-27 09:27:04 +02:00
if self.rougail.namespace:
path_len -= 1
column = path_len * 2 + len(name) + add_column
if self.remaining:
description = family["description"].strip() + "\n\n"
2025-04-27 09:27:04 +02:00
else:
description = family["description"].strip()
2025-04-27 09:27:04 +02:00
ret.yaml_add_eol_comment(description, name, column=column)
2024-12-23 20:54:52 +01:00
else:
self.add_space(family)
ret[name] = family
self.families[path] = ret[name]
def parse_variable(self, path, obj):
parent, name = self.get_parent_name(path)
ret = self.families[parent]
variable = CommentedMap()
yaml_data = self.parse_yaml(path)
force_keys = []
if isinstance(yaml_data, dict):
if yaml_data.get("redefine", False):
variable["redefine"] = True
force_keys = list(yaml_data)
if yaml_data.get("exists") is not None:
variable["exists"] = yaml_data["exists"]
force_keys = list(yaml_data)
multi = obj.multi or isinstance(obj.default, list)
type_ = obj.type
if type_ in RENAME_TYPE:
type_ = RENAME_TYPE[type_]
2025-05-11 19:12:45 +02:00
for attr, default_value in self.get_attributes(
self.rougail.variable, ["name", "path", "namespace", "version", "xmlfiles"]
).items():
if attr == "type":
value = type_
else:
try:
value = getattr(obj, attr)
except AttributeError:
continue
2024-12-23 20:54:52 +01:00
if attr not in force_keys and value == default_value:
continue
value = self.object_to_yaml(attr, type_, value, multi, path)
variable[attr] = value
2025-05-11 19:12:45 +02:00
if variable.get("mandatory") is True and None not in variable.get(
"choices", []
):
2024-12-23 20:54:52 +01:00
del variable["mandatory"]
if "default" in variable:
2025-05-11 19:12:45 +02:00
if "type" in variable and variable["type"] in [
"string",
"boolean",
"integer",
2025-05-11 19:12:45 +02:00
"float",
]:
2024-12-23 20:54:52 +01:00
if variable["default"] and isinstance(variable["default"], list):
tested_value = variable["default"][0]
else:
tested_value = variable["default"]
if variable["type"] == self.basic_types.get(type(tested_value), None):
del variable["type"]
2025-05-11 19:12:45 +02:00
if (
"multi" in variable
and variable["multi"] is True
and isinstance(variable["default"], list)
):
2024-12-23 20:54:52 +01:00
del variable["multi"]
elif variable.get("type") == "choice" and "choices" in variable:
del variable["type"]
elif variable.get("type") == "string":
# default type is string
del variable["type"]
2025-05-11 19:12:45 +02:00
if set(variable) in [{"multi"}, {"multi", "description"}]:
2024-12-23 20:54:52 +01:00
variable["default"] = []
variable.pop("multi")
elif variable.get("type") == "boolean" and not multi:
# if boolean, the default value is True
del variable["type"]
variable["default"] = True
2025-05-11 19:12:45 +02:00
if (
"default" not in variable
and variable.get("multi") is True
and not set(variable) - {"default", "description", "multi"}
):
2025-03-26 19:34:00 +01:00
variable["default"] = []
2025-05-11 19:12:45 +02:00
del variable["multi"]
if not isinstance(variable.get("default"), dict) and not set(variable) - {
"default",
"description",
}:
2024-12-23 20:54:52 +01:00
# shorthand notation
2025-05-11 19:12:45 +02:00
default = variable.get("default")
2024-12-23 20:54:52 +01:00
ret[name] = default
2025-04-27 09:27:04 +02:00
add_column = 3
2024-12-23 20:54:52 +01:00
if isinstance(default, list):
ret[name] = CommentedSeq()
2025-04-27 09:27:04 +02:00
if not default:
add_column += 3
2024-12-23 20:54:52 +01:00
for d in default:
ret[name].append(d)
else:
2025-04-27 09:27:04 +02:00
if default is None:
ret[name] = ""
else:
ret[name] = default
add_column += len(str(default)) + 1
2024-12-23 20:54:52 +01:00
if "description" in variable:
description = variable["description"].strip()
2025-04-27 09:27:04 +02:00
if self.remaining and (not multi or not default):
2024-12-23 20:54:52 +01:00
description += "\n\n"
2025-05-11 19:12:45 +02:00
path_len = path.count(".")
2025-04-27 09:27:04 +02:00
if self.rougail.namespace:
path_len -= 1
column = path_len * 2 + len(name) + add_column
ret.yaml_add_eol_comment(description, name, column=column)
2024-12-23 20:54:52 +01:00
if multi and default:
self.add_space(ret)
else:
self.add_space(ret)
else:
2025-04-27 09:27:04 +02:00
if "default" in variable and variable["default"] is None:
variable["default"] = ""
2024-12-23 20:54:52 +01:00
ret[name] = variable
self.add_space(variable)
def add_space(self, obj):
def _get_last_obj(o, parent, param, typ):
if isinstance(o, CommentedMap):
param = list(o)[-1]
2025-05-11 19:12:45 +02:00
return _get_last_obj(o[param], o, param, "map")
2024-12-23 20:54:52 +01:00
if isinstance(o, CommentedSeq):
param = len(o) - 1
2025-05-11 19:12:45 +02:00
return _get_last_obj(o[param], o, param, "seq")
2024-12-23 20:54:52 +01:00
return typ, parent, param
2025-05-11 19:12:45 +02:00
2024-12-23 20:54:52 +01:00
param = list(obj)[-1]
typ, parent, param = _get_last_obj(obj[param], obj, param, "map")
if isinstance(parent[param], ScalarString):
2025-05-11 19:12:45 +02:00
enter = "\n"
2025-03-30 19:39:29 +02:00
else:
2025-05-11 19:12:45 +02:00
enter = "\n\n"
if typ == "seq":
2024-12-23 20:54:52 +01:00
func = parent.yaml_key_comment_extend
else:
func = parent.yaml_value_comment_extend
2025-04-27 09:27:04 +02:00
if self.remaining:
func(param, [CommentToken(enter, CommentMark(0)), None])
2024-12-23 20:54:52 +01:00
def object_to_yaml(self, key, type_, value, multi, object_path):
if isinstance(value, list):
2025-05-11 19:12:45 +02:00
if key == "params":
2024-12-23 20:54:52 +01:00
new_values = CommentedMap()
else:
new_values = CommentedSeq()
for v in value:
new_value = self.object_to_yaml(key, type_, v, multi, object_path)
2025-05-11 19:12:45 +02:00
if key == "params":
if "min_number" in new_value:
new_value["min_integer"] = new_value.pop("min_number")
if "max_number" in new_value:
new_value["max_integer"] = new_value.pop("max_number")
2024-12-23 20:54:52 +01:00
new_values.update(new_value)
else:
new_values.append(new_value)
return new_values
if isinstance(value, JinjaCalculation):
jinja = CommentedMap()
2025-03-27 08:36:49 +01:00
jinja_values = formatter(self.config, value.jinja.strip())[:-1]
2025-05-11 19:12:45 +02:00
if key == "default" and not multi:
2025-03-27 08:36:49 +01:00
jinja["jinja"] = FoldedScalarString(jinja_values)
fold_pos = []
old_i = 0
for i, ltr in enumerate(jinja_values):
2025-05-11 19:12:45 +02:00
if ltr == "\n":
2025-03-27 08:36:49 +01:00
fold_pos.append(i - old_i)
old_i = 1
jinja["jinja"].fold_pos = fold_pos
2025-05-11 19:12:45 +02:00
elif key == "secret_manager":
return self.object_to_yaml(
"params", type_, value.params, multi, object_path
)
2024-12-23 20:54:52 +01:00
else:
jinja["jinja"] = LiteralScalarString(jinja_values)
if value.return_type:
return_type = value.return_type
if return_type in RENAME_TYPE:
return_type = RENAME_TYPE[return_type]
jinja["return_type"] = return_type
2024-12-23 20:54:52 +01:00
if value.description:
2025-05-11 19:12:45 +02:00
if "\n" in value.description:
jinja["description"] = LiteralScalarString(value.description.strip())
else:
jinja["description"] = value.description.strip()
2024-12-23 20:54:52 +01:00
if value.params:
2025-05-11 19:12:45 +02:00
jinja["params"] = self.object_to_yaml(
"params", type_, value.params, multi, object_path
)
2024-12-23 20:54:52 +01:00
return jinja
elif isinstance(value, Calculation):
2025-05-11 19:12:45 +02:00
variable_attributes = self.get_attributes(
value.__class__,
[
"path",
"inside_list",
"version",
"xmlfiles",
"attribute_name",
"namespace",
],
)
2024-12-23 20:54:52 +01:00
variable = CommentedMap()
2025-05-11 19:12:45 +02:00
if isinstance(
value, (IdentifierCalculation, IdentifierPropertyCalculation)
):
2024-12-23 20:54:52 +01:00
variable["type"] = "identifier"
elif isinstance(value, IndexCalculation):
2024-12-23 20:54:52 +01:00
variable["type"] = "index"
elif isinstance(value, NamespaceCalculation):
variable["type"] = "namespace"
2024-12-23 20:54:52 +01:00
for key, default in variable_attributes.items():
val = getattr(value, key)
if val != default and val is not undefined:
variable[key] = val
if "variable" in variable:
2025-05-11 19:12:45 +02:00
variable["variable"] = self.calc_variable_path(
object_path, variable["variable"]
)
if variable.get("type") == "identifier" and "identifier" in variable:
2024-12-23 20:54:52 +01:00
del variable["type"]
if value.description:
2025-05-11 19:12:45 +02:00
if "\n" in value.description:
variable["description"] = LiteralScalarString(value.description.strip())
else:
variable["description"] = value.description.strip()
2024-12-23 20:54:52 +01:00
return variable
elif isinstance(value, Param):
2025-05-11 19:12:45 +02:00
param_attributes = self.get_attributes(
value.__class__, ["type", "key", "namespace"]
)
if list(param_attributes) == ["value"]:
2024-12-23 20:54:52 +01:00
variable = value.value
else:
variable = CommentedMap()
if isinstance(value, IdentifierParam):
variable["type"] = "identifier"
elif isinstance(value, IndexParam):
2024-12-23 20:54:52 +01:00
variable["type"] = "index"
elif isinstance(value, NamespaceParam):
variable["type"] = "namespace"
2024-12-23 20:54:52 +01:00
for key, default in param_attributes.items():
val = getattr(value, key)
if val != default and val is not undefined:
variable[key] = val
2025-05-11 19:12:45 +02:00
if variable.get("type") == "identifier" and "identifier" in variable:
2024-12-23 20:54:52 +01:00
del variable["type"]
if "variable" in variable:
2025-05-11 19:12:45 +02:00
variable["variable"] = self.calc_variable_path(
object_path, variable["variable"]
)
2024-12-23 20:54:52 +01:00
return {value.key: variable}
2025-05-11 19:12:45 +02:00
elif type_ == "port" and isinstance(value, str) and value.isnumeric():
2024-12-23 20:54:52 +01:00
return int(value)
2025-05-11 19:12:45 +02:00
elif key == "help" and "\n" in value:
return LiteralScalarString(value.strip())
2024-12-23 20:54:52 +01:00
return value
def calc_variable_path(self, object_path, variable_path):
if not variable_path.startswith("_"):
common_path = get_common_path(object_path, variable_path)
if not self.rougail.namespace or common_path:
if not common_path:
len_common_path = 0
else:
len_common_path = len(common_path) + 1
relative_object_path = object_path[len_common_path:]
2025-05-11 19:12:45 +02:00
final_path = "_" * (relative_object_path.count(".") + 1) + "."
return (
"_" * (relative_object_path.count(".") + 1)
+ "."
+ variable_path[len_common_path:]
)
2024-12-23 20:54:52 +01:00
return variable_path
def get_parent_name(self, path):
if "." in path:
return path.rsplit(".", 1)
return None, path
def parse_yaml(self, path: str) -> dict:
def _yaml(y):
if not subpath:
return y
name = subpath.pop(0)
2025-05-11 19:12:45 +02:00
if name not in y and name.endswith("{{ identifier }}"):
search_name = name[:-16]
if search_name not in y:
search_name = name.replace("{{ identifier }}", "{{ suffix }}")
name = search_name
2024-12-23 20:54:52 +01:00
return _yaml(y[name])
2025-05-11 19:12:45 +02:00
2024-12-23 20:54:52 +01:00
if self.main_namespace:
2025-05-11 19:12:45 +02:00
subpath = path.split(".")[1:]
2024-12-23 20:54:52 +01:00
else:
2025-05-11 19:12:45 +02:00
subpath = path.split(".")
2024-12-23 20:54:52 +01:00
return _yaml(self.original_yaml)
RougailOutput = RougailOutputFormatter
__all__ = ("RougailOutputFormatter",)