""" Silique (https://www.silique.fr) Copyright (C) 2024-2025 This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . """ from io import BytesIO from pathlib import Path from typing import Optional from ruamel.yaml import YAML, CommentedMap from ruamel.yaml.representer import RoundTripRepresenter from ruamel.yaml.tokens import CommentToken from ruamel.yaml.error import CommentMark from ruamel.yaml.comments import CommentedSeq from ruamel.yaml.scalarstring import ( LiteralScalarString, FoldedScalarString, ScalarString, ) from djlint.settings import Config from djlint.reformat import formatter from tiramisu.config import get_common_path from rougail.convert import RougailConvert from rougail.convert.object_model import ( Variable, Family, Calculation, JinjaCalculation, IdentifierCalculation, IdentifierPropertyCalculation, NamespaceCalculation, IdentifierParam, IndexCalculation, VariableCalculation, IndexParam, NamespaceParam, Param, AnyParam, ) from rougail.tiramisu import normalize_family, RENAME_TYPE from rougail.utils import undefined from .upgrade import RougailUpgrade from .__version__ import __version__ def _(text): return text # XXX explicit null def represent_none(self, data): return self.represent_scalar("tag:yaml.org,2002:null", "null") def represent_str(self, data): if data == "": return self.represent_scalar("tag:yaml.org,2002:null", "") return self.represent_scalar("tag:yaml.org,2002:str", data) RoundTripRepresenter.add_representer(type(None), represent_none) RoundTripRepresenter.add_representer(str, represent_str) # XXX class RougailOutputFormatter: output_name = "formatter" def __init__( self, config: "Config", *, rougailconfig: "RougailConfig" = None, user_data_errors: Optional[list] = None, user_data_warnings: Optional[list] = None, ) -> None: self.basic_types = { str: "string", int: "integer", bool: "boolean", float: "float", } if rougailconfig is None: from rougail import RougailConfig rougailconfig = RougailConfig rougailconfig["step.output"] = self.output_name self.rougailconfig = rougailconfig if self.rougailconfig["step.output"] != self.output_name: raise ExtentionError( _('the "step.output" is not set to "{0}"').format(self.output_name) ) # yaml.top_level_colon_align = True self.main_namespace = normalize_family(self.rougailconfig["main_namespace"]) self.has_default_structural_format_version = ( self.rougailconfig["default_structural_format_version"] is not None ) self.config = Config() self.config.profile = "jinja" self.config.line_break_after_multiline_tag = True self.config.indent = " " self.attributes = {} self.yaml = YAML() def run(self): self.upgrade() self.families = {self.main_namespace: CommentedMap()} self.parse() self.yaml.indent(mapping=2, sequence=4, offset=2) self.yaml.version = "1.2" self.yaml.explicit_start = True self.yaml.explicit_end = True self.default_flow_style = False with BytesIO() as ymlfh: families = self.families[self.main_namespace] if not families: self.yaml.dump("", ymlfh) else: self.yaml.dump(families, ymlfh) ret = ymlfh.getvalue().decode("utf-8").strip() return True, ret def get_attributes(self, obj, excludes=[]) -> dict: type_name = obj.__name__ if type_name == "Variable" and excludes == []: raise Exception("pff") if type_name not in self.attributes: self.attributes[type_name] = { str(attr): o.default for attr, o in obj.model_fields.items() if str(attr) not in excludes } return self.attributes[type_name] def upgrade(self) -> None: filenames = self.rougailconfig["main_structural_directories"] if len(filenames) > 1: raise Exception(_("only one file is allowed")) filename = Path(filenames[0]) if not filename.is_file(): raise Exception(_("only a file is allowed")) self.version_name, self.original_yaml = RougailUpgrade(self.rougailconfig).run( filename ) self.version_name, datas = RougailUpgrade(self.rougailconfig).run(filename) self.rougail = RougailConvert(self.rougailconfig) self.rougail.load_config() self.rougail.init() self.filename_str = str(filename) if self.main_namespace is None: self.rougail.namespace = None else: self.rougail.namespace = normalize_family(self.main_namespace) self.rougail.create_namespace(self.main_namespace) self.rougail.validate_file_version( datas, self.filename_str, ) self.rougail.parse_root_file( self.filename_str, self.rougail.namespace, "1.1", datas, ) def print(self): ret, data = self.run() print(data) return ret def parse(self): self.families[self.main_namespace][self.version_name] = float( self.rougail.version ) self.remaining = len(self.rougail.paths._data) for path, obj in self.rougail.paths._data.items(): self.remaining -= 1 if path == self.rougail.namespace: # self.families[path] = self.families[None] continue if isinstance(obj, Family): self.parse_family(path, obj) elif isinstance(obj, Variable): self.parse_variable(path, obj) if list(self.families[self.main_namespace]) != [self.version_name]: self.families[self.main_namespace].yaml_value_comment_extend( self.version_name, [CommentToken("\n\n", CommentMark(0)), None] ) if self.has_default_structural_format_version: del self.families[self.main_namespace][self.version_name] def parse_family(self, path, obj): children = [p.rsplit(".", 1)[-1] for p in self.rougail.parents[path]] parent, name = self.get_parent_name(path) ret = self.families[parent] family = CommentedMap() yaml_data = self.parse_yaml(path) force_keys = [] if isinstance(yaml_data, dict): if yaml_data.get("redefine", False): family["redefine"] = True force_keys = list(yaml_data) if yaml_data.get("exists") is not None: family["exists"] = yaml_data["exists"] force_keys = list(yaml_data) type_ = obj.type if type_ == "dynamic": attributes = self.get_attributes(self.rougail.dynamic) else: attributes = self.get_attributes(self.rougail.family) for attr, default_value in attributes.items(): if attr in ["name", "path", "namespace", "version", "xmlfiles"]: continue try: value = getattr(obj, attr) except AttributeError: continue if attr != "type" and attr not in force_keys and value == default_value: continue if attr in children: attr = f"_{attr}" family[attr] = self.object_to_yaml(attr, type_, value, False, path) if type_ == "dynamic" or (children and type_ == "family"): if "_type" in family: del family["_type"] else: del family["type"] if not set(family): ret[name] = CommentedMap() ret.yaml_value_comment_extend( name, [CommentToken("\n\n", CommentMark(0)), None] ) elif not set(family) - {"description"}: # ret[name] = CommentedMap() add_column = 3 path_len = path.count(".") if self.rougail.namespace: path_len -= 1 column = path_len * 2 + len(name) + add_column if self.remaining: description = family["description"].strip() + "\n\n" else: description = family["description"].strip() ret.yaml_add_eol_comment(description, name, column=column) else: self.add_space(family) ret[name] = family self.families[path] = ret[name] def parse_variable(self, path, obj): parent, name = self.get_parent_name(path) ret = self.families[parent] variable = CommentedMap() yaml_data = self.parse_yaml(path) force_keys = [] if isinstance(yaml_data, dict): if yaml_data.get("redefine", False): variable["redefine"] = True force_keys = list(yaml_data) if yaml_data.get("exists") is not None: variable["exists"] = yaml_data["exists"] force_keys = list(yaml_data) multi = obj.multi or isinstance(obj.default, list) type_ = obj.type if type_ in RENAME_TYPE: type_ = RENAME_TYPE[type_] if type_ == 'cidr' or type_ == 'network_cidr': if type_ == 'cidr': type_ = 'ip' else: type_ = 'network' if not obj.params: obj.params = [] key = 'cidr' param = AnyParam( key='cidr', value=True, type="any", path=None, attribute=None, family_is_dynamic=None, namespace=self.rougail.namespace, xmlfiles=obj.xmlfiles, ) obj.params.append(param) for attr, default_value in self.get_attributes( self.rougail.variable, ["name", "path", "namespace", "version", "xmlfiles"] ).items(): if attr == "type": value = type_ else: try: value = getattr(obj, attr) except AttributeError: continue if attr not in force_keys and value == default_value: continue value = self.object_to_yaml(attr, type_, value, multi, path) variable[attr] = value if variable.get("mandatory") is True and None not in variable.get( "choices", [] ): del variable["mandatory"] if "default" in variable: if isinstance(obj.default, VariableCalculation): is_multi = "multi" in variable and variable["multi"] is True if "type" in variable or is_multi: other_path = self.rougail.paths.get_full_path(obj.default.variable, path) if other_path in self.rougail.paths: other_obj = self.rougail.paths[other_path] if "type" in variable and variable["type"] == other_obj.type: del variable["type"] if is_multi and obj.multi: del variable["multi"] if "type" in variable and isinstance(obj.default, IndexCalculation) and variable["type"] == "integer": del variable["type"] if "type" in variable and variable["type"] in [ "string", "boolean", "integer", "float", ]: if variable["default"] and isinstance(variable["default"], list): tested_value = variable["default"][0] else: tested_value = variable["default"] if variable["type"] == self.basic_types.get(type(tested_value), None): del variable["type"] if ( "multi" in variable and variable["multi"] is True and isinstance(variable["default"], list) ): del variable["multi"] elif variable.get("type") == "choice" and "choices" in variable: del variable["type"] elif variable.get("type") == "string": # default type is string del variable["type"] if set(variable) in [{"multi"}, {"multi", "description"}]: variable["default"] = [] variable.pop("multi") elif variable.get("type") == "boolean" and not multi: # if boolean, the default value is True del variable["type"] variable["default"] = True if ( "default" not in variable and variable.get("multi") is True and not set(variable) - {"default", "description", "multi"} ): variable["default"] = [] del variable["multi"] if not isinstance(variable.get("default"), dict) and not set(variable) - { "default", "description", }: # shorthand notation default = variable.get("default") ret[name] = default add_column = 3 if isinstance(default, list): ret[name] = CommentedSeq() if not default: add_column += 3 for d in default: ret[name].append(d) else: if default is None: ret[name] = "" else: ret[name] = default add_column += len(str(default)) + 1 if "description" in variable: description = variable["description"].strip() if self.remaining and (not multi or not default): description += "\n\n" path_len = path.count(".") if self.rougail.namespace: path_len -= 1 column = path_len * 2 + len(name) + add_column ret.yaml_add_eol_comment(description, name, column=column) if multi and default: self.add_space(ret) else: self.add_space(ret) else: if "default" in variable and variable["default"] is None: variable["default"] = "" ret[name] = variable self.add_space(variable) def add_space(self, obj): def _get_last_obj(o, parent, param, typ): if isinstance(o, CommentedMap): param = list(o)[-1] return _get_last_obj(o[param], o, param, "map") if isinstance(o, CommentedSeq): param = len(o) - 1 return _get_last_obj(o[param], o, param, "seq") return typ, parent, param param = list(obj)[-1] typ, parent, param = _get_last_obj(obj[param], obj, param, "map") if isinstance(parent[param], ScalarString): enter = "\n" else: enter = "\n\n" if typ == "seq": func = parent.yaml_key_comment_extend else: func = parent.yaml_value_comment_extend if self.remaining: func(param, [CommentToken(enter, CommentMark(0)), None]) def object_to_yaml(self, key, type_, value, multi, object_path): if isinstance(value, list): if key == "params": new_values = CommentedMap() else: new_values = CommentedSeq() for v in value: new_value = self.object_to_yaml(key, type_, v, multi, object_path) if key == "params": if "min_number" in new_value: new_value["min_integer"] = new_value.pop("min_number") if "max_number" in new_value: new_value["max_integer"] = new_value.pop("max_number") new_values.update(new_value) else: new_values.append(new_value) return new_values if isinstance(value, JinjaCalculation): jinja = CommentedMap() jinja_values = formatter(self.config, value.jinja.strip())[:-1] if key == "default" and not multi: jinja["jinja"] = FoldedScalarString(jinja_values) fold_pos = [] old_i = 0 for i, ltr in enumerate(jinja_values): if ltr == "\n": fold_pos.append(i - old_i) old_i = 1 jinja["jinja"].fold_pos = fold_pos elif key == "secret_manager": return self.object_to_yaml( "params", type_, value.params, multi, object_path ) else: jinja["jinja"] = LiteralScalarString(jinja_values) if value.return_type: return_type = value.return_type if return_type in RENAME_TYPE: return_type = RENAME_TYPE[return_type] jinja["return_type"] = return_type if value.description: if "\n" in value.description: jinja["description"] = LiteralScalarString(value.description.strip()) else: jinja["description"] = value.description.strip() if value.params: jinja["params"] = self.object_to_yaml( "params", type_, value.params, multi, object_path ) return jinja elif isinstance(value, Calculation): variable_attributes = self.get_attributes( value.__class__, [ "path", "inside_list", "version", "xmlfiles", "attribute_name", "namespace", ], ) variable = CommentedMap() if isinstance( value, (IdentifierCalculation, IdentifierPropertyCalculation) ): variable["type"] = "identifier" elif isinstance(value, IndexCalculation): variable["type"] = "index" elif isinstance(value, NamespaceCalculation): variable["type"] = "namespace" for key, default in variable_attributes.items(): val = getattr(value, key) if val != default and val is not undefined: variable[key] = val if "variable" in variable: variable["variable"] = self.calc_variable_path( object_path, variable["variable"] ) if variable.get("type") == "identifier" and "identifier" in variable: del variable["type"] if value.description: if "\n" in value.description: variable["description"] = LiteralScalarString(value.description.strip()) else: variable["description"] = value.description.strip() return variable elif isinstance(value, Param): param_attributes = self.get_attributes( value.__class__, ["type", "key", "namespace"] ) if list(param_attributes) == ["value"]: variable = value.value else: variable = CommentedMap() if isinstance(value, IdentifierParam): variable["type"] = "identifier" elif isinstance(value, IndexParam): variable["type"] = "index" elif isinstance(value, NamespaceParam): variable["type"] = "namespace" for key, default in param_attributes.items(): val = getattr(value, key) if val != default and val is not undefined: variable[key] = val if variable.get("type") == "identifier" and "identifier" in variable: del variable["type"] if "variable" in variable: variable["variable"] = self.calc_variable_path( object_path, variable["variable"] ) return {value.key: variable} elif type_ == "port" and isinstance(value, str) and value.isnumeric(): return int(value) elif key == "help" and "\n" in value: return LiteralScalarString(value.strip()) return value def calc_variable_path(self, object_path, variable_path): if not variable_path.startswith("_"): common_path = get_common_path(object_path, variable_path) if not self.rougail.namespace or common_path: if not common_path: len_common_path = 0 else: len_common_path = len(common_path) + 1 relative_object_path = object_path[len_common_path:] final_path = "_" * (relative_object_path.count(".") + 1) + "." return ( "_" * (relative_object_path.count(".") + 1) + "." + variable_path[len_common_path:] ) return variable_path def get_parent_name(self, path): if "." in path: return path.rsplit(".", 1) return None, path def parse_yaml(self, path: str) -> dict: def _yaml(y): if not subpath: return y name = subpath.pop(0) if name not in y and name.endswith("{{ identifier }}"): search_name = name[:-16] if search_name not in y: search_name = name.replace("{{ identifier }}", "{{ suffix }}") name = search_name return _yaml(y[name]) if self.main_namespace: subpath = path.split(".")[1:] else: subpath = path.split(".") return _yaml(self.original_yaml) RougailOutput = RougailOutputFormatter __all__ = ("RougailOutputFormatter",)