# -*- coding: utf-8 -*- "option types and option description for the configuration management" # Copyright (C) 2012-2013 Team tiramisu (see AUTHORS for all contributors) # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # The original `Config` design model is unproudly borrowed from # the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/ # the whole pypy projet is under MIT licence # ____________________________________________________________ import re from copy import copy from types import FunctionType from IPy import IP from tiramisu.error import ConflictConfigError from tiramisu.setting import groups, multitypes from tiramisu.i18n import _ name_regexp = re.compile(r'^\d+') def valid_name(name): try: name = str(name) except: return False if re.match(name_regexp, name) is None: return True else: return False #____________________________________________________________ # class BaseInformation(object): __slots__ = ('_informations') def set_information(self, key, value): """updates the information's attribute (wich is a dictionnary) :param key: information's key (ex: "help", "doc" :param value: information's value (ex: "the help string") """ self._informations[key] = value def get_information(self, key, default=None): """retrieves one information's item :param key: the item string (ex: "help") """ if key in self._informations: return self._informations[key] elif default is not None: return default else: raise ValueError(_("Information's item not found: {0}").format(key)) class Option(BaseInformation): """ Abstract base class for configuration option's. Reminder: an Option object is **not** a container for the value """ __slots__ = ('_name', '_requires', '_multi', '_validator', '_default_multi', '_default', '_properties', '_callback', '_multitype', '_master_slaves', '_consistencies') def __init__(self, name, doc, default=None, default_multi=None, requires=None, multi=False, callback=None, callback_params=None, validator=None, validator_args=None, properties=None): #FIXME : validation de callback et callback_params !!! """ :param name: the option's name :param doc: the option's description :param default: specifies the default value of the option, for a multi : ['bla', 'bla', 'bla'] :param default_multi: 'bla' (used in case of a reset to default only at a given index) :param requires: is a list of names of options located anywhere in the configuration. :param multi: if true, the option's value is a list :param callback: the name of a function. If set, the function's output is responsible of the option's value :param callback_params: the callback's parameter :param validator: the name of a function wich stands for a custom validation of the value :param validator_args: the validator's parameters """ if not valid_name(name): raise ValueError(_("invalid name: {0} for option").format(name)) self._name = name self._informations = {} self.set_information('doc', doc) validate_requires_arg(requires, self._name) self._requires = requires self._multi = multi self._consistencies = None if validator is not None: if type(validator) != FunctionType: raise TypeError(_("validator must be a function")) if validator_args is None: validator_args = {} self._validator = (validator, validator_args) else: self._validator = None if not self._multi and default_multi is not None: raise ValueError(_("a default_multi is set whereas multi is False" " in option: {0}").format(name)) if default_multi is not None and not self._validate(default_multi): raise ValueError(_("invalid default_multi value {0} " "for option {1}").format(str(default_multi), name)) if callback is not None and (default is not None or default_multi is not None): raise ValueError(_("defaut values not allowed if option: {0} " "is calculated").format(name)) if callback is None and callback_params is not None: raise ValueError(_("params defined for a callback function but " "no callback defined yet for option {0}").format(name)) if callback is not None: self._callback = (callback, callback_params) else: self._callback = None if self._multi: if default is None: default = [] #if not isinstance(default, list): # raise ValidateError("invalid default value {0} " # "for option {1} : not list type" # "".format(str(default), name)) if not self.validate(default): raise ValueError(_("invalid default value {0} " "for option {1}" "").format(str(default), name)) self._multitype = multitypes.default self._default_multi = default_multi else: if default is not None and not self.validate(default): raise ValueError(_("invalid default value {0} " "for option {1}").format(str(default), name)) self._default = default if properties is None: properties = () if not isinstance(properties, tuple): raise TypeError(_('invalid properties type {0} for {1},' ' must be a tuple').format(type(properties), self._name)) self._properties = properties # 'hidden', 'disabled'... def validate(self, value, context=None, validate=True): """ :param value: the option's value :param validate: if true enables ``self._validator`` validation """ # generic calculation if context is not None: cons = context.cfgimpl_get_description() else: cons = None if not self._multi: # None allows the reset of the value if value is not None: # customizing the validator if validate and self._validator is not None and \ not self._validator[0](value, **self._validator[1]): return False if not self._validate(value): return False if cons is not None: return cons.valid_consistency(self, value, context, None) else: if not isinstance(value, list): raise ValueError(_("invalid value {0} " "for option {1} which must be a list" "").format(value, self._name)) for index in range(0, len(value)): val = value[index] # None allows the reset of the value if val is not None: # customizing the validator if validate and self._validator is not None and \ not self._validator[0](val, **self._validator[1]): return False if not self._validate(val): return False if cons is not None and not cons.valid_consistency(self, val, context, index): return False return True def getdefault(self, default_multi=False): "accessing the default value" if not default_multi or not self.is_multi(): return self._default else: return self.getdefault_multi() def getdefault_multi(self): "accessing the default value for a multi" return self._default_multi def get_multitype(self): return self._multitype def get_master_slaves(self): return self._master_slaves def is_empty_by_default(self): "no default value has been set yet" if ((not self.is_multi() and self._default is None) or (self.is_multi() and (self._default == [] or None in self._default))): return True return False def getdoc(self): "accesses the Option's doc" return self.get_information('doc') def has_callback(self): "to know if a callback has been defined or not" if self._callback is None: return False else: return True def reset(self, config): """resets the default value and owner """ config._cfgimpl_context._cfgimpl_values.reset(self) def getkey(self, value): return value def is_multi(self): return self._multi def cons_not_equal(self, opt, value, context, index, opts): values = [value] descr = context.cfgimpl_get_description() for opt_ in opts: if opt_ is not opt: path = descr.get_path_by_opt(opt_) val = context._getattr(path, validate=False) if val is not None: if val in values: return False values.append(val) return True def add_consistency(self, func, opts): pass if self._consistencies is None: self._consistencies = [] if self not in opts: opts = list(opts) opts.append(self) opts = tuple(opts) self._consistencies.append(('cons_{}'.format(func), opts)) class ChoiceOption(Option): __slots__ = ('_values', '_open_values', 'opt_type') opt_type = 'string' def __init__(self, name, doc, values, default=None, default_multi=None, requires=None, multi=False, callback=None, callback_params=None, open_values=False, validator=None, validator_args=None, properties=()): if not isinstance(values, tuple): raise TypeError(_('values must be a tuple for {0}').format(name)) self._values = values if open_values not in (True, False): raise TypeError(_('open_values must be a boolean for ' '{0}').format(name)) self._open_values = open_values super(ChoiceOption, self).__init__(name, doc, default=default, default_multi=default_multi, callback=callback, callback_params=callback_params, requires=requires, multi=multi, validator=validator, validator_args=validator_args, properties=properties) def _validate(self, value): if not self._open_values: return value is None or value in self._values else: return True class BoolOption(Option): __slots__ = ('opt_type') opt_type = 'bool' def _validate(self, value): return isinstance(value, bool) class IntOption(Option): __slots__ = ('opt_type') opt_type = 'int' def _validate(self, value): return isinstance(value, int) class FloatOption(Option): __slots__ = ('opt_type') opt_type = 'float' def _validate(self, value): return isinstance(value, float) class StrOption(Option): __slots__ = ('opt_type') opt_type = 'string' def _validate(self, value): return isinstance(value, str) class UnicodeOption(Option): __slots__ = ('opt_type') opt_type = 'unicode' def _validate(self, value): return isinstance(value, unicode) class SymLinkOption(object): __slots__ = ('_name', 'opt', '_consistencies') opt_type = 'symlink' _consistencies = None def __init__(self, name, path, opt): self._name = name self.opt = opt def setoption(self, context, value): path = context.cfgimpl_get_description().get_path_by_opt(self.opt) setattr(context, path, value) def __getattr__(self, name): if name in ('_name', 'opt', 'setoption'): return object.__gettattr__(self, name) else: return getattr(self.opt, name) class IPOption(Option): __slots__ = ('opt_type', '_only_private') opt_type = 'ip' def set_private(self): self._only_private = True def _validate(self, value): try: only_private = self._only_private except AttributeError: only_private = False try: ip = IP('{0}/32'.format(value)) if only_private: return ip.iptype() == 'PRIVATE' return True except ValueError: return False class NetworkOption(Option): __slots__ = ('opt_type') opt_type = 'network' def _validate(self, value): try: IP(value) return True except ValueError: return False class NetmaskOption(Option): __slots__ = ('opt_type') opt_type = 'netmask' def __init__(self, name, doc, default=None, default_multi=None, requires=None, multi=False, callback=None, callback_params=None, validator=None, validator_args=None, properties=None, opt_ip=None): if opt_ip is not None and not isinstance(opt_ip, IPOption) and \ not isinstance(opt_ip, NetworkOption): raise TypeError(_('opt_ip must be a IPOption not {}').format(type(opt_ip))) super(NetmaskOption, self).__init__(name, doc, default=default, default_multi=default_multi, callback=callback, callback_params=callback_params, requires=requires, multi=multi, validator=validator, validator_args=validator_args, properties=properties) if opt_ip is None: pass elif isinstance(opt_ip, IPOption): self._consistencies = [('cons_ip_netmask', (self, opt_ip))] elif isinstance(opt_ip, NetworkOption): self._consistencies = [('cons_network_netmask', (self, opt_ip))] else: raise TypeError(_('unknown type for opt_ip')) def _validate(self, value): try: IP('0.0.0.0/{}'.format(value)) return True except ValueError: return False def cons_network_netmask(self, opt, value, context, index, opts): #opts must be (netmask, network) options return self._cons_netmask(opt, value, context, index, opts, False) def cons_ip_netmask(self, opt, value, context, index, opts): #opts must be (netmask, ip) options return self._cons_netmask(opt, value, context, index, opts, True) def _cons_netmask(self, opt, value, context, index, opts, make_net): opt_netmask, opt_ipnetwork = opts descr = context.cfgimpl_get_description() if opt is opt_ipnetwork: val_ipnetwork = value path = descr.get_path_by_opt(opt_netmask) val_netmask = context._getattr(path, validate=False) if opt_netmask.is_multi(): val_netmask = val_netmask[index] if val_netmask is None: return True else: val_netmask = value path = descr.get_path_by_opt(opt_ipnetwork) val_ipnetwork = getattr(context, path) if opt_ipnetwork.is_multi(): val_ipnetwork = val_ipnetwork[index] if val_ipnetwork is None: return True try: IP('{}/{}'.format(val_ipnetwork, val_netmask, make_net=make_net)) return True except ValueError: return False class OptionDescription(BaseInformation): """Config's schema (organisation, group) and container of Options""" __slots__ = ('_name', '_requires', '_cache_paths', '_group_type', '_properties', '_children', '_consistencies') def __init__(self, name, doc, children, requires=None, properties=()): """ :param children: is a list of option descriptions (including ``OptionDescription`` instances for nested namespaces). """ if not valid_name(name): raise ValueError(_("invalid name: {0} for option descr").format(name)) self._name = name self._informations = {} self.set_information('doc', doc) child_names = [child._name for child in children] #better performance like this valid_child = copy(child_names) valid_child.sort() old = None for child in valid_child: if child == old: raise ConflictConfigError(_('duplicate option name: ' '{0}').format(child)) old = child self._children = (tuple(child_names), tuple(children)) validate_requires_arg(requires, self._name) self._requires = requires self._cache_paths = None self._consistencies = None if not isinstance(properties, tuple): raise TypeError(_('invalid properties type {0} for {1},' ' must be a tuple').format(type(properties), self._name)) self._properties = properties # 'hidden', 'disabled'... # the group_type is useful for filtering OptionDescriptions in a config self._group_type = groups.default def getdoc(self): return self.get_information('doc') def __getattr__(self, name): try: return self._children[1][self._children[0].index(name)] except ValueError: raise AttributeError(_('unknown Option {} in OptionDescription {}' '').format(name, self._name)) def getkey(self, config): return tuple([child.getkey(getattr(config, child._name)) for child in self._children[1]]) def getpaths(self, include_groups=False, _currpath=None): """returns a list of all paths in self, recursively _currpath should not be provided (helps with recursion) """ #FIXME : cache if _currpath is None: _currpath = [] paths = [] for option in self._children[1]: attr = option._name if isinstance(option, OptionDescription): if include_groups: paths.append('.'.join(_currpath + [attr])) paths += option.getpaths(include_groups=include_groups, _currpath=_currpath + [attr]) else: paths.append('.'.join(_currpath + [attr])) return paths def getchildren(self): return self._children[1] def build_cache(self, cache_path=None, cache_option=None, _currpath=None, _consistencies=None): if _currpath is None and self._cache_paths is not None: return if _currpath is None: save = True _currpath = [] _consistencies = {} else: save = False if cache_path is None: cache_path = [self._name] cache_option = [self] for option in self._children[1]: attr = option._name if attr.startswith('_cfgimpl'): continue cache_option.append(option) cache_path.append(str('.'.join(_currpath + [attr]))) if not isinstance(option, OptionDescription): if option._consistencies is not None: for consistency in option._consistencies: func, opts = consistency for opt in opts: _consistencies.setdefault(opt, []).append((func, opts)) else: _currpath.append(attr) option.build_cache(cache_path, cache_option, _currpath, _consistencies) _currpath.pop() if save: #valid no duplicated option valid_child = copy(cache_option) valid_child.sort() old = None for child in valid_child: if child == old: raise ConflictConfigError(_('duplicate option: ' '{0}').format(child)) old = child self._cache_paths = (tuple(cache_option), tuple(cache_path)) self._consistencies = _consistencies def get_opt_by_path(self, path): try: return self._cache_paths[0][self._cache_paths[1].index(path)] except ValueError: raise AttributeError(_('no option for path {}').format(path)) def get_path_by_opt(self, opt): try: return self._cache_paths[1][self._cache_paths[0].index(opt)] except ValueError: raise AttributeError(_('no option {} found').format(opt)) # ____________________________________________________________ def set_group_type(self, group_type): """sets a given group object to an OptionDescription :param group_type: an instance of `GroupType` or `MasterGroupType` that lives in `setting.groups` """ if self._group_type != groups.default: raise TypeError(_('cannot change group_type if already set ' '(old {}, new {})').format(self._group_type, group_type)) if isinstance(group_type, groups.GroupType): self._group_type = group_type if isinstance(group_type, groups.MasterGroupType): #if master (same name has group) is set identical_master_child_name = False #for collect all slaves slaves = [] master = None for child in self._children[1]: if isinstance(child, OptionDescription): raise ValueError(_("master group {} shall not have " "a subgroup").format(self._name)) if not child.is_multi(): raise ValueError(_("not allowed option {0} in group {1}" ": this option is not a multi" "").format(child._name, self._name)) if child._name == self._name: identical_master_child_name = True child._multitype = multitypes.master master = child else: slaves.append(child) if master is None: raise ValueError(_('master group with wrong master name for {}' '').format(self._name)) master._master_slaves = tuple(slaves) for child in self._children[1]: if child != master: child._master_slaves = master child._multitype = multitypes.slave if not identical_master_child_name: raise ValueError(_("the master group: {} has not any " "master child").format(self._name)) else: raise ValueError(_('not allowed group_type : {0}').format(group_type)) def get_group_type(self): return self._group_type def valid_consistency(self, opt, value, context, index): consistencies = self._consistencies.get(opt) if consistencies is not None: for consistency in consistencies: func, opts = consistency ret = getattr(opts[0], func)(opt, value, context, index, opts) if ret is False: return False return True def validate_requires_arg(requires, name): "check malformed requirements" if requires is not None: config_action = {} for req in requires: if not type(req) == tuple: raise ValueError(_("malformed requirements type for option:" " {0}, must be a tuple").format(name)) if len(req) == 3: action = req[2] inverse = False elif len(req) == 4: action = req[2] inverse = req[3] else: raise ValueError(_("malformed requirements for option: {0}" " invalid len").format(name)) if action in config_action: if inverse != config_action[action]: raise ValueError(_("inconsistency in action types for option: {0}" " action: {1}").format(name, action)) else: config_action[action] = inverse