rougail/src/rougail/convert.py
2023-11-02 22:14:36 +01:00

1156 lines
43 KiB
Python

"""Takes a bunch of Rougail XML dispatched in differents folders
as an input and outputs a Tiramisu's file.
Created by:
EOLE (http://eole.orion.education.fr)
Copyright (C) 2005-2018
Forked by:
Cadoles (http://www.cadoles.com)
Copyright (C) 2019-2021
Silique (https://www.silique.fr)
Copyright (C) 2022-2023
distribued with GPL-2 or later license
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
Sample usage::
>>> from rougail import RougailConvert
>>> rougail = RougailConvert()
>>> tiramisu = rougail.save('tiramisu.py')
The Rougail
- loads the XML into an internal RougailObjSpace representation
- visits/annotates the objects
- dumps the object space as Tiramisu string
The visit/annotation stage is a complex step that corresponds to the Rougail
procedures.
"""
#from typing import List
#from tiramisu import Config
#
#from .i18n import _
#from .config import RougailConfig
#from .objspace import RougailObjSpace
#from .reflector import Reflector
#from .tiramisureflector import TiramisuReflector
#from .annotator import SpaceAnnotator
#from .error import DictConsistencyError
##from .providersupplier import provider_supplier
#from .utils import normalize_family
#class RougailConvert:
# """Rougail object
# """
# def __init__(self,
# rougailconfig: RougailConfig=None,
# just_doc: bool=False,
# ) -> None:
# if rougailconfig is None:
# rougailconfig = RougailConfig
# self.rougailconfig = rougailconfig
# xmlreflector = Reflector(self.rougailconfig)
# self.rougailobjspace = RougailObjSpace(xmlreflector,
# self.rougailconfig,
# just_doc,
# )
# self.internal_functions = self.rougailconfig['internal_functions']
# self.dictionaries = False
# self.annotator = False
# self.reflector = None
#
# def load_dictionaries(self,
# path_prefix: str=None,
# ) -> None:
# self.rougailobjspace.paths.set_path_prefix(normalize_family(path_prefix))
# self._load_dictionaries(self.rougailobjspace.xmlreflector,
# self.rougailconfig['variable_namespace'],
# self.rougailconfig['dictionaries_dir'],
# path_prefix,
# self.rougailconfig['variable_namespace_description'],
# )
# for namespace, extra_dir in self.rougailconfig['extra_dictionaries'].items():
# if namespace in ['services', self.rougailconfig['variable_namespace']]:
# msg = _(f'Namespace name "{namespace}" is not allowed')
# raise DictConsistencyError(msg, 21, None)
# self._load_dictionaries(self.rougailobjspace.xmlreflector,
# namespace,
# extra_dir,
# path_prefix,
# )
## if hasattr(self.rougailobjspace.space, 'variables'):
## provider_supplier(self.rougailobjspace,
## path_prefix,
## )
# self.dictionaries = True
#
# def _load_dictionaries(self,
# xmlreflector: Reflector,
# namespace: str,
# xmlfolders: List[str],
# path_prefix: str,
# namespace_description: str=None,
# ) -> List[str]:
# for xmlfile, document in xmlreflector.load_dictionaries_from_folders(xmlfolders, self.rougailobjspace.just_doc):
# self.rougailobjspace.xml_parse_document(xmlfile,
# document,
# namespace,
# namespace_description,
# path_prefix,
# )
#
# def annotate(self):
# if self.annotator:
# raise DictConsistencyError(_('Cannot execute annotate multiple time'), 85, None)
# SpaceAnnotator(self.rougailobjspace)
# self.annotator = True
#
# def reflexion(self,
# exclude_imports: list=[],
# ):
# if not self.dictionaries:
# self.load_dictionaries()
# if not self.annotator:
# self.annotate()
# if self.reflector:
# raise DictConsistencyError(_('Cannot execute reflexion multiple time'), 86, None)
# functions_file = self.rougailconfig['functions_file']
# if not isinstance(functions_file, list):
# functions_file = [functions_file]
# functions_file = [func for func in functions_file if func not in exclude_imports]
# self.reflector = TiramisuReflector(self.rougailobjspace,
# functions_file,
# self.internal_functions,
# self.rougailconfig,
# )
#
# def save(self,
# filename: str,
# ) -> str:
# """Return tiramisu object declaration as a string
# """
# if self.reflector is None:
# self.reflexion()
# output = self.reflector.get_text() + '\n'
# if filename:
# with open(filename, 'w') as tiramisu:
# tiramisu.write(output)
# return output
from typing import Optional, Union, get_type_hints, Any, Literal, List, Dict
from pydantic import BaseModel, StrictBool, StrictInt, StrictFloat, StrictStr
from pydantic.error_wrappers import ValidationError
from yaml import safe_load
from os import listdir
from os.path import join, isdir
from .annotator import SpaceAnnotator
from .tiramisureflector import TiramisuReflector
from .utils import normalize_family, Calculation
BASETYPE_CALC = Union[StrictBool, StrictInt, StrictFloat, StrictStr, Calculation]
#for typ, values in _VALIDATORS:
# if typ in [bool, int, float, str]:
# values.pop(0)
class Family(BaseModel):
name: str
description: Optional[str]=None
type: Literal['family', 'leadership', 'dynamic']='family'
help: Optional[str]=None
mode: Optional[str]=None
hidden: Union[bool, Calculation]=False
disabled: Union[bool, Calculation]=False
xmlfiles: List[str]=[]
path: str
class Config:
arbitrary_types_allowed = True
class Dynamic(Family):
variable: str
class Variable(BaseModel):
name: str
type: Literal['number', 'float', 'string', 'password', 'secret', 'mail', 'boolean', 'filename', 'date', 'unix_user', 'ip', 'local_ip', 'netmask', 'network', 'broadcast', 'netbios', 'domainname', 'hostname', 'web_address', 'port', 'mac', 'cidr', 'network_cidr', 'choice', 'unix_permissions']='string'
description: Optional[str]=None
default: Union[BASETYPE_CALC, List[BASETYPE_CALC]]=None
validators: List[BASETYPE_CALC]=None
multi: bool=False
unique: Optional[bool]=None
help: Optional[str]=None
hidden: Union[bool, Calculation]=False
disabled: Union[bool, Calculation]=False
mandatory: Union[None, bool, Calculation]=None
auto_save: bool=False
mode: Optional[str]=None
#provider: Optional[str]=None
#supplier: Optional[str]=None
test: Any=None
xmlfiles: List[str]=[]
path: str
class Config:
arbitrary_types_allowed = True
class Choice(Variable):
choices: Union[List[BASETYPE_CALC], Calculation]=None
class SymLink(BaseModel):
name: str
type: str='symlink'
opt: Variable
xmlfiles: List[str]=[]
path: str
class Service(BaseModel):
name: str
manage: bool=True
type: Literal['service', 'mount', 'swap', 'timer', 'target']='service'
disabled: Union[bool, Calculation]=False
engine: Optional[str]=None
target: Optional[str]=None
undisable: bool=False
xmlfiles: List[str]=[]
class Config:
arbitrary_types_allowed = True
class IP(BaseModel):
name: str
netmask: Optional[str]=None
disabled: Union[bool, Calculation]=False
xmlfiles: List[str]=[]
class Config:
arbitrary_types_allowed = True
class Certificate(BaseModel):
name: str
authority: str
mode: Optional[int]=None
owner: Union[None, str, Dict[str, str]]=None
group: Union[None, str, Dict[str, str]]=None
server: Union[None, str, Dict[str, str]]=None
domain: Union[None, str, Dict[str, str]]=None
provider: Optional[str]=None
format: Literal['cert_key', 'pem']='cert_key'
type: Literal['client', 'server']='client'
disabled: Union[bool, Calculation]=False
xmlfiles: List[str]=[]
class Config:
arbitrary_types_allowed = True
class File(BaseModel):
name: str
type: Literal['string', 'variable']='string'
mode: Optional[int]=None
owner: Union[None, str, Dict[str, str]]=None
group: Union[None, str, Dict[str, str]]=None
source: Union[None, str, Dict[str, str]]=None
engine: Optional[str]=None
variable: Optional[str]=None
included: Literal['no', 'name', 'content']='no'
disabled: Union[bool, Calculation]=False
xmlfiles: List[str]=[]
class Config:
arbitrary_types_allowed = True
class Override(BaseModel):
name: str
type: Literal['string', 'variable']='string'
source: Optional[str]=None
engine: Optional[str]=None
xmlfiles: List[str]=[]
class Property:
def __init__(self) -> None:
self._properties = {}
def add(self,
path: str,
property_: str,
value: Union[True, Calculation],
) -> None:
self._properties.setdefault(path, {})[property_] = value
def __getitem__(self,
path: str,
) -> list:
return self._properties.get(path, {})
def __contains__(self,
path: str,
) -> bool:
return path in self._properties
#class Appendable:
# def __init__(self) -> None:
# self._data = {}
#
# def add(self,
# path: str,
# data: str,
# ) -> None:
# if path in self._data:
# raise Exception('hu?')
# self._data[path] = data
#
# def append(self,
# path: str,
# data: str,
# ) -> None:
# if path not in self._data:
# self._data[path] = []
# self._data[path].append(data)
#
# def __getitem__(self,
# path: str,
# ) -> list:
# return self._data.get(path, [])
#
# def __contains__(self,
# path: str,
# ) -> bool:
# return path in self._data
class Paths:
def __init__(self) -> None:
self._data = {}
self._dynamics = []
def add(self,
path: str,
data: Any,
is_dynamic: bool,
force: bool=False,
) -> None:
if not force and path in self._data:
raise Exception('pfff')
self._data[path] = data
if is_dynamic:
self._dynamics.append(path)
def get_with_dynamic(self,
path: str,
) -> Any:
suffix = None
dynamic_path = None
if not path in self._data:
for dynamic in self._dynamics:
if path.startswith(dynamic):
subpaths = path[len(dynamic):].split('.')
if len(subpaths) > 1 and subpaths[0]:
dynamic_path = dynamic
suffix = subpaths[0]
len_suffix = len(suffix)
for subpath in subpaths[1:]:
if not subpath.endswith(suffix):
suffix = None
break
dynamic_path += '.' + subpath[:-len_suffix]
if dynamic_path not in self._dynamics:
suffix = None
break
if suffix:
break
if suffix is None and not path in self._data:
return None, None
if suffix:
path = dynamic_path
return self._data[path], suffix
def __getitem__(self,
path: str,
) -> dict:
if not path in self._data:
raise Exception(f'cannot find variable or family {path}')
return self._data[path]
def __contains__(self,
path: str,
) -> bool:
return path in self._data
def __delitem__(self,
path: str,
) -> None:
del self._data[path]
def get(self):
return self._data.values()
class Informations:
def __init__(self) -> None:
self._data = {}
def add(self,
path: str,
key: str,
data: Any,
) -> None:
if data is None:
raise Exception('connard')
if path not in self._data:
self._data[path] = {}
if key in self._data[path]:
raise Exception(f'already key {key} in {path}')
self._data[path][key] = data
def get(self,
path: str,
) -> List[str]:
return self._data.get(path, [])
class ParserVariable:
def __init__(self):
self.paths = Paths()
self.families = []
self.variables = []
self.parents = {'.': []}
self.index = 0
self.reflector_names = {}
self.leaders = []
self.followers = []
self.multis = {}
self.default_multi = {}
self.jinja = {}
#
self.family = Family
self.dynamic = Dynamic
self.variable = Variable
self.choice = Choice
#FIXME
self.exclude_imports = []
self.informations = Informations()
self.properties = Property()
# self.choices = Appendable()
self.has_dyn_option = False
super().__init__()
def reflexion(self):
hint = get_type_hints(self.dynamic)
self.family_types = hint['type'].__args__
self.family_attrs = frozenset(set(hint) | {'redefine'} - {'name', 'path', 'xmlfiles'})
self.family_calculations = self.search_calculation(hint)
#
hint= get_type_hints(self.variable)
self.variable_types = hint['type'].__args__
#
hint= get_type_hints(self.choice)
self.choice_attrs = frozenset(set(hint) | {'redefine', 'exists'} - {'name', 'path', 'xmlfiles'})
self.choice_calculations = self.search_calculation(hint)
super().reflexion()
def parse_variable_file(self,
filename: str,
namespace: str,
# description: str,
) -> None:
with open(filename) as o:
objects = safe_load(o)
self.validate_file_version(objects)
self.parse_family(filename, namespace, '.', namespace, {})
for name, obj in objects.items():
self.family_or_variable(filename,
name,
namespace,
obj,
)
def get_family_or_variable_type(self,
obj: dict,
) -> Optional[str]:
if '_type' in obj:
# only variable and family has _type attributs
return obj['_type']
if 'type' in obj and \
isinstance(obj['type'], str):
return obj['type']
return
def family_or_variable(self,
filename: str,
name: str,
subpath: str,
obj: Optional[dict],
first_variable: bool=False,
family_is_leadership: bool=False,
family_is_dynamic: bool=False,
) -> None:
if subpath:
path = f'{subpath}.{name}'
else:
path = name
if obj is None:
obj = {}
typ = self.is_family_or_variable(path,
obj,
family_is_leadership,
)
if typ == 'family':
parser = self.parse_family
else:
parser = self.parse_variable
if name.startswith('_'):
raise Exception('forbidden!')
parser(filename,
name,
subpath,
path,
obj,
first_variable,
family_is_leadership,
family_is_dynamic,
)
def is_family_or_variable(self,
path: str,
obj: dict,
family_is_leadership: bool,
) -> bool:
# it's already has a variable or a family
if path in self.paths:
if path in self.families:
return 'family'
return 'variable'
# it's:
# my_variable:
if not obj:
return 'variable'
# it has a type:
# my_variable:
# type: string
obj_type = self.get_family_or_variable_type(obj)
if obj_type:
if obj_type in self.family_types:
return 'family'
if obj_type in self.variable_types:
return 'variable'
raise Exception(f'unknown type {obj_type}')
# variable is in a leadership
if family_is_leadership:
return 'variable'
# all attributes are in variable object
extra_keys = set(obj) - self.choice_attrs
if not extra_keys:
return 'variable'
# check all attributs not known in family
for key in set(obj) - self.family_attrs:
# family attribute can start with '_'
if key.startswith('_') and key[1:] in self.family_attrs:
continue
value = obj[key]
# a variable or a family is a dict (or None)
if value is not None and not isinstance(value, dict):
raise Exception(f'cannot determine if "{path}" is a variable or a family')
return 'family'
def parse_family(self,
filename: str,
name: str,
subpath: str,
path: str,
family: dict,
first_variable: bool=False,
family_is_leadership: bool=False,
family_is_dynamic: bool=False,
) -> None:
family_obj = {}
subfamily_obj = {}
force_to_attrs = []
force_to_variable = []
for key, value in family.items():
if key in force_to_variable:
continue
if key.startswith('_'):
true_key = key[1:]
force_to_attrs.append(key)
if true_key in family:
force_to_variable.append(true_key)
elif key not in self.family_attrs:
force_to_variable.append(key)
elif isinstance(value, dict):
value_type = self.get_family_or_variable_type(value)
if obj_type:
force_to_variable.append(key)
elif 'type' in value:
force_to_attrs.append(key)
else:
force_to_variable.append(key)
else:
force_to_attrs.append(key)
for key, value in family.items():
if key in force_to_attrs:
if key.startswith('_'):
key = key[1:]
family_obj[key] = value
else:
subfamily_obj[key] = value
if path in self.paths:
if family_obj:
if not family.pop('redefine', False):
raise Exception('pfff')
self.paths.add(path,
self.paths[path].copy(update=family),
force=True,
)
self.paths[path].xmlfiles.append(filename)
force_not_first = True
if self.paths[path].type == 'dynamic':
family_is_dynamic = True
else:
if 'redefine' in family and family['redefine']:
raise Exception(f'cannot redefine the inexisting family "{path}" in {filename}')
extra_attrs = set(family_obj) - self.family_attrs
if extra_attrs:
raise Exception(f'extra attrs ... {extra_attrs}')
if self.get_family_or_variable_type(family_obj) == 'dynamic':
family_is_dynamic = True
self.add_family(subpath,
path,
name,
family_obj,
filename,
family_is_dynamic,
)
force_not_first = False
if self.paths[path].type == 'leadership':
family_is_leadership = True
for idx, key in enumerate(subfamily_obj):
value = subfamily_obj[key]
if not isinstance(value, dict) and value is not None:
raise Exception(f'pfff {key}')
first_variable = not force_not_first and idx == 0
self.family_or_variable(filename,
key,
path,
value,
first_variable,
family_is_leadership,
family_is_dynamic,
)
def add_family(self,
subpath: str,
path: str,
name: str,
family: dict,
filename: str,
family_is_dynamic: bool,
) -> None:
family['path'] = path
if not isinstance(filename, list):
filename = [filename]
family['xmlfiles'] = filename
if family_is_dynamic:
family_obj = self.dynamic
else:
family_obj = self.family
for key, value in family.items():
if isinstance(value, dict) and key in self.family_calculations:
family[key] = Calculation('family',
key,
path,
value,
filename,
self,
False,
)
try:
self.paths.add(path,
family_obj(name=name, **family),
family_is_dynamic,
)
except ValidationError as err:
raise Exception(f'invalid family "{path}" in "{filename}": {err}')
self.set_name(self.paths[path], 'optiondescription_')
self.parents[subpath].append(path)
self.parents[path] = []
self.families.append(path)
def parse_variable(self,
filename: str,
name: str,
subpath: str,
path: str,
variable: dict,
first_variable: bool,
family_is_leadership: bool,
family_is_dynamic: bool,
) -> None:
extra_attrs = set(variable) - self.choice_attrs
if extra_attrs:
raise Exception(f'extra attrs ... {extra_attrs}')
for key, value in variable.items():
#if key == 'choices':
# force_multi = True
#elif key == 'validators':
# force_multi = False
#else:
# force_multi = None
if isinstance(value, dict) and key in self.choice_calculations:
variable[key] = Calculation('variable',
key,
path,
value,
filename,
self,
False,
)
if isinstance(value, list):
for idx, val in enumerate(value):
if isinstance(val, dict):
variable[key][idx] = Calculation('variable',
key,
path,
val,
filename,
self,
True,
)
# type_of_calculation = None
# datas = {}
# params = []
# for val in value:
# if isinstance(val, dict):
# if 'type' not in val:
# raise Exception(f'the variable "{path}" in "{filename}" must have a "type" attribute for "{key}"')
# if val['type'] != 'variable':
# raise Exception(f'the variable "{path}" in "{filename}" must not have a "type" attribute with value "{val["type"]}" for "{key}"')
# if not type_of_calculation:
# type_of_calculation = val['type']
# datas['type'] = val['type']
# datas[val['type']] = val[val['type']]
# datas['params'] = {None: []}
# else:
# params.append({val['type']: val[val['type']]})
# else:
# params.append(val)
# if type_of_calculation:
# datas['params'][None] = params
# force_multi = key == 'choices'
# variable[key] = Calculation('variable',
# key,
# path,
# datas,
# filename,
# self,
# force_multi,
# )
if path in self.paths:
if 'exists' in variable and not variable['exists']:
return
if not variable.pop('redefine', False):
raise Exception('pfff')
self.paths.add(path, self.paths[path].copy(update=variable), False, force=True)
self.paths[path].xmlfiles.append(filename)
else:
if 'exists' in variable and variable.pop('exists'):
# this variable must exist
# but it's not the case
# so do nothing
return
if 'redefine' in variable and variable['redefine']:
raise Exception(f'cannot redefine the inexisting variable "{path}" in {filename}')
self.add_variable(subpath,
path,
name,
variable,
filename,
family_is_dynamic,
)
if family_is_leadership:
if first_variable:
self.leaders.append(path)
else:
self.followers.append(path)
def add_variable(self,
subpath: str,
path: str,
name: str,
variable: dict,
filename: str,
family_is_dynamic: bool,
) -> None:
variable['path'] = path
if not isinstance(filename, list):
filename = [filename]
variable['xmlfiles'] = filename
try:
if self.get_family_or_variable_type(variable) == 'symlink':
variable_obj = SymLink(name=name, **variable)
elif self.get_family_or_variable_type(variable) == 'choice':
variable_obj = self.choice(name=name, **variable)
else:
variable_obj = self.variable(name=name, **variable)
except ValidationError as err:
raise Exception(f'invalid variable "{path}" in "{filename}": {err}')
self.paths.add(path,
variable_obj,
family_is_dynamic,
)
self.variables.append(path)
self.parents[subpath].append(path)
self.set_name(variable_obj, 'option_')
def del_family(self,
path: str,
) -> None:
del self.paths[path]
self.families.remove(path)
del self.parents[path]
parent = path.rsplit('.', 1)[0]
self.parents[parent].remove(path)
class ParserService:
def __init__(self):
self.service = Service
self.ip = IP
self.certificate = Certificate
self.file = File
self.override = Override
self.services = {}
self.ips = {}
self.certificates = {}
self.files = {}
self.overrides = {}
def reflexion(self):
hint = get_type_hints(self.service)
self.service_attrs = frozenset(set(hint) | {'redefine', 'type'} - {'name', 'xmlfiles'})
self.service_calculations = self.search_calculation(hint)
#
hint = get_type_hints(self.ip)
self.ip_attrs = frozenset(set(hint) - {'name', 'xmlfiles'})
self.ip_calculations = self.search_calculation(hint)
#
hint = get_type_hints(self.certificate)
self.certificate_attrs = frozenset(set(hint) | {'redefine'} - {'name', 'xmlfiles'})
self.certificate_calculations = self.search_calculation(hint)
#
hint = get_type_hints(self.file)
self.file_attrs = frozenset(set(hint) | {'redefine'} - {'name', 'xmlfiles'})
self.file_calculations = self.search_calculation(hint)
#
hint = get_type_hints(self.override)
self.override_attrs = frozenset(set(hint) - {'name', 'xmlfiles'})
self.override_calculations = self.search_calculation(hint)
def parse_service_file(self,
filename: str,
) -> None:
with open(filename) as o:
objects = safe_load(o)
self.validate_file_version(objects)
for name, obj in objects.items():
self.parse_service(filename,
name,
obj,
)
def parse_service(self,
filename: str,
name: str,
service: dict,
) -> None:
service_obj = {}
subservice_obj = {}
force_to_variable = []
force_to_attrs = []
for key, value in service.items():
if key in force_to_variable:
continue
if key.startswith('_'):
true_key = key[1:]
force_to_attrs.append(key)
if true_key in family:
force_to_variable.append(true_key)
elif key not in self.service_attrs:
force_to_variable.append(key)
#hu? variable here?
elif 'type' in value and value['type'] in self.family_types or value['type'] in self.variable_types:
force_to_variable.append(key)
else:
force_to_attrs.append(key)
for key, value in service.items():
if key in force_to_attrs:
if key.startswith('_'):
key = key[1:]
service_obj[key] = value
else:
subservice_obj[key] = value
if name in self.services:
if service_obj:
if not service.pop('redefine', False):
raise Exception('pfff')
self.services[name] = self.services[name].copy(update=service)
else:
if '.' not in name:
raise Exception('pffff')
extra_attrs = set(service_obj) - self.service_attrs
if extra_attrs:
raise Exception(f'extra attrs ... {extra_attrs}')
service_obj['type'] = name.rsplit('.')[1]
for key, value in service_obj.items():
if isinstance(value, dict) and key in self.service_calculations:
service_obj[key] = Calculation('service',
key,
f'services.{name}',
value,
filename,
self,
False,
)
try:
self.services[name] = self.service(name=name, **service_obj)
except ValidationError as err:
raise Exception(f'invalid service "{name}" in "{filename}": {err}')
self.services[name].xmlfiles.append(filename)
for key, value in subservice_obj.items():
if key == 'override':
getattr(self, f'parse_service_{key}')(key, value, f'services.{normalize_family(name)}', filename)
else:
if not isinstance(value, dict):
raise Exception(f'pfff {key}')
for subname, subvalue in value.items():
getattr(self, f'parse_service_{key}')(subname, subvalue, f'services.{normalize_family(name)}', filename)
def parse_service_ip(self,
name: str,
ip: dict,
path: str,
filename: str,
) -> None:
extra_attrs = set(ip) - self.ip_attrs
if extra_attrs:
raise Exception(f'extra attrs ... {extra_attrs}')
for key, value in ip.items():
if isinstance(value, dict) and key in self.ip_calculations:
ip[key] = Calculation('ip',
key,
path,
value,
filename,
self,
False,
)
try:
self.ips.setdefault(path, []).append(self.ip(name=name, **ip))
except ValidationError as err:
raise Exception(f'invalid IP "{path}" in "{filename}": {err}')
self.ips[path][-1].xmlfiles.append(filename)
def parse_service_certificates(self,
name: str,
certificate: dict,
path: str,
filename: str,
) -> None:
extra_attrs = set(certificate) - self.certificate_attrs
if extra_attrs:
raise Exception(f'extra attrs ... {extra_attrs}')
for key, value in certificate.items():
if isinstance(value, dict) and key in self.certificate_calculations:
certificate[key] = Calculation('certificate',
key,
path,
value,
filename,
self,
False,
)
try:
self.certificates.setdefault(path, []).append(self.certificate(name=name, **certificate))
except ValidationError as err:
raise Exception(f'invalid certificate "{path}" in "{filename}": {err}')
self.certificates[path][-1].xmlfiles.append(filename)
def parse_service_files(self,
name: str,
file: dict,
path: str,
filename: str,
) -> None:
extra_attrs = set(file) - self.file_attrs
if extra_attrs:
raise Exception(f'extra attrs ... {extra_attrs}')
for key, value in file.items():
if isinstance(value, dict) and key in self.file_calculations:
file[key] = Calculation('file',
key,
path,
value,
filename,
self,
False,
)
try:
self.files.setdefault(path, []).append(self.file(name=name, **file))
except ValidationError as err:
raise Exception(f'invalid file "{path}" in "{filename}": {err}')
self.files[path][-1].xmlfiles.append(filename)
def parse_service_override(self,
name: str,
override: dict,
path: str,
filename: str,
) -> None:
if override:
extra_attrs = set(override) - self.override_attrs
if extra_attrs:
raise Exception(f'extra attrs ... {extra_attrs}')
self.overrides.setdefault(path, []).append(self.override(name=name, **override))
else:
self.overrides.setdefault(path, []).append(self.override(name=name))
self.overrides[path][-1].xmlfiles.append(filename)
class RougailConvert(ParserVariable, ParserService):
supported_version = ['1.0']
def __init__(self,
rougailconfig: 'RougailConfig'
) -> None:
# FIXME useful?
self.annotator = False
self.reflector = None
self.rougailconfig = rougailconfig
super().__init__()
def search_calculation(self,
hint: dict,
) -> List[str]:
return [key for key, value in hint.items() if 'Union' in value.__class__.__name__ and Calculation in value.__args__]
def reflexion(self):
super().reflexion()
self.parse_directories()
self.annotate()
self.reflect()
def parse_directories(self) -> None:
for filename in self.get_sorted_filename(self.rougailconfig['dictionaries_dir']):
namespace = self.rougailconfig['variable_namespace']
self.parse_variable_file(filename,
namespace,
# self.rougailconfig['variable_namespace_description'],
)
for namespace, extra_dirs in self.rougailconfig['extra_dictionaries'].items():
for filename in self.get_sorted_filename(extra_dirs):
self.parse_variable_file(filename,
namespace,
# namespace,
)
for filename in self.get_sorted_filename(self.rougailconfig['services_dir']):
self.parse_service_file(filename)
def get_sorted_filename(self,
directories: Union[str, List[str]],
) -> List[str]:
filenames = {}
if not isinstance(directories, list):
directories = [directories]
for directory in directories:
if not isdir(directory):
continue
for filename in listdir(directory):
if not filename.endswith('.yml'):
continue
full_filename = join(directory, filename)
if filename in filenames:
raise DictConsistencyError(_(f'duplicate dictionary file name {filename}'), 78, [filenames[filename][1], full_filename])
filenames[filename] = full_filename
for filename in sorted(filenames):
yield filenames[filename]
def validate_file_version(self,
obj: dict,
) -> None:
if 'version' not in obj:
raise Exception('version ...')
version = obj.pop('version')
if version not in self.supported_version:
raise Exception(f'pffff version ... {version} not in {self.supported_version}')
def set_name(self,
obj,
option_prefix,
):
self.index += 1
self.reflector_names[obj.path] = f'{option_prefix}{self.index}{self.rougailconfig["suffix"]}'
def annotate(self):
if self.annotator:
raise DictConsistencyError(_('Cannot execute annotate multiple time'), 85, None)
SpaceAnnotator(self)
self.annotator = True
def reflect(self):
functions_file = self.rougailconfig['functions_file']
if not isinstance(functions_file, list):
functions_file = [functions_file]
functions_file = [func for func in functions_file if func not in self.exclude_imports]
self.reflector = TiramisuReflector(self,
functions_file,
)
def save(self,
filename: None,
) -> 'OptionDescription':
"""Return tiramisu object declaration as a string
"""
if self.reflector is None:
self.reflexion()
output = self.reflector.get_text() + '\n'
if filename:
with open(filename, 'w') as tiramisu:
tiramisu.write(output)
print(output)
return output