rougail/creole/objspace.py
Emmanuel Garette 590283ef25 fill is a value
2019-11-26 22:05:28 +01:00

693 lines
31 KiB
Python

"""
Creole flattener. Takes a bunch of Creole XML dispatched in differents folders
as an input and outputs a human readable flatened XML
Sample usage::
>>> from creole.objspace import CreoleObjSpace
>>> eolobj = CreoleObjSpace('/usr/share/creole/creole.dtd')
>>> eolobj.create_or_populate_from_xml('creole', ['/usr/share/eole/creole/dicos'])
>>> eolobj.space_visitor()
>>> eolobj.save('/tmp/creole_flatened_output.xml')
The CreoleObjSpace
- loads the XML into an internal CreoleObjSpace representation
- visits/annotates the objects
- dumps the object space as XML output into a single XML target
The visit/annotation stage is a complex step that corresponds to the Creole
procedures.
For example: a variable is redefined and shall be moved to another family
means that a variable1 = Variable() object in the object space who lives in the family1 parent
has to be moved in family2. The visit procedure changes the varable1's object space's parent.
"""
from collections import OrderedDict
from lxml.etree import Element, SubElement # pylint: disable=E0611
from json import dump
from .i18n import _
from .xmlreflector import XMLReflector, HIGH_COMPATIBILITY
from .annotator import ERASED_ATTRIBUTES, ActionAnnotator, ContainerAnnotator, SpaceAnnotator
from .utils import normalize_family
from .error import CreoleOperationError, SpaceObjShallNotBeUpdated, CreoleDictConsistencyError
# CreoleObjSpace's elements like 'family' or 'slave', that shall be forced to the Redefinable type
FORCE_REDEFINABLES = ('family', 'slave', 'container', 'disknod', 'variables', 'family_action')
# CreoleObjSpace's elements that shall be forced to the UnRedefinable type
FORCE_UNREDEFINABLES = ('value', 'input', 'profile', 'ewtapp', 'tag', 'saltaction')
# CreoleObjSpace's elements that shall be set to the UnRedefinable type
UNREDEFINABLE = ('multi', 'type')
PROPERTIES = ('hidden', 'frozen', 'auto_freeze', 'auto_save', 'force_default_on_freeze',
'force_store_value', 'disabled', 'mandatory')
CONVERT_PROPERTIES = {'auto_save': ['force_store_value'], 'auto_freeze': ['force_store_value', 'auto_freeze']}
RENAME_ATTIBUTES = {'description': 'doc'}
#TYPE_TARGET_CONDITION = ('variable', 'family')
# _____________________________________________________________________________
# special types definitions for the Object Space's internal representation
class RootCreoleObject(object):
""
class CreoleObjSpace(object):
"""DOM XML reflexion free internal representation of a Creole Dictionary
"""
choice = type('Choice', (RootCreoleObject,), OrderedDict())
property_ = type('Property', (RootCreoleObject,), OrderedDict())
# Creole ObjectSpace's Leadership variable class type
Leadership = type('Leadership', (RootCreoleObject,), OrderedDict())
"""
This Atom type stands for singleton, that is
an Object Space's atom object is present only once in the
object space's tree
"""
Atom = type('Atom', (RootCreoleObject,), OrderedDict())
"A variable that can't be redefined"
Redefinable = type('Redefinable', (RootCreoleObject,), OrderedDict())
"A variable can be redefined"
UnRedefinable = type('UnRedefinable', (RootCreoleObject,), OrderedDict())
def __init__(self, dtdfilename): # pylint: disable=R0912
self.index = 0
class ObjSpace(object): # pylint: disable=R0903
"""
Base object space
"""
self.space = ObjSpace()
self.xmlreflector = XMLReflector()
self.xmlreflector.parse_dtd(dtdfilename)
self.redefine_variables = None
self.probe_variables = []
# elt container's attrs list
self.container_elt_attr_list = [] #
# ['variable', 'separator', 'family']
self.forced_text_elts = set()
# ['disknod', 'follower', 'target', 'service', 'package', 'ip', 'value', 'tcpwrapper',
# 'interface', 'input', 'port']
self.forced_text_elts_as_name = set(['choice', 'property'])
self.forced_choice_option = {}
self.paths = Path()
self.list_conditions = {}
self.booleans_attributs = []
for elt in self.xmlreflector.dtd.iterelements():
attrs = {}
clstype = self.UnRedefinable
atomic = True
forced_text_elt = False
if elt.type == 'mixed':
forced_text_elt = True
if elt.name == 'container':
self.container_elt_attr_list = [elt.content.left.name]
self.parse_dtd_right_left_elt(elt.content)
for attr in elt.iterattributes():
atomic = False
if attr.default_value:
if attr.default_value == 'True':
default_value = True
elif attr.default_value == 'False':
default_value = False
else:
default_value = attr.default_value
attrs[attr.name] = default_value
if not attr.name.endswith('_type'):
values = list(attr.itervalues())
if values != []:
self.forced_choice_option.setdefault(elt.name, {})[attr.name] = values
if attr.name == 'redefine':
clstype = self.Redefinable
if attr.name == 'name' and forced_text_elt is True:
self.forced_text_elts.add(elt.name)
forced_text_elt = False
if set(attr.itervalues()) == set(['True', 'False']):
self.booleans_attributs.append(attr.name)
if forced_text_elt is True:
self.forced_text_elts_as_name.add(elt.name)
if elt.name in FORCE_REDEFINABLES:
clstype = self.Redefinable
elif elt.name in FORCE_UNREDEFINABLES:
clstype = self.UnRedefinable
elif atomic:
clstype = self.Atom
# Creole ObjectSpace class types, it enables us to create objects like:
# Service_restriction(), Ip(), Interface(), Host(), Fstab(), Package(), Disknod(),
# File(), Variables(), Family(), Variable(), Separators(), Separator(), Value(),
# Constraints()... and so on. Creole ObjectSpace is an object's reflexion of
# the XML elements
setattr(self, elt.name, type(elt.name.capitalize(), (clstype,), attrs))
def parse_dtd_right_left_elt(self, elt):
if elt.right.type == 'or':
self.container_elt_attr_list.append(elt.right.left.name)
self.parse_dtd_right_left_elt(elt.right)
else:
self.container_elt_attr_list.append(elt.right.name)
def _convert_boolean(self, value): # pylint: disable=R0201
"""Boolean coercion. The Creole XML may contain srings like `True` or `False`
"""
if isinstance(value, bool):
return value
if value == 'True':
return True
elif value == 'False':
return False
else:
raise TypeError(_('{} is not True or False').format(value)) # pragma: no cover
def _is_already_exists(self, name, space, child, namespace):
if isinstance(space, self.family): # pylint: disable=E1101
if namespace != 'creole':
name = space.path + '.' + name
return self.paths.path_is_defined(name)
if child.tag in ['family', 'family_action']:
norm_name = normalize_family(name)
else:
norm_name = name
return norm_name in getattr(space, child.tag, {})
def _translate_in_space(self, name, family, variable, namespace):
if not isinstance(family, self.family): # pylint: disable=E1101
if variable.tag in ['family', 'family_action']:
norm_name = normalize_family(name)
else:
norm_name = name
return getattr(family, variable.tag)[norm_name]
if namespace == 'creole':
path = name
else:
path = family.path + '.' + name
old_family_name = self.paths.get_variable_family_name(path)
if normalize_family(family.name) == old_family_name:
return getattr(family, variable.tag)[name]
old_family = self.space.variables['creole'].family[old_family_name] # pylint: disable=E1101
variable_obj = old_family.variable[name]
del old_family.variable[name]
if 'variable' not in vars(family):
family.variable = OrderedDict()
family.variable[name] = variable_obj
self.paths.append('variable', name, namespace, family.name, variable_obj)
return variable_obj
def remove_check(self, name): # pylint: disable=C0111
if hasattr(self.space, 'constraints') and hasattr(self.space.constraints, 'check'):
remove_checks = []
for idx, check in enumerate(self.space.constraints.check): # pylint: disable=E1101
if hasattr(check, 'target') and check.target == name:
remove_checks.append(idx)
remove_checks = list(set(remove_checks))
remove_checks.sort(reverse=True)
for idx in remove_checks:
self.space.constraints.check.pop(idx) # pylint: disable=E1101
def remove_condition(self, name): # pylint: disable=C0111
for idx, condition in enumerate(self.space.constraints.condition): # pylint: disable=E1101
remove_targets = []
if hasattr(condition, 'target'):
for target_idx, target in enumerate(condition.target):
if target.name == name:
remove_targets.append(target_idx)
remove_targets = list(set(remove_targets))
remove_targets.sort(reverse=True)
for idx in remove_targets:
del condition.target[idx]
def create_or_update_space_object(self, subspace, space, child, namespace):
"""Creates or retrieves the space object that corresponds
to the `child` XML object
Two attributes of the `child` XML object are important:
- with the `redefine` boolean flag attribute we know whether
the corresponding space object shall be created or updated
- `True` means that the corresponding space object shall be updated
- `False` means that the corresponding space object shall be created
- with the `exists` boolean flag attribute we know whether
the corresponding space object shall be created
(or nothing -- that is the space object isn't modified)
- `True` means that the corresponding space object shall be created
- `False` means that the corresponding space object is not updated
In the special case `redefine` is True and `exists` is False,
we create the corresponding space object if it doesn't exist
and we update it if it exists.
:return: the corresponding space object of the `child` XML object
"""
if child.tag in self.forced_text_elts_as_name:
name = child.text
else:
name = subspace['name']
if self._is_already_exists(name, space, child, namespace):
if child.tag in FORCE_REDEFINABLES:
redefine = self._convert_boolean(subspace.get('redefine', True))
else:
redefine = self._convert_boolean(subspace.get('redefine', False))
exists = self._convert_boolean(subspace.get('exists', True))
if redefine is True:
return self._translate_in_space(name, space, child, namespace)
elif exists is False:
raise SpaceObjShallNotBeUpdated()
else:
raise CreoleDictConsistencyError(_('Already present in another XML file, {} '
'cannot be re-created').format(name))
else:
redefine = self._convert_boolean(subspace.get('redefine', False))
exists = self._convert_boolean(subspace.get('exists', False))
if redefine is False or exists is True:
return getattr(self, child.tag)()
else:
raise CreoleDictConsistencyError(_('Redefined object: '
'{} does not exist yet').format(name))
def generate_creoleobj(self, child, space, namespace):
"""
instanciates or creates Creole Object Subspace objects
"""
if issubclass(getattr(self, child.tag), self.Redefinable):
creoleobj = self.create_or_update_space_object(child.attrib, space, child, namespace)
else:
# instanciates an object from the CreoleObjSpace's builtins types
# example : child.tag = constraints -> a self.Constraints() object is created
creoleobj = getattr(self, child.tag)()
# this Atom instance has to be a singleton here
# we do not re-create it, we reuse it
if isinstance(creoleobj, self.Atom) and child.tag in vars(space):
creoleobj = getattr(space, child.tag)
self.create_tree_structure(space, child, creoleobj)
return creoleobj
def create_tree_structure(self, space, child, creoleobj): # pylint: disable=R0201
"""
Builds the tree structure of the object space here
we set containers attributes in order to be populated later on
for example::
space = Family()
space.variable = OrderedDict()
another example:
space = Variable()
space.value = list()
"""
if child.tag not in vars(space):
if isinstance(creoleobj, self.Redefinable):
setattr(space, child.tag, OrderedDict())
elif isinstance(creoleobj, self.UnRedefinable):
setattr(space, child.tag, [])
elif isinstance(creoleobj, self.Atom):
pass
else: # pragma: no cover
raise CreoleOperationError(_("Creole object {} "
"has a wrong type").format(type(creoleobj)))
def _add_to_tree_structure(self, creoleobj, space, child): # pylint: disable=R0201
if isinstance(creoleobj, self.Redefinable):
name = creoleobj.name
if child.tag == 'family' or child.tag == 'family_action':
name = normalize_family(name)
getattr(space, child.tag)[name] = creoleobj
elif isinstance(creoleobj, self.UnRedefinable):
getattr(space, child.tag).append(creoleobj)
else:
setattr(space, child.tag, creoleobj)
def _set_text_to_obj(self, child, creoleobj):
if child.text is None:
text = None
else:
text = child.text.strip()
if text:
if child.tag in self.forced_text_elts_as_name:
creoleobj.name = text
else:
creoleobj.text = text
def _set_xml_attributes_to_obj(self, child, creoleobj):
redefine = self._convert_boolean(child.attrib.get('redefine', False))
has_value = hasattr(creoleobj, 'value')
if HIGH_COMPATIBILITY and has_value:
has_value = len(child) != 1 or child[0].text != None
if (redefine is True and child.tag == 'variable' and has_value
and len(child) != 0):
del creoleobj.value
for attr, val in child.attrib.items():
if redefine and attr in UNREDEFINABLE:
# UNREDEFINABLE concerns only 'variable' node so we can fix name
# to child.attrib['name']
name = child.attrib['name']
raise CreoleDictConsistencyError(_("cannot redefine attribute {} for variable {}").format(attr, name))
if isinstance(getattr(creoleobj, attr, None), bool):
if val == 'False':
val = False
elif val == 'True':
val = True
else: # pragma: no cover
raise CreoleOperationError(_('value for {} must be True or False, '
'not {}').format(attr, val))
if not (attr == 'name' and getattr(creoleobj, 'name', None) != None):
setattr(creoleobj, attr, val)
def _creoleobj_tree_visitor(self, child, creoleobj, namespace):
"""Creole object tree manipulations
"""
if child.tag == 'variable' and child.attrib.get('remove_check', False):
self.remove_check(creoleobj.name)
if child.tag == 'variable' and child.attrib.get('remove_condition', False):
self.remove_condition(creoleobj.name)
if child.tag in ['auto', 'fill', 'check']:
variable_name = child.attrib['target']
# XXX not working with variable not in creole and in leader/followers
if variable_name in self.redefine_variables:
creoleobj.redefine = True
else:
creoleobj.redefine = False
if not hasattr(creoleobj, 'index'):
creoleobj.index = self.index
if child.tag in ['auto', 'fill', 'condition', 'check', 'action']:
creoleobj.namespace = namespace
def xml_parse_document(self, document, space, namespace, is_in_family=False):
"""Parses a Creole XML file
populates the CreoleObjSpace
"""
family_names = []
for child in document:
# this index enables us to reorder the 'fill' and 'auto' objects
self.index += 1
# doesn't proceed the XML commentaries
if not isinstance(child.tag, str):
continue
if child.tag == 'family':
is_in_family = True
if child.attrib['name'] in family_names:
raise CreoleDictConsistencyError(_('Family {} is set several times').format(child.attrib['name']))
family_names.append(child.attrib['name'])
if child.tag == 'variables':
child.attrib['name'] = namespace
if HIGH_COMPATIBILITY and child.tag == 'value' and child.text == None:
continue
# creole objects creation
try:
creoleobj = self.generate_creoleobj(child, space, namespace)
except SpaceObjShallNotBeUpdated:
continue
self._set_text_to_obj(child, creoleobj)
self._set_xml_attributes_to_obj(child, creoleobj)
self._creoleobj_tree_visitor(child, creoleobj, namespace)
self._fill_creoleobj_path_attribute(space, child, namespace, document, creoleobj)
self._add_to_tree_structure(creoleobj, space, child)
if list(child) != []:
self.xml_parse_document(child, creoleobj, namespace, is_in_family)
def _fill_creoleobj_path_attribute(self, space, child, namespace, document, creoleobj): # pylint: disable=R0913
"""Fill self.paths attributes
"""
if not isinstance(space, self.help): # pylint: disable=E1101
if child.tag == 'variable':
family_name = normalize_family(document.attrib['name'])
self.paths.append('variable', child.attrib['name'], namespace, family_name,
creoleobj)
if child.attrib.get('redefine', 'False') == 'True':
if namespace == 'creole':
self.redefine_variables.append(child.attrib['name'])
else:
self.redefine_variables.append(namespace + '.' + family_name + '.' +
child.attrib['name'])
if child.tag == 'family':
family_name = normalize_family(child.attrib['name'])
if namespace != 'creole':
family_name = namespace + '.' + family_name
self.paths.append('family', family_name, namespace, creoleobj=creoleobj)
creoleobj.path = self.paths.get_family_path(family_name, namespace)
def create_or_populate_from_xml(self, namespace, xmlfolders, from_zephir=None):
"""Parses a bunch of XML files
populates the CreoleObjSpace
"""
documents = self.xmlreflector.load_xml_from_folders(xmlfolders, from_zephir)
for xmlfile, document in documents:
try:
self.redefine_variables = []
self.xml_parse_document(document, self.space, namespace)
except Exception as err:
#print(_('error in XML file {}').format(xmlfile))
raise err
def populate_from_zephir(self, namespace, xmlfile):
self.redefine_variables = []
document = self.xmlreflector.parse_xmlfile(xmlfile, from_zephir=True, zephir2=True)
self.xml_parse_document(document, self.space, namespace)
def space_visitor(self, eosfunc_file): # pylint: disable=C0111
ActionAnnotator(self)
ContainerAnnotator(self)
SpaceAnnotator(self, eosfunc_file)
def save(self, filename, force_no_save=False):
"""Save an XML output on disk
:param filename: the full XML filename
"""
xml = Element('creole')
self._xml_export(xml, self.space)
if not force_no_save:
self.xmlreflector.save_xmlfile(filename, xml)
return xml
def save_probes(self, filename, force_no_save=False):
"""Save an XML output on disk
:param filename: the full XML filename
"""
ret = {}
for variable in self.probe_variables:
args = []
kwargs = {}
if hasattr(variable, 'param'):
for param in variable.param:
list_param = list(vars(param).keys())
if 'index' in list_param:
list_param.remove('index')
if list_param == ['text']:
args.append(param.text)
elif list_param == ['text', 'name']:
kwargs[param.name] = param.text
else:
print(vars(param))
raise Exception('hu?')
ret[variable.target] = {'function': variable.name,
'args': args,
'kwargs': kwargs}
if not force_no_save:
with open(filename, 'w') as fhj:
dump(ret, fhj)
return ret
def _get_attributes(self, space): # pylint: disable=R0201
for attr in dir(space):
if not attr.startswith('_'):
yield attr
def _sub_xml_export(self, name, node, node_name, space, current_space):
if isinstance(space, dict):
space = list(space.values())
if isinstance(space, list):
for subspace in space:
if isinstance(subspace, self.Leadership):
_name = 'leader'
subspace.doc = subspace.variable[0].description
#subspace.doc = 'Leadership {}'.format(subspace.name)
else:
_name = name
if name in ['containers', 'variables', 'actions']:
_name = 'family'
if HIGH_COMPATIBILITY and not hasattr(subspace, 'doc'):
subspace.doc = ''
if _name == 'value' and (not hasattr(subspace, 'name') or subspace.name is None):
continue
child_node = SubElement(node, _name)
self._xml_export(child_node, subspace, _name)
elif isinstance(space, self.Atom):
if name == 'containers':
child_node = SubElement(node, 'family')
child_node.attrib['name'] = name
else:
child_node = SubElement(node, name)
for subname in self._get_attributes(space):
subspace = getattr(space, subname)
self._sub_xml_export(subname, child_node, name, subspace, space)
elif isinstance(space, self.Redefinable):
child_node = SubElement(node, 'family')
child_node.attrib['name'] = name
for subname in self._get_attributes(space):
subspace = getattr(space, subname)
self._sub_xml_export(subname, child_node, name, subspace, space)
else:
# FIXME plutot dans annotator ...
if name in PROPERTIES and node.tag in ['variable', 'family', 'leader']:
if space is True:
for prop in CONVERT_PROPERTIES.get(name, [name]):
SubElement(node, 'property').text = prop
elif name not in ERASED_ATTRIBUTES:
if name == 'name' and node_name in self.forced_text_elts_as_name and not hasattr(current_space, 'param'):
if isinstance(space, str):
node.text = space
else:
node.text = str(space)
elif name == 'text' and node_name in self.forced_text_elts:
node.text = space
elif node.tag == 'family' and name == 'name':
if 'doc' not in node.attrib.keys():
node.attrib['doc'] = space
node.attrib['name'] = normalize_family(space, check_name=False)
elif node.tag in ['variable', 'family', 'leader'] and name == 'mode':
if space is not None:
SubElement(node, 'property').text = space
else:
if name in RENAME_ATTIBUTES:
name = RENAME_ATTIBUTES[name]
if space is not None:
node.attrib[name] = str(space)
def _xml_export(self, node, space, node_name='creole'):
for name in self._get_attributes(space):
subspace = getattr(space, name)
self._sub_xml_export(name, node, node_name, subspace, space)
class Path(object):
"""Helper class to handle the `path` attribute of a CreoleObjSpace
instance.
sample: path="creole.general.condition"
"""
def __init__(self):
self.variables = {}
self.families = {}
def append(self, pathtype, name, namespace, family=None, creoleobj=None): # pylint: disable=C0111
if pathtype == 'family':
self.families[name] = dict(name=name, namespace=namespace, creoleobj=creoleobj)
elif pathtype == 'variable':
if namespace == 'creole':
varname = name
else:
if '.' in name:
varname = name
else:
varname = '.'.join([namespace, family, name])
self.variables[varname] = dict(name=name, family=family, namespace=namespace,
leader=None, creoleobj=creoleobj)
else: # pragma: no cover
raise Exception('unknown pathtype {}'.format(pathtype))
def get_family_path(self, name, current_namespace): # pylint: disable=C0111
if current_namespace is None: # pragma: no cover
raise CreoleOperationError('current_namespace must not be None')
dico = self.families[normalize_family(name, check_name=False)]
if dico['namespace'] != 'creole' and current_namespace != dico['namespace']:
raise CreoleDictConsistencyError(_('A family located in the {} namespace '
'shall not be used in the {} namespace').format(
dico['namespace'], current_namespace))
path = dico['name']
if dico['namespace'] is not None and '.' not in dico['name']:
path = '.'.join([dico['namespace'], path])
return path
def get_family_namespace(self, name): # pylint: disable=C0111
dico = self.families[name]
if dico['namespace'] is None:
return dico['name']
return dico['namespace']
def get_family_obj(self, name): # pylint: disable=C0111
if name not in self.families:
raise CreoleDictConsistencyError(_('unknown family {}').format(name))
dico = self.families[name]
return dico['creoleobj']
def get_variable_name(self, name): # pylint: disable=C0111
dico = self._get_variable(name)
return dico['name']
def get_variable_obj(self, name): # pylint: disable=C0111
dico = self._get_variable(name)
return dico['creoleobj']
def get_variable_family_name(self, name): # pylint: disable=C0111
dico = self._get_variable(name)
return dico['family']
def get_variable_family_path(self, name): # pylint: disable=C0111
dico = self._get_variable(name)
list_path = [dico['namespace'], dico['family']]
if dico['leader'] is not None:
list_path.append(dico['leader'])
return '.'.join(list_path)
def get_variable_namespace(self, name): # pylint: disable=C0111
return self._get_variable(name)['namespace']
def get_variable_path(self, name, current_namespace, allow_source=False): # pylint: disable=C0111
if current_namespace is None: # pragma: no cover
raise CreoleOperationError('current_namespace must not be None')
dico = self._get_variable(name)
if not allow_source:
if dico['namespace'] != 'creole' and current_namespace != dico['namespace']:
raise CreoleDictConsistencyError(_('A variable located in the {} namespace '
'shall not be used in the {} namespace').format(
dico['namespace'], current_namespace))
if '.' in dico['name']:
return dico['name']
list_path = [dico['namespace'], dico['family']]
if dico['leader'] is not None:
list_path.append(dico['leader'])
list_path.append(dico['name'])
return '.'.join(list_path)
def path_is_defined(self, name): # pylint: disable=C0111
return name in self.variables
def set_leader(self, name, leader): # pylint: disable=C0111
dico = self._get_variable(name)
namespace = dico['namespace']
if dico['leader'] != None:
raise CreoleDictConsistencyError(_('Already defined leader {} for variable'
' {}'.format(dico['leader'], name)))
dico['leader'] = leader
if namespace != 'creole':
new_path = self.get_variable_path(name, namespace)
self.append('variable', new_path, namespace, family=dico['family'], creoleobj=dico['creoleobj'])
self.variables[new_path]['leader'] = leader
del self.variables[name]
def _get_variable(self, name):
if name not in self.variables:
if name.startswith('creole.'):
name = name.split('.')[-1]
if name not in self.variables:
raise CreoleDictConsistencyError(_('unknown option {}').format(name))
return self.variables[name]
def get_leader(self, name): # pylint: disable=C0111
dico = self._get_variable(name)
return dico['leader']