532 lines
18 KiB
Python
532 lines
18 KiB
Python
"""Rougail object model
|
|
|
|
Silique (https://www.silique.fr)
|
|
Copyright (C) 2023-2024
|
|
|
|
distributed under the GPL-2 or later license
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; either version 2 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
"""
|
|
|
|
from typing import Optional, Union, get_type_hints, Any, Literal, List, Dict, Iterator
|
|
from pydantic import (
|
|
BaseModel,
|
|
StrictBool,
|
|
StrictInt,
|
|
StrictFloat,
|
|
StrictStr,
|
|
ConfigDict,
|
|
)
|
|
from .utils import get_jinja_variable_to_param, get_realpath
|
|
from .error import DictConsistencyError
|
|
|
|
# Union of the pydantic strict scalar types (bool, int, float, str) and None;
# used in this module as the annotation for plain, non-calculated values.
BASETYPE = Union[StrictBool, StrictInt, StrictFloat, StrictStr, None]
|
|
# Attribute names treated as properties: the calculation classes test
# `attribute_name in PROPERTY_ATTRIBUTE` to pick the property-specific function.
PROPERTY_ATTRIBUTE = ["frozen", "hidden", "disabled", "mandatory"]
|
|
|
|
|
|
def convert_boolean(value: Union[str, bool, None]) -> Optional[bool]:
    """Boolean coercion. The Rougail XML may contain strings like `True` or `False`.

    :param value: an actual boolean (returned unchanged), ``None``, or a
        string compared case-insensitively with ``"true"`` / ``"false"``.
    :return: ``True`` or ``False`` for a recognised value, ``None`` when
        `value` is ``None`` or the empty string.
    :raises ValueError: when the string is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    # None must be handled before calling .lower(), which does not exist on None
    if value is None:
        return None
    value = value.lower()
    if value == "true":
        return True
    if value == "false":
        return False
    if value == "":
        return None
    # ValueError is a subclass of Exception, so `except Exception` callers still work
    raise ValueError(f'unknown boolean value "{value}"')
|
|
|
|
|
|
# Maps each type name to a description dict holding:
#   - "opttype": the name of the option class (always present),
#   - "func": an optional callable used to convert a raw value,
#   - "initkwargs": optional extra keyword arguments for the option.
CONVERT_OPTION = {
    "string": {"opttype": "StrOption"},
    "number": {"opttype": "IntOption", "func": int},
    "float": {"opttype": "FloatOption", "func": float},
    "boolean": {"opttype": "BoolOption", "func": convert_boolean},
    "secret": {"opttype": "PasswordOption"},
    "mail": {"opttype": "EmailOption"},
    "unix_filename": {"opttype": "FilenameOption"},
    "date": {"opttype": "DateOption"},
    "unix_user": {"opttype": "UsernameOption"},
    "ip": {"opttype": "IPOption", "initkwargs": {"allow_reserved": True}},
    "cidr": {"opttype": "IPOption", "initkwargs": {"cidr": True}},
    "netmask": {"opttype": "NetmaskOption"},
    "network": {"opttype": "NetworkOption"},
    "network_cidr": {"opttype": "NetworkOption", "initkwargs": {"cidr": True}},
    "broadcast": {"opttype": "BroadcastOption"},
    "netbios": {
        "opttype": "DomainnameOption",
        "initkwargs": {"type": "netbios", "warnings_only": True},
    },
    "domainname": {
        "opttype": "DomainnameOption",
        "initkwargs": {"type": "domainname", "allow_ip": False},
    },
    "hostname": {
        "opttype": "DomainnameOption",
        "initkwargs": {"type": "hostname", "allow_ip": False},
    },
    "web_address": {
        "opttype": "URLOption",
        "initkwargs": {"allow_ip": False, "allow_without_dot": True},
    },
    "port": {"opttype": "PortOption", "initkwargs": {"allow_private": True}},
    "mac": {"opttype": "MACOption"},
    "unix_permissions": {
        "opttype": "PermissionsOption",
        "initkwargs": {"warnings_only": True},
        "func": int,
    },
    "choice": {"opttype": "ChoiceOption"},
    #
    "symlink": {"opttype": "SymLinkOption"},
}
|
|
|
|
|
|
class Param(BaseModel):
    """Base model shared by every calculation parameter.

    The constructor accepts context arguments (`path`, `attribute`,
    `family_is_dynamic`, `is_follower`, `xmlfiles`) that are not model
    fields; they are dropped here and only the remaining keyword arguments
    are passed on to pydantic.
    """
    # name under which `Calculation.get_params` stores this parameter
    key: str
    # unknown fields are rejected
    model_config = ConfigDict(extra="forbid")

    def __init__(self,
                 path,
                 attribute,
                 family_is_dynamic,
                 is_follower,
                 xmlfiles,
                 **kwargs,
                 ) -> None:
        # the five named context arguments are intentionally not forwarded:
        # with extra="forbid" they would be refused as unknown fields
        super().__init__(**kwargs)
|
|
|
|
|
|
class AnyParam(Param):
    """Parameter carrying a literal scalar value (registered as "any" in PARAM_TYPES)."""
    # parameter kind discriminator
    type: str
    # the literal value; one of the BASETYPE scalars or None
    value: BASETYPE
|
|
|
|
|
|
class VariableParam(Param):
    """Parameter referring to another variable (registered as "variable" in PARAM_TYPES)."""
    # parameter kind discriminator
    type: str
    # path of the referenced variable, resolved later by `Calculation.get_params`
    variable: str
    propertyerror: bool = True
    # when True, `Calculation.get_params` silently skips the parameter if the
    # variable cannot be found instead of raising
    optional: bool = False
|
|
|
|
|
|
class SuffixParam(Param):
    """Parameter of kind "suffix"; only accepted inside a dynamic family.

    The constructor expects the context keyword arguments
    `family_is_dynamic`, `attribute`, `path` and `xmlfiles` and raises
    `DictConsistencyError` (error number 10) when `family_is_dynamic` is falsy.
    """
    type: str
    suffix: Optional[int] = None

    def __init__(self, **kwargs) -> None:
        in_dynamic_family = kwargs['family_is_dynamic']
        if not in_dynamic_family:
            attribute = kwargs["attribute"]
            path = kwargs["path"]
            raise DictConsistencyError(
                f'suffix parameter for "{attribute}" in "{path}" cannot be set none dynamic family',
                10,
                kwargs['xmlfiles'],
            )
        super().__init__(**kwargs)
|
|
|
|
|
|
class InformationParam(Param):
    """Parameter referring to an information (registered as "information" in PARAM_TYPES)."""
    # parameter kind discriminator
    type: str
    # name of the information
    information: str
    # optional path of a variable; when empty, `Calculation.get_params`
    # removes the key from the resulting parameter
    variable: Optional[str] = None
|
|
|
|
|
|
class IndexParam(Param):
    """Parameter of kind "index"; only accepted for a follower variable.

    The constructor expects the context keyword arguments `is_follower`,
    `path`, `attribute` and `xmlfiles` and raises `DictConsistencyError`
    (error number 25) when `is_follower` is falsy.
    """
    type: str

    def __init__(self, **kwargs) -> None:
        follower = kwargs["is_follower"]
        if not follower:
            path = kwargs["path"]
            attribute = kwargs["attribute"]
            raise DictConsistencyError(
                f'the variable "{path}" is not a follower, so cannot have index type for param in "{attribute}"',
                25,
                kwargs['xmlfiles'],
            )
        super().__init__(**kwargs)
|
|
|
|
|
|
|
|
# Maps a parameter kind name to the Param subclass that models it.
PARAM_TYPES = {
    "any": AnyParam,
    "variable": VariableParam,
    "suffix": SuffixParam,
    "information": InformationParam,
    "index": IndexParam,
}
|
|
|
|
|
|
class Calculation(BaseModel):
    """Common base model of every calculation.

    `get_params` reads `self.params` and `self.attribute_name`, which are
    not declared here: they must be provided by the subclass that uses it.
    """
    path_prefix: Optional[str]
    # path of the object owning the calculation
    path: str
    # True when the calculation is one element of a list of values
    inside_list: bool
    version: str
    namespace: str
    # files the calculation comes from; passed to DictConsistencyError
    xmlfiles: List[str]

    model_config = ConfigDict(extra="forbid")

    def get_realpath(
        self,
        path: str,
    ) -> str:
        """Return `path` resolved by the module-level `get_realpath` with this object's `path_prefix`."""
        # inside the method body the name refers to the imported function,
        # not to this method
        return get_realpath(path, self.path_prefix)

    def get_params(self, objectspace):
        """Build the parameters dictionary of the calculation.

        Every entry of `self.params` is dumped to a dict and stored under its
        `key`. For "variable" and "information" parameters the variable path
        is replaced by the object returned by
        `objectspace.paths.get_with_dynamic`.

        :param objectspace: object space exposing `paths.get_with_dynamic`
            and the `variable` class used for the type check.
        :return: dict mapping each parameter key to its description
            (empty when there are no parameters).
        :raises Exception: a "variable" parameter cannot be found (and is not
            optional) or resolves to something that is not a variable.
        :raises DictConsistencyError: an "information" parameter refers to an
            unknown variable (14) or to a dynamic variable (15).
        """
        if not self.params:
            return {}
        params = {}
        for param_obj in self.params:
            param = param_obj.model_dump()
            param_type = param.get("type")
            if param_type == "variable":
                variable, suffix = objectspace.paths.get_with_dynamic(
                    param["variable"], self.path_prefix, self.path, self.version, self.namespace, self.xmlfiles
                )
                if not variable:
                    if not param.get("optional"):
                        raise Exception(f"cannot find {param['variable']}")
                    # optional parameter on a missing variable: drop it
                    continue
                if not isinstance(variable, objectspace.variable):
                    raise Exception("pfff it's a family")
                param["variable"] = variable
                if suffix:
                    param["suffix"] = suffix
            elif param_type == "information":
                if param["variable"]:
                    variable_path = param["variable"]
                    variable, suffix = objectspace.paths.get_with_dynamic(
                        variable_path, self.path_prefix, self.path, self.version, self.namespace, self.xmlfiles
                    )
                    if not variable:
                        msg = f'cannot find variable "{variable_path}" defined in "{self.attribute_name}" for "{self.path}"'
                        raise DictConsistencyError(msg, 14, self.xmlfiles)
                    if suffix:
                        # checked before the path is replaced by the resolved
                        # object so that the message reports the configured path
                        msg = f'variable "{variable_path}" defined in "{self.attribute_name}" for "{self.path}" is a dynamic variable'
                        raise DictConsistencyError(msg, 15, self.xmlfiles)
                    param["variable"] = variable
                else:
                    del param["variable"]
            params[param.pop("key")] = param
        return params
|
|
|
|
|
|
class JinjaCalculation(Calculation):
    """Calculation defined by a Jinja template.

    `to_function` selects the function name, return type and multi flag from
    `attribute_name`, and `_jinja_to_function` registers the template in
    `objectspace.jinja` and builds the resulting description dict.
    """
    attribute_name: Literal[
        "frozen", "hidden", "mandatory", "disabled", "default", "validators", "choices", "dynamic"
    ]
    jinja: StrictStr
    params: Optional[List[Param]] = None
    return_type: BASETYPE = None

    def _jinja_to_function(
        self,
        function,
        return_type,
        multi,
        objectspace,
        *,
        add_help=False,
        params: Optional[dict] = None,
    ):
        """Register the template and return the function description.

        The template is stored in `objectspace.jinja` under
        "<attribute_name>_<path>", with a numeric suffix appended while the
        name is already taken. The returned dict holds "function", "params"
        and, when `add_help` is set, "help".
        """
        variable = objectspace.paths[self.path]
        base_name = f"{self.attribute_name}_{self.path}"
        jinja_path = base_name
        counter = 0
        # look for the first free name in the jinja registry
        while jinja_path in objectspace.jinja:
            jinja_path = f"{base_name}_{counter}"
            counter += 1
        objectspace.jinja[jinja_path] = self.jinja
        func_params = {
            "__internal_jinja": jinja_path,
            "__internal_type": return_type,
            "__internal_multi": multi,
        }
        default = {
            "function": function,
            "params": func_params,
        }
        if add_help:
            default["help"] = function + "_help"
        if self.params:
            func_params.update(self.get_params(objectspace))
        if params:
            func_params.update(params)
        jinja_variables = get_jinja_variable_to_param(
            self.path,
            self.jinja,
            objectspace,
            variable.xmlfiles,
            objectspace.functions,
            self.path_prefix,
            self.version,
            self.namespace,
        )
        for sub_variable, suffix, true_path in jinja_variables:
            if sub_variable.path not in objectspace.variables:
                continue
            variable_param = {
                "type": "variable",
                "variable": sub_variable,
            }
            if suffix:
                variable_param["suffix"] = suffix
            func_params[true_path] = variable_param
        return default

    def to_function(
        self,
        objectspace,
    ) -> dict:
        """Return the function description matching `attribute_name`.

        A `return_type` is refused for "default", "validators" and the
        property attributes; for "choices" it defaults to "string".
        """
        attribute = self.attribute_name
        if attribute == "default":
            if self.return_type:
                raise Exception("return_type not allowed!")
            variable = objectspace.paths[self.path]
            return_type = variable.type
            if self.inside_list:
                multi = False
            elif self.path in objectspace.followers:
                multi = objectspace.multis[self.path] == "submulti"
            else:
                multi = self.path in objectspace.multis
            return self._jinja_to_function(
                "jinja_to_function",
                return_type,
                multi,
                objectspace,
            )
        if attribute == "validators":
            if self.return_type:
                raise Exception("pfff")
            return self._jinja_to_function(
                "valid_with_jinja",
                "string",
                False,
                objectspace,
            )
        if attribute in PROPERTY_ATTRIBUTE:
            if self.return_type:
                raise Exception("return_type not allowed!")
            return self._jinja_to_function(
                "jinja_to_property",
                "string",
                False,
                objectspace,
                add_help=True,
                params={None: [attribute]},
            )
        if attribute == "choices":
            return_type = "string" if self.return_type is None else self.return_type
            return self._jinja_to_function(
                "jinja_to_function",
                return_type,
                not self.inside_list,
                objectspace,
            )
        if attribute == "dynamic":
            return self._jinja_to_function(
                "jinja_to_function",
                "string",
                True,
                objectspace,
            )
        raise Exception("hu?")
|
|
|
|
|
|
class VariableCalculation(Calculation):
    """Calculation whose value is taken from another variable."""
    attribute_name: Literal[
        "frozen", "hidden", "mandatory", "disabled", "default", "choices", "dynamic"
    ]
    # path of the variable the value is taken from
    variable: StrictStr
    propertyerror: bool = True
    # when True, "allow_none" is added to the function parameters
    allow_none: bool = False

    def to_function(
        self,
        objectspace,
    ) -> dict:
        """Return the function description for this calculation.

        The function is "calc_value", or "variable_to_property" (with the same
        name as "help") for a property attribute, in which case the referenced
        variable must be of type "boolean".

        :raises DictConsistencyError: the variable is not found (88) or the
            multi / list combination is invalid (18, 20, 21, 23).
        :raises Exception: the path is not a variable, or a property
            attribute refers to a non-boolean variable.
        """
        variable, suffix = objectspace.paths.get_with_dynamic(
            self.variable, self.path_prefix, self.path, self.version, self.namespace, self.xmlfiles
        )
        if not variable:
            msg = f'Variable not found "{self.variable}" for attribut "{self.attribute_name}" for variable "{self.path}"'
            raise DictConsistencyError(msg, 88, self.xmlfiles)
        if not isinstance(variable, objectspace.variable):
            # FIXME remove the pfff
            raise Exception("pfff it's a family")
        param = {
            "type": "variable",
            "variable": variable,
            "propertyerror": self.propertyerror,
        }
        if suffix:
            param["suffix"] = suffix
        params = {None: [param]}
        function = "calc_value"
        help_function = None
        if self.attribute_name in PROPERTY_ATTRIBUTE:
            function = "variable_to_property"
            help_function = "variable_to_property"
            if variable.type != "boolean":
                raise Exception("only boolean!")
            # the property name becomes the first positional parameter
            params[None].insert(0, self.attribute_name)
        if self.allow_none:
            params["allow_none"] = True
        # current variable is a multi
        if self.attribute_name in PROPERTY_ATTRIBUTE:
            needs_multi = False
        elif self.attribute_name != "default":
            needs_multi = True
        else:
            needs_multi = self.path in objectspace.multis
        # the referenced variable counts as multi when it is registered in
        # `multis`, or when it is a dynamic variable without a final suffix
        # whose `_dynamics` entry differs from the current path's entry
        calc_variable_is_multi = variable.path in objectspace.multis or (variable.path in objectspace.paths._dynamics and (suffix is None or suffix[-1] is None) and objectspace.paths._dynamics[variable.path] != objectspace.paths._dynamics.get(self.path))
        if needs_multi:
            if calc_variable_is_multi:
                # a multi value cannot be one element of a list
                if self.inside_list:
                    msg = f'the variable "{self.path}" has an invalid attribute "{self.attribute_name}", the variable "{variable.path}" is multi but is inside a list'
                    raise DictConsistencyError(msg, 18, self.xmlfiles)
            elif not self.inside_list:
                # a single value must be wrapped in a list to feed a multi
                msg = f'the variable "{self.path}" has an invalid attribute "{self.attribute_name}", the variable "{variable.path}" is not multi but is not inside a list'
                raise DictConsistencyError(msg, 20, self.xmlfiles)
        elif self.inside_list:
            msg = f'the variable "{self.path}" has an invalid attribute "{self.attribute_name}", it\'s a list'
            raise DictConsistencyError(msg, 23, self.xmlfiles)
        elif calc_variable_is_multi:
            msg = f'the variable "{self.path}" has an invalid attribute "{self.attribute_name}", the variable "{variable.path}" is a multi'
            raise DictConsistencyError(msg, 21, self.xmlfiles)
        ret = {
            "function": function,
            "params": params,
        }
        if help_function:
            ret["help"] = help_function
        return ret
|
|
|
|
|
|
class InformationCalculation(Calculation):
    """Calculation whose value is read from an information.

    When `variable` is set, the information is taken from that variable,
    which must exist and must not be resolved with a suffix.
    """
    # NOTE(review): this Literal uses "choice" while JinjaCalculation and
    # VariableCalculation use "choices" - confirm which spelling is intended
    attribute_name: Literal["default", "choice", "dynamic"]
    information: StrictStr
    variable: Optional[StrictStr]

    def to_function(
        self,
        objectspace,
    ) -> dict:
        """Return the "calc_value" description with a single information parameter."""
        information_param = {
            "type": "information",
            "information": self.information,
        }
        if self.variable:
            target, suffix = objectspace.paths.get_with_dynamic(
                self.variable, self.path_prefix, self.path, self.version, self.namespace, self.xmlfiles
            )
            if target is None or suffix is not None:
                raise Exception("pfff")
            information_param["variable"] = target
        return {"function": "calc_value", "params": {None: [information_param]}}
|
|
|
|
|
|
class SuffixCalculation(Calculation):
    """Calculation of kind "suffix", optionally carrying a `suffix` integer."""
    # NOTE(review): this Literal uses "choice" while JinjaCalculation and
    # VariableCalculation use "choices" - confirm which spelling is intended
    attribute_name: Literal["default", "choice", "dynamic"]
    suffix: Optional[int] = None

    def to_function(
        self,
        objectspace,
    ) -> dict:
        """Return the "calc_value" description with a single suffix parameter.

        The "suffix" key is only present when `self.suffix` is not None;
        `objectspace` is not used.
        """
        if self.suffix is None:
            suffix_param = {"type": "suffix"}
        else:
            suffix_param = {"type": "suffix", "suffix": self.suffix}
        return {"function": "calc_value", "params": {None: [suffix_param]}}
|
|
|
|
|
|
class IndexCalculation(Calculation):
    """Calculation of kind "index"; only valid for a follower variable."""
    # NOTE(review): this Literal uses "choice" while JinjaCalculation and
    # VariableCalculation use "choices" - confirm which spelling is intended
    attribute_name: Literal["default", "choice", "dynamic"]

    def to_function(
        self,
        objectspace,
    ) -> dict:
        """Return the "calc_value" description with a single index parameter.

        :raises DictConsistencyError: (60) when `self.path` is not listed in
            `objectspace.followers`.
        """
        if self.path in objectspace.followers:
            return {"function": "calc_value", "params": {None: [{"type": "index"}]}}
        msg = f'the variable "{self.path}" is not a follower, so cannot have index type for "{self.attribute_name}"'
        raise DictConsistencyError(msg, 60, self.xmlfiles)
|
|
|
|
|
|
# Maps a calculation kind name to the Calculation subclass that models it.
CALCULATION_TYPES = {
    "jinja": JinjaCalculation,
    "variable": VariableCalculation,
    "information": InformationCalculation,
    "suffix": SuffixCalculation,
    "index": IndexCalculation,
}
|
|
# Same scalar types as BASETYPE, with a Calculation also accepted.
BASETYPE_CALC = Union[StrictBool, StrictInt, StrictFloat, StrictStr, Calculation, None]
|
|
|
|
|
|
class Family(BaseModel):
    """Model of a family; `type` is one of "family", "leadership" or "dynamic"."""
    name: str
    description: Optional[str] = None
    type: Literal["family", "leadership", "dynamic"] = "family"
    path: str
    help: Optional[str] = None
    mode: Optional[str] = None
    # each property is either a plain boolean or a Calculation
    hidden: Union[bool, Calculation] = False
    disabled: Union[bool, Calculation] = False
    # may be None, but no default value is declared
    namespace: Optional[str]
    xmlfiles: List[str] = []

    # unknown fields are rejected; arbitrary (non-pydantic) types are allowed
    model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)
|
|
|
|
|
|
class Dynamic(Family):
    """Family extended with the `variable` and `dynamic` fields."""
    # NOTE(review): annotated `str` but defaults to None - confirm whether
    # Optional[str] is intended
    variable: str=None
    # None only for format 1.0
    # either a list of strings / calculations or a single Calculation
    dynamic: Union[List[Union[StrictStr, Calculation]], Calculation]
|
|
|
|
|
|
class Variable(BaseModel):
    """Model of a variable."""
    # type will be set dynamically in `annotator/value.py`, default is None
    type: str = None
    name: str
    description: Optional[str] = None
    # a single value or a list of values; each may be a scalar or a Calculation
    default: Union[List[BASETYPE_CALC], BASETYPE_CALC] = None
    # a list of values (scalars or Calculations) or a single Calculation
    choices: Optional[Union[List[BASETYPE_CALC], Calculation]] = None
    params: Optional[List[Param]] = None
    validators: Optional[List[Calculation]] = None
    multi: Optional[bool] = None
    unique: Optional[bool] = None
    help: Optional[str] = None
    # each property is either a plain boolean or a Calculation
    hidden: Union[bool, Calculation] = False
    disabled: Union[bool, Calculation] = False
    # True by default; may also be None or a Calculation
    mandatory: Union[None, bool, Calculation] = True
    auto_save: bool = False
    mode: Optional[str] = None
    test: Optional[list] = None
    path: str
    namespace: str
    version: str
    xmlfiles: List[str] = []

    # unknown fields are rejected; arbitrary (non-pydantic) types are allowed
    model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)
|
|
|
|
|
|
#class Choice(Variable):
|
|
# type: Literal["choice"] = "choice"
|
|
# choices: Union[List[BASETYPE_CALC], Calculation]
|
|
|
|
|
|
class SymLink(BaseModel):
    """Model of a symbolic link to a variable; `type` is always "symlink"."""
    name: str
    type: Literal["symlink"] = "symlink"
    # the Variable the link refers to
    opt: Variable
    xmlfiles: List[str] = []
    path: str

    # unknown fields are rejected
    model_config = ConfigDict(extra="forbid")
|