202 lines
8.7 KiB
Python
202 lines
8.7 KiB
Python
from .i18n import _
|
|
from .utils import normalize_family
|
|
from .error import OperationError, DictConsistencyError
|
|
from .config import Config
|
|
|
|
|
|
class Path:
|
|
"""Helper class to handle the `path` attribute of a CreoleObjSpace
|
|
instance.
|
|
|
|
sample: path="creole.general.condition"
|
|
"""
|
|
def __init__(self):
|
|
self.variables = {}
|
|
self.families = {}
|
|
self.full_paths_families = {}
|
|
self.full_paths_variables = {}
|
|
|
|
# Family
|
|
def add_family(self,
|
|
namespace: str,
|
|
name: str,
|
|
variableobj: str,
|
|
) -> str: # pylint: disable=C0111
|
|
if '.' not in name and namespace == Config['variable_namespace']:
|
|
full_name = '.'.join([namespace, name])
|
|
self.full_paths_families[name] = full_name
|
|
else:
|
|
full_name = name
|
|
if full_name in self.families and self.families[full_name]['variableobj'] != variableobj:
|
|
raise DictConsistencyError(_(f'Duplicate family name {name}'), 37)
|
|
self.families[full_name] = dict(name=name,
|
|
namespace=namespace,
|
|
variableobj=variableobj,
|
|
)
|
|
|
|
def get_family_path(self,
|
|
name: str,
|
|
current_namespace: str,
|
|
) -> str: # pylint: disable=C0111
|
|
#name = normalize_family(name)
|
|
if '.' not in name and current_namespace == Config['variable_namespace'] and name in self.full_paths_families:
|
|
name = self.full_paths_families[name]
|
|
if current_namespace is None: # pragma: no cover
|
|
raise OperationError('current_namespace must not be None')
|
|
dico = self.families[name]
|
|
if dico['namespace'] != Config['variable_namespace'] and current_namespace != dico['namespace']:
|
|
raise DictConsistencyError(_(f'A family located in the "{dico["namespace"]}" namespace shall not be used in the "{current_namespace}" namespace'), 38)
|
|
return dico['name']
|
|
|
|
def get_family_obj(self,
|
|
name: str,
|
|
) -> 'Family': # pylint: disable=C0111
|
|
if '.' not in name and name in self.full_paths_families:
|
|
name = self.full_paths_families[name]
|
|
if name not in self.families:
|
|
raise DictConsistencyError(_('unknown family {}').format(name), 39)
|
|
dico = self.families[name]
|
|
return dico['variableobj']
|
|
|
|
def family_is_defined(self,
|
|
name: str,
|
|
) -> str: # pylint: disable=C0111
|
|
if '.' not in name and name not in self.families and name in self.full_paths_families:
|
|
return True
|
|
return name in self.families
|
|
|
|
# Leadership
|
|
def set_leader(self,
|
|
namespace: str,
|
|
leader_family_name: str,
|
|
name: str,
|
|
leader_name: str,
|
|
) -> None: # pylint: disable=C0111
|
|
# need rebuild path and move object in new path
|
|
old_path = namespace + '.' + leader_family_name + '.' + name
|
|
dico = self._get_variable(old_path)
|
|
del self.variables[old_path]
|
|
new_path = namespace + '.' + leader_family_name + '.' + leader_name + '.' + name
|
|
self.add_variable(namespace,
|
|
new_path,
|
|
dico['family'],
|
|
False,
|
|
dico['variableobj'],
|
|
)
|
|
if namespace == Config['variable_namespace']:
|
|
self.full_paths_variables[name] = new_path
|
|
else:
|
|
name = new_path
|
|
dico = self._get_variable(name)
|
|
if dico['leader'] != None:
|
|
raise DictConsistencyError(_('Already defined leader {} for variable'
|
|
' {}'.format(dico['leader'], name)), 40)
|
|
dico['leader'] = leader_name
|
|
|
|
def get_leader(self, name): # pylint: disable=C0111
|
|
return self._get_variable(name)['leader']
|
|
|
|
# Variable
|
|
def add_variable(self,
|
|
namespace: str,
|
|
name: str,
|
|
family: str,
|
|
is_dynamic: bool,
|
|
variableobj,
|
|
) -> str: # pylint: disable=C0111
|
|
if '.' not in name:
|
|
full_name = '.'.join([namespace, family, name])
|
|
self.full_paths_variables[name] = full_name
|
|
else:
|
|
full_name = name
|
|
if namespace == Config['variable_namespace']:
|
|
name = name.rsplit('.', 1)[1]
|
|
self.variables[full_name] = dict(name=name,
|
|
family=family,
|
|
namespace=namespace,
|
|
leader=None,
|
|
is_dynamic=is_dynamic,
|
|
variableobj=variableobj)
|
|
|
|
def get_variable_name(self,
|
|
name,
|
|
): # pylint: disable=C0111
|
|
return self._get_variable(name)['name']
|
|
|
|
def get_variable_obj(self,
|
|
name:str,
|
|
) -> 'Variable': # pylint: disable=C0111
|
|
return self._get_variable(name)['variableobj']
|
|
|
|
def get_variable_family_name(self,
|
|
name: str,
|
|
) -> str: # pylint: disable=C0111
|
|
return self._get_variable(name)['family']
|
|
|
|
def get_variable_namespace(self,
|
|
name: str,
|
|
) -> str: # pylint: disable=C0111
|
|
return self._get_variable(name)['namespace']
|
|
|
|
def get_variable_path(self,
|
|
name: str,
|
|
current_namespace: str,
|
|
allow_source: str=False,
|
|
with_suffix: bool=False,
|
|
) -> str: # pylint: disable=C0111
|
|
if current_namespace is None: # pragma: no cover
|
|
raise OperationError('current_namespace must not be None')
|
|
if with_suffix:
|
|
dico, suffix = self._get_variable(name,
|
|
with_suffix=True,
|
|
)
|
|
else:
|
|
dico = self._get_variable(name)
|
|
if not allow_source:
|
|
if dico['namespace'] not in [Config['variable_namespace'], 'services'] and current_namespace != dico['namespace']:
|
|
raise DictConsistencyError(_(f'A variable located in the "{dico["namespace"]}" namespace shall not be used in the "{current_namespace}" namespace'), 41)
|
|
if '.' in dico['name']:
|
|
value = dico['name']
|
|
else:
|
|
list_path = [dico['namespace'], dico['family']]
|
|
if dico['leader'] is not None:
|
|
list_path.append(dico['leader'])
|
|
list_path.append(dico['name'])
|
|
value = '.'.join(list_path)
|
|
if with_suffix:
|
|
return value, suffix
|
|
return value
|
|
|
|
def path_is_defined(self,
|
|
name: str,
|
|
) -> str: # pylint: disable=C0111
|
|
if '.' not in name and name not in self.variables and name in self.full_paths_variables:
|
|
return True
|
|
return name in self.variables
|
|
|
|
def _get_variable(self,
|
|
name: str,
|
|
with_suffix: bool=False,
|
|
) -> str:
|
|
if name not in self.variables:
|
|
if name not in self.variables:
|
|
if '.' not in name and name in self.full_paths_variables:
|
|
name = self.full_paths_variables[name]
|
|
if name not in self.variables:
|
|
for var_name, variable in self.variables.items():
|
|
if variable['is_dynamic'] and name.startswith(var_name):
|
|
if not with_suffix:
|
|
raise Exception('This option is dynamic, should use "with_suffix" attribute')
|
|
return variable, name[len(var_name):]
|
|
if '.' not in name:
|
|
for var_name, path in self.full_paths_variables.items():
|
|
if name.startswith(var_name):
|
|
variable = self.variables[self.full_paths_variables[var_name]]
|
|
if variable['is_dynamic']:
|
|
if not with_suffix:
|
|
raise Exception('This option is dynamic, should use "with_suffix" attribute')
|
|
return variable, name[len(var_name):]
|
|
raise DictConsistencyError(_('unknown option {}').format(name), 42)
|
|
if with_suffix:
|
|
return self.variables[name], None
|
|
return self.variables[name]
|