# rougail-output-formatter/src/rougail/output_formatter/__init__.py
#
# Repository web-viewer residue, kept as comments (not part of the program):
# 389 lines
# 16 KiB
# Python
# Raw Normal View History
# 2024-12-23 20:54:52 +01:00
"""
Silique (https://www.silique.fr)
Copyright (C) 2024
This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from io import BytesIO
from pathlib import Path
from typing import Optional
from ruamel.yaml import YAML, CommentedMap
from ruamel.yaml.tokens import CommentToken
from ruamel.yaml.error import CommentMark
from ruamel.yaml.comments import CommentedSeq
from ruamel.yaml.scalarstring import LiteralScalarString, FoldedScalarString
from tiramisu import undefined
from tiramisu.config import get_common_path
from rougail.convert import RougailConvert
from rougail.object_model import Variable, Family, Calculation, JinjaCalculation, IdentifierCalculation, IdentifierPropertyCalculation, IdentifierParam, IndexCalculation, IndexParam, Param
from rougail.utils import normalize_family
from .upgrade import RougailUpgrade
def _(text):
    """Return *text* unchanged.

    Placeholder carrying the conventional gettext name: user-facing messages
    in this module are wrapped in ``_()``, and this implementation performs
    no translation.
    """
    return text
class RougailOutputFormatter:
    """Rougail output step that re-emits one structure file as formatted YAML.

    The constructor loads a single file through ``RougailUpgrade`` and
    ``RougailConvert``; :meth:`run` then walks every parsed path and rebuilds
    a ruamel ``CommentedMap`` tree that is dumped to a string.

    NOTE(review): the block nesting of this class was reconstructed from a
    copy of the file whose indentation had been lost. Places where more than
    one nesting was syntactically possible are flagged inline.
    """

    output_name = 'formatter'
    # Object attributes that are never copied into the generated YAML.
    _SKIPPED_ATTRIBUTES = ("name", "path", "namespace", "version", "path_prefix", "xmlfiles")
    # Suffix carried by dynamic-family path components.
    _IDENTIFIER_SUFFIX = '{{ identifier }}'

    def __init__(
        self,
        config: "Config",
        *,
        rougailconfig: "RougailConfig" = None,
        user_data_errors: Optional[list] = None,
        user_data_warnings: Optional[list] = None,
    ) -> None:
        """Load and parse the single file listed in ``main_dictionaries``.

        Args:
            config: Accepted but not used by this class.
            rougailconfig: Rougail configuration; when ``None`` the global
                ``RougailConfig`` is used and its ``step.output`` is set to
                :attr:`output_name`.
            user_data_errors: Accepted but not used by this class.
            user_data_warnings: Accepted but not used by this class.

        Raises:
            ExtentionError: ``step.output`` is not :attr:`output_name`.
            Exception: ``main_dictionaries`` does not hold exactly one entry,
                or that entry is not a regular file.
        """
        self.basic_types = {
            str: "string",
            int: "number",
            bool: "boolean",
            float: "float",
        }
        if rougailconfig is None:
            from rougail import RougailConfig
            rougailconfig = RougailConfig
            # NOTE(review): assumed to belong to this branch; placed outside
            # it, the check just below could never fail.
            rougailconfig["step.output"] = self.output_name
        if rougailconfig["step.output"] != self.output_name:
            # NOTE(review): ExtentionError was never imported by this module
            # (NameError when this branch ran); rougail.error is presumed to
            # define it - confirm.
            from rougail.error import ExtentionError
            raise ExtentionError(_('the "step.output" is not set to "{0}"').format(self.output_name))
        # yaml.top_level_colon_align = True
        self.main_namespace = rougailconfig["main_namespace"]
        filenames = rougailconfig["main_dictionaries"]
        # "!= 1" also rejects an empty list, which previously surfaced as an
        # IndexError on filenames[0].
        if len(filenames) != 1:
            raise Exception(_('only one file is allowed'))
        filename = Path(filenames[0])
        if not filename.is_file():
            raise Exception(_('only a file is allowed'))
        # The file is loaded twice on purpose-looking grounds: one copy is
        # kept for parse_yaml(), the other is handed to the converter.
        # NOTE(review): presumably because the converter may modify its
        # input - confirm before merging the two calls.
        self.original_yaml = RougailUpgrade(rougailconfig).run(filename)
        datas = RougailUpgrade(rougailconfig).run(filename)
        self.rougail = RougailConvert(rougailconfig)
        self.rougail.load_config()
        self.rougail.init()
        self.filename_str = str(filename)
        if self.main_namespace is None:
            self.rougail.namespace = None
        else:
            self.rougail.namespace = normalize_family(self.main_namespace)
            self.rougail.create_namespace(self.main_namespace)
        self.rougail.validate_file_version(
            datas,
            self.filename_str,
        )
        self.rougail.parse_root_file(
            self.filename_str,
            self.rougail.namespace,
            "1.1",
            datas,
        )
        self.yaml = YAML()

    @staticmethod
    def _schema_defaults(model, excludes=()):
        """Map each property of ``model.model_json_schema()`` to its default.

        Properties without a ``default`` entry map to ``None``; names listed
        in *excludes* are left out.
        """
        properties = model.model_json_schema()["properties"]
        return {
            attr: spec.get("default")
            for attr, spec in properties.items()
            if attr not in excludes
        }

    @staticmethod
    def _blank_line_comment():
        """Return a fresh comment payload holding two newlines."""
        return [CommentToken('\n\n', CommentMark(0)), None]

    def run(self):
        """Build the YAML tree and return it as text ending with one newline."""
        self.families_attributes = self._schema_defaults(self.rougail.family)
        self.dynamics_attributes = self._schema_defaults(self.rougail.dynamic)
        self.variables_attributes = self._schema_defaults(self.rougail.variable)
        # families maps a path to the mapping that receives its children;
        # the None key is the document root.
        self.families = {None: CommentedMap()}
        self.parse()
        self.yaml.indent(mapping=2, sequence=4, offset=2)
        self.yaml.explicit_start = True
        # Previously assigned to ``self`` instead of the dumper, where it had
        # no effect on the output.
        self.yaml.default_flow_style = False
        with BytesIO() as ymlfh:
            self.yaml.dump(self.families[None], ymlfh)
            ret = ymlfh.getvalue().decode("utf-8").strip() + '\n'
        return ret

    def print(self):
        """Write the result of :meth:`run` to standard output."""
        print(self.run())

    def parse(self):
        """Fill ``self.families`` from every path known to the converter.

        Raises:
            Exception: no object provided a non-empty version.
        """
        # FIXME path to relative !
        if self.rougail.namespace:
            version_path = f'{self.rougail.namespace}.version'
        else:
            version_path = 'version'
        # When an object already lives at the "version" path, the format
        # version key is written as "_version" instead.
        if version_path in self.rougail.paths._data:
            version_name = '_version'
        else:
            version_name = 'version'
        root = self.families[None]
        # Reserve the key now so it is the first one in the document; its
        # value is filled in once the version is known.
        root[version_name] = None
        root.yaml_value_comment_extend(version_name, self._blank_line_comment())
        version = None
        for path, obj in self.rougail.paths._data.items():
            if version is None or version == '':
                version = obj.version
            if path == self.rougail.namespace:
                # The namespace itself is not emitted: its children attach
                # directly to the document root.
                self.families[path] = root
                continue
            if isinstance(obj, Family):
                self.parse_family(path, obj)
            if isinstance(obj, Variable):
                self.parse_variable(path, obj)
        if not version:
            # Format after translation so the message id stays constant.
            raise Exception(_('no variables in file {0}').format(self.filename_str))
        root[version_name] = float(version)

    def parse_family(self, path, obj):
        """Add the family at *path* to its parent mapping.

        The mapping stored for the family is also registered in
        ``self.families[path]`` so later children can be attached to it.
        """
        children = [p.rsplit('.', 1)[-1] for p in self.rougail.parents[path]]
        parent, name = self.get_parent_name(path)
        ret = self.families[parent]
        family = CommentedMap()
        yaml_data = self.parse_yaml(path)
        # Keys written in the source file are kept even when they equal the
        # default, but only for entries using "redefine" or "exists".
        force_keys = []
        if isinstance(yaml_data, dict):
            if yaml_data.get("redefine", False):
                family["redefine"] = True
                force_keys = list(yaml_data)
            if yaml_data.get("exists") is not None:
                family["exists"] = yaml_data["exists"]
                force_keys = list(yaml_data)
        type_ = obj.type
        if type_ == "dynamic":
            attributes = self.dynamics_attributes
        else:
            attributes = self.families_attributes
        for attr, default_value in attributes.items():
            if attr in self._SKIPPED_ATTRIBUTES:
                continue
            try:
                value = getattr(obj, attr)
            except AttributeError:
                continue
            # "type" is always collected here; it may be dropped just below.
            if attr != "type" and attr not in force_keys and value == default_value:
                continue
            if attr in children:
                # An attribute sharing its name with a child is written with
                # a leading underscore.
                attr = f'_{attr}'
            value = self.object_to_yaml(attr, type_, value, False, path)
            family[attr] = value
        if type_ == "dynamic" or (children and type_ == 'family'):
            if "_type" in family:
                del family["_type"]
            else:
                del family["type"]
        if not set(family):
            ret[name] = CommentedMap()
            ret.yaml_value_comment_extend(name, self._blank_line_comment())
        elif not set(family) - {'description'}:
            # Only a description is left: emit it as an end-of-line comment
            # on an otherwise empty mapping.
            ret[name] = CommentedMap()
            ret.yaml_add_eol_comment(family["description"] + '\n\n', name)
        else:
            self.add_space(family)
            ret[name] = family
        self.families[path] = ret[name]

    def parse_variable(self, path, obj):
        """Add the variable at *path* to its parent mapping.

        Attributes that can be deduced from the others (type, multi,
        mandatory) are removed, and a variable reduced to a default and/or a
        description is written in shorthand form.
        """
        parent, name = self.get_parent_name(path)
        ret = self.families[parent]
        variable = CommentedMap()
        yaml_data = self.parse_yaml(path)
        force_keys = []
        if isinstance(yaml_data, dict):
            if yaml_data.get("redefine", False):
                variable["redefine"] = True
                force_keys = list(yaml_data)
            if yaml_data.get("exists") is not None:
                variable["exists"] = yaml_data["exists"]
                force_keys = list(yaml_data)
        multi = obj.multi or isinstance(obj.default, list)
        type_ = obj.type
        for attr, default_value in self.variables_attributes.items():
            if attr in self._SKIPPED_ATTRIBUTES:
                continue
            try:
                value = getattr(obj, attr)
            except AttributeError:
                continue
            if attr not in force_keys and value == default_value:
                continue
            value = self.object_to_yaml(attr, type_, value, multi, path)
            variable[attr] = value
        if variable.get("mandatory") is True and None not in variable.get("choices", []):
            del variable["mandatory"]
        # NOTE(review): the pairing of the two "elif" branches with the
        # "default" test is the most plausible reading of the flattened
        # source - confirm against the upstream file.
        if "default" in variable:
            if "type" in variable and variable["type"] in ["string", "boolean", "number", "float"]:
                if variable["default"] and isinstance(variable["default"], list):
                    tested_value = variable["default"][0]
                else:
                    tested_value = variable["default"]
                # The type is redundant when the default value's Python type
                # already maps to it.
                if variable["type"] == self.basic_types.get(type(tested_value), None):
                    del variable["type"]
            if "multi" in variable and variable["multi"] is True and isinstance(variable["default"], list):
                del variable["multi"]
        elif variable.get("type") == "choice" and "choices" in variable:
            del variable["type"]
        elif variable.get("type") == "string":
            # default type is string
            del variable["type"]
        if set(variable) in [{"multi"}, {'multi', 'description'}]:
            variable["default"] = []
            variable.pop("multi")
        elif variable.get("type") == "boolean" and not multi:
            # if boolean, the default value is True
            del variable["type"]
            variable["default"] = True
        if not isinstance(variable.get("default"), dict) and not set(variable) - {'default', 'description'}:
            # shorthand notation
            default = variable.get('default')
            if isinstance(default, list):
                sequence = CommentedSeq()
                for item in default:
                    sequence.append(item)
                ret[name] = sequence
            else:
                ret[name] = default
            if "description" in variable:
                description = variable["description"]
                if not multi or not default:
                    description += "\n\n"
                ret.yaml_add_eol_comment(description, name)
                if multi and default:
                    self.add_space(ret)
            else:
                self.add_space(ret)
        else:
            ret[name] = variable
            self.add_space(variable)

    def add_space(self, obj):
        """Attach a two-newline comment after the last leaf inside *obj*.

        The last entry of *obj* is followed through nested ``CommentedMap`` /
        ``CommentedSeq`` containers. An empty container is treated as the
        leaf itself (it previously raised ``IndexError``), and an empty
        *obj* is left untouched.
        """
        if not obj:
            return
        parent, param, typ = obj, list(obj)[-1], 'map'
        current = parent[param]
        while True:
            if isinstance(current, CommentedMap) and current:
                parent, param, typ = current, list(current)[-1], 'map'
            elif isinstance(current, CommentedSeq) and current:
                parent, param, typ = current, len(current) - 1, 'seq'
            else:
                break
            current = parent[param]
        if typ == 'seq':
            func = parent.yaml_key_comment_extend
        else:
            func = parent.yaml_value_comment_extend
        func(param, self._blank_line_comment())

    def object_to_yaml(self, key, type_, value, multi, object_path):
        """Convert an attribute value into a structure ruamel can dump.

        Args:
            key: Name of the attribute being converted.
            type_: Type of the owning family or variable.
            value: The value; lists, ``JinjaCalculation``, ``Calculation``
                and ``Param`` objects are converted, anything else is
                returned as is (numeric strings of a ``port`` become ints).
            multi: Whether the owning variable is multi-valued.
            object_path: Path of the owning object, used to rewrite variable
                references relative to it.
        """
        if isinstance(value, list):
            is_params = key == 'params'
            # Params are merged into one mapping keyed by parameter name;
            # every other list stays a sequence.
            new_values = CommentedMap() if is_params else CommentedSeq()
            for item in value:
                new_value = self.object_to_yaml(key, type_, item, multi, object_path)
                if is_params:
                    new_values.update(new_value)
                else:
                    new_values.append(new_value)
            return new_values
        if isinstance(value, JinjaCalculation):
            jinja = CommentedMap()
            jinja_values = value.jinja.strip()
            if key == 'default' and not multi:
                # Newlines become spaces; their positions are recorded in
                # fold_pos on the folded scalar.
                jinja["jinja"] = FoldedScalarString(jinja_values.replace('\n', ' '))
                jinja["jinja"].fold_pos = [i for i, ltr in enumerate(jinja_values) if ltr == '\n']
            else:
                jinja["jinja"] = LiteralScalarString(jinja_values)
            if value.return_type:
                jinja["return_type"] = value.return_type
            if value.description:
                jinja["description"] = value.description
            if value.params:
                jinja["params"] = self.object_to_yaml("params", type_, value.params, multi, object_path)
            return jinja
        if isinstance(value, Calculation):
            variable_attributes = self.get_object_informations(
                value,
                ['path', 'inside_list', 'version', 'xmlfiles', 'attribute_name', 'namespace'],
            )
            variable = CommentedMap()
            if isinstance(value, (IdentifierCalculation, IdentifierPropertyCalculation)):
                variable["type"] = "identifier"
            if isinstance(value, IndexCalculation):
                variable["type"] = "index"
            # The loop name no longer shadows the ``key`` parameter.
            for attr_name, default in variable_attributes.items():
                val = getattr(value, attr_name)
                if val != default and val is not undefined:
                    variable[attr_name] = val
            if "variable" in variable:
                variable["variable"] = self.calc_variable_path(object_path, variable["variable"])
            if variable.get('type') == 'identifier' and 'identifier' in variable:
                del variable["type"]
            return variable
        if isinstance(value, Param):
            param_attributes = self.get_object_informations(value, ["type", "key"])
            if list(param_attributes) == ['value']:
                # A param whose only remaining field is "value" is written
                # as that bare value.
                variable = value.value
            else:
                variable = CommentedMap()
                if isinstance(value, IdentifierParam):
                    variable["type"] = "identifier"
                if isinstance(value, IndexParam):
                    variable["type"] = "index"
                for attr_name, default in param_attributes.items():
                    val = getattr(value, attr_name)
                    if val != default and val is not undefined:
                        variable[attr_name] = val
                if variable.get('type') == 'identifier' and 'identifier' in variable:
                    del variable["type"]
                if "variable" in variable:
                    variable["variable"] = self.calc_variable_path(object_path, variable["variable"])
            return {value.key: variable}
        if type_ == 'port' and isinstance(value, str) and value.isnumeric():
            return int(value)
        return value

    def calc_variable_path(self, object_path, variable_path):
        """Return *variable_path* rewritten relative to *object_path*.

        Paths already starting with ``_`` are returned unchanged, as are
        paths sharing no common prefix with *object_path* when a namespace
        is set. Otherwise the common prefix is dropped and replaced by one
        underscore per remaining level of *object_path*, followed by a dot.
        """
        if not variable_path.startswith("_"):
            common_path = get_common_path(object_path, variable_path)
            if not self.rougail.namespace or common_path:
                if not common_path:
                    len_common_path = 0
                else:
                    # +1 also skips the dot that follows the common prefix.
                    len_common_path = len(common_path) + 1
                relative_object_path = object_path[len_common_path:]
                final_path = "_" * (relative_object_path.count(".") + 1) + '.'
                return final_path + variable_path[len_common_path:]
        return variable_path

    def get_object_informations(self, value, excludes=None):
        """Return ``{attribute: schema default}`` for the class of *value*.

        Args:
            value: Object whose class exposes ``model_json_schema()``.
            excludes: Attribute names to leave out (none by default).
        """
        return self._schema_defaults(value.__class__, excludes or ())

    def get_parent_name(self, path):
        """Split *path* into ``(parent path, last component)``.

        The parent is ``None`` when *path* contains no dot.
        """
        if "." in path:
            parent, name = path.rsplit(".", 1)
            return parent, name
        return None, path

    def parse_yaml(self, path: str) -> object:
        """Return the entry of the original YAML data found at *path*.

        The first component of *path* is skipped when a main namespace is
        set. A component missing from the data that ends with
        ``{{ identifier }}`` is looked up without that suffix. The result is
        whatever the file holds there, not necessarily a mapping.

        Raises:
            KeyError: a component of *path* is missing from the data.
        """
        parts = path.split('.')
        if self.main_namespace:
            parts = parts[1:]
        node = self.original_yaml
        for name in parts:
            if name not in node and name.endswith(self._IDENTIFIER_SUFFIX):
                name = name[:-len(self._IDENTIFIER_SUFFIX)]
            node = node[name]
        return node
# Generic alias for the formatter class; presumably the name Rougail's output
# loader looks up in each output package - TODO confirm.
RougailOutput = RougailOutputFormatter
# Only the concrete class name is exported by a star import; the alias above
# is not listed.
__all__ = ("RougailOutputFormatter",)