# -*- coding: utf-8 -*-
"option types and option description for the configuration management"
# Copyright (C) 2012-2013 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy project is under MIT licence
# ____________________________________________________________
import re
from copy import copy
from types import FunctionType
from tiramisu.error import (ConfigError, NotFoundError, ConflictConfigError,
                            RequiresError, RequirementRecursionError,
                            PropertiesOptionError)
from tiramisu.autolib import carry_out_calculation
from tiramisu.setting import groups, multitypes

# a name is rejected when it starts with one or more digits
name_regexp = re.compile(r'^\d+')


def valid_name(name):
    """tells whether ``name`` can be used as an option (or group) name

    :param name: the candidate name, converted with ``str()`` first
    :returns: ``False`` if the name starts with a digit, ``True`` otherwise
    :raises ValueError: if ``name`` cannot be converted to a string
    """
    try:
        name = str(name)
    except Exception:
        # only conversion failures are translated; KeyboardInterrupt and
        # SystemExit are no longer swallowed as they were by a bare except
        raise ValueError("not a valid string name")
    return name_regexp.match(name) is None


#____________________________________________________________
#
class BaseInformation(object):
    """mixin storing free-form key/value informations on an object

    The ``informations`` dictionary itself is created by the subclasses'
    ``__init__``.
    """
    __slots__ = ('informations',)

    def set_information(self, key, value):
        """updates the information's attribute
        (which is a dictionary)

        :param key: information's key (ex: "help", "doc")
        :param value: information's value (ex: "the help string")
        """
        self.informations[key] = value

    def get_information(self, key, default=None):
        """retrieves one information's item

        :param key: the item string (ex: "help")
        :param default: value returned when ``key`` is unknown; ``None``
                        means "no default"
        :raises ValueError: if ``key`` is unknown and ``default`` is None
        """
        if key in self.informations:
            return self.informations[key]
        if default is not None:
            return default
        raise ValueError("Information's item not found: {0}".format(key))


class Option(BaseInformation):
    """
    Abstract base class for configuration option's.

    Reminder: an Option object is **not** a container for the value.
    Subclasses provide ``_validate(value)`` for the type check.
    """
    __slots__ = ('_name', '_requires', 'multi', '_validator', 'default_multi',
                 'default', '_properties', 'callback', 'multitype',
                 'master_slaves')

    def __init__(self, name, doc, default=None, default_multi=None,
                 requires=None, multi=False, callback=None,
                 callback_params=None, validator=None, validator_args=None,
                 properties=None):
        """
        :param name: the option's name
        :param doc: the option's description
        :param default: specifies the default value of the option,
                        for a multi : ['bla', 'bla', 'bla']
        :param default_multi: 'bla' (used in case of a reset to default only at
                        a given index)
        :param requires: is a list of names of options located anywhere
                         in the configuration.
        :param multi: if true, the option's value is a list
        :param callback: the name of a function. If set, the function's output
                         is responsible of the option's value
        :param callback_params: the callback's parameter
        :param validator: the name of a function which stands for a custom
                          validation of the value
        :param validator_args: the validator's parameters
        :param properties: tuple of properties ('hidden', 'disabled'...)
        :raises NameError: if ``name`` is not a valid name
        :raises TypeError: if ``validator`` is not a plain function
        :raises ConfigError: on any inconsistency between the arguments
        """
        if not valid_name(name):
            raise NameError("invalid name: {0} for option".format(name))
        self._name = name
        self.informations = {}
        self.set_information('doc', doc)
        validate_requires_arg(requires, self._name)
        self._requires = requires
        self.multi = multi
        if validator is not None:
            if not isinstance(validator, FunctionType):
                raise TypeError("validator must be a function")
            self._validator = (validator, validator_args)
        else:
            self._validator = None
        if not self.multi and default_multi is not None:
            raise ConfigError("a default_multi is set whereas multi is False"
                              " in option: {0}".format(name))
        if default_multi is not None and not self._validate(default_multi):
            raise ConfigError("invalid default_multi value {0} "
                              "for option {1}".format(str(default_multi),
                                                      name))
        if callback is not None and (default is not None or
                                     default_multi is not None):
            raise ConfigError("defaut values not allowed if option: {0} "
                              "is calculated".format(name))
        if callback is None and callback_params is not None:
            raise ConfigError("params defined for a callback function but "
                              "no callback defined yet for option "
                              "{0}".format(name))
        if callback is not None:
            self.callback = (callback, callback_params)
        else:
            self.callback = None
        if self.multi:
            if default is None:
                default = []
            if not isinstance(default, list):
                raise ConfigError("invalid default value {0} "
                                  "for option {1} : not list type"
                                  "".format(str(default), name))
            # the custom validator is not applied to default values
            if not self.validate(default, False):
                raise ConfigError("invalid default value {0} "
                                  "for option {1}"
                                  "".format(str(default), name))
            self.multitype = multitypes.default
            self.default_multi = default_multi
        else:
            if default is not None and not self.validate(default, False):
                raise ConfigError("invalid default value {0} "
                                  "for option {1}".format(str(default), name))
        self.default = default
        if properties is None:
            properties = ()
        if not isinstance(properties, tuple):
            raise ConfigError('invalid properties type {0} for {1},'
                              ' must be a tuple'.format(type(properties),
                                                        self._name))
        self._properties = properties  # 'hidden', 'disabled'...

    def validate(self, value, validate=True):
        """checks a value (or every item of a multi's list)

        :param value: the option's value
        :param validate: if true enables ``self._validator`` validation
        :returns: ``True`` if the value is acceptable, ``False`` otherwise
        :raises ConfigError: if the option is a multi and ``value`` is not
                             a list
        """
        if self.multi:
            if not isinstance(value, list):
                raise ConfigError("invalid value {0} "
                                  "for option {1} which must be a list"
                                  "".format(value, self._name))
            items = value
        else:
            items = [value]
        custom = None
        custom_args = {}
        if validate and self._validator is not None:
            custom, stored_args = self._validator
            # validator_args defaults to None, which cannot be expanded
            # with ``**``
            if stored_args is not None:
                custom_args = stored_args
        for item in items:
            # None allows the reset of the value
            if item is None:
                continue
            # customizing the validator
            if custom is not None and not custom(item, **custom_args):
                return False
            if not self._validate(item):
                return False
        return True

    def getdefault(self, default_multi=False):
        "accessing the default value"
        if not default_multi or not self.is_multi():
            return self.default
        return self.getdefault_multi()

    def getdefault_multi(self):
        "accessing the default value for a multi"
        return self.default_multi

    def is_empty_by_default(self):
        "no default value has been set yet"
        if self.is_multi():
            return self.default == [] or None in self.default
        return self.default is None

    def getdoc(self):
        "accesses the Option's doc"
        return self.get_information('doc')

    def has_callback(self):
        "to know if a callback has been defined or not"
        return self.callback is not None

    def getcallback_value(self, config):
        """computes the option's value through its callback

        :param config: passed as ``config`` to ``carry_out_calculation``
        """
        callback, callback_params = self.callback
        if callback_params is None:
            callback_params = {}
        return carry_out_calculation(self._name, config=config,
                                     callback=callback,
                                     callback_params=callback_params)

    def reset(self, config):
        """resets the default value and owner
        """
        config._cfgimpl_context._cfgimpl_values.reset(self)

    def setoption(self, config, value):
        """changes the option's value with the value_owner's who

        :param config: the parent config is necessary here to store the value
        :param value: the new value
        :raises ConfigError: if the value is not valid
        :raises AttributeError: if the option is not a child of ``config``'s
                                description
        :raises TypeError: if the option or the whole config is frozen
        """
        name = self._name
        setting = config.cfgimpl_get_settings()
        if not self.validate(value, setting.has_property('validator')):
            raise ConfigError('invalid value %s for option %s' % (value, name))
        if self not in config._cfgimpl_descr._children[1]:
            raise AttributeError('unknown option %s' % (name))

        if setting.has_property('everything_frozen'):
            raise TypeError("cannot set a value to the option {} if the whole "
                            "config has been frozen".format(name))

        if setting.has_property('frozen') and setting.has_property('frozen',
                                                                   self):
            raise TypeError('cannot change the value to %s for '
                            'option %s this option is frozen' % (str(value),
                                                                 name))
        apply_requires(self, config)
        config.cfgimpl_get_values()[self] = value

    def getkey(self, value):
        "returns ``value`` unchanged"
        return value

    def is_multi(self):
        "tells whether the option's value is a list"
        return self.multi


class ChoiceOption(Option):
    "option whose value is picked among ``values`` (unless ``open_values``)"
    # 'opt_type' is a class attribute: a slot of the same name is either
    # rejected at class creation or shadowed by it, so it is not declared
    __slots__ = ('values', 'open_values')
    opt_type = 'string'

    def __init__(self, name, doc, values, default=None, default_multi=None,
                 requires=None, multi=False, callback=None,
                 callback_params=None, open_values=False, validator=None,
                 validator_args=None, properties=()):
        """
        :param values: tuple of the allowed values
        :param open_values: if true, values outside ``values`` are accepted

        the other parameters are those of ``Option.__init__``

        :raises ConfigError: if ``values`` is not a tuple or ``open_values``
                             is not a boolean
        """
        if not isinstance(values, tuple):
            raise ConfigError('values must be a tuple for {0}'.format(name))
        # both attributes must exist before Option.__init__ runs, because it
        # calls self._validate() on the default values
        self.values = values
        if open_values not in (True, False):
            raise ConfigError('Open_values must be a boolean for '
                              '{0}'.format(name))
        self.open_values = open_values
        super(ChoiceOption, self).__init__(name, doc, default=default,
                                           default_multi=default_multi,
                                           callback=callback,
                                           callback_params=callback_params,
                                           requires=requires,
                                           multi=multi,
                                           validator=validator,
                                           validator_args=validator_args,
                                           properties=properties)

    def _validate(self, value):
        if self.open_values:
            return True
        return value is None or value in self.values


class BoolOption(Option):
    "option accepting ``bool`` values"
    __slots__ = ()
    opt_type = 'bool'

    def _validate(self, value):
        return isinstance(value, bool)


class IntOption(Option):
    "option accepting ``int`` values"
    __slots__ = ()
    opt_type = 'int'

    def _validate(self, value):
        return isinstance(value, int)


class FloatOption(Option):
    "option accepting ``float`` values"
    __slots__ = ()
    opt_type = 'float'

    def _validate(self, value):
        return isinstance(value, float)


class StrOption(Option):
    "option accepting ``str`` values"
    __slots__ = ()
    opt_type = 'string'

    def _validate(self, value):
        return isinstance(value, str)


class UnicodeOption(Option):
    "option accepting ``unicode`` values"
    __slots__ = ()
    opt_type = 'unicode'

    def _validate(self, value):
        return isinstance(value, unicode)


class SymLinkOption(object):
    """alias of another option

    Attributes not found on the symlink are looked up on the target ``opt``.
    """
    __slots__ = ('_name', 'opt')
    opt_type = 'symlink'

    def __init__(self, name, path, opt):
        """
        :param name: the symlink's name
        :param path: not used by this constructor
        :param opt: the target option
        """
        self._name = name
        self.opt = opt

    def setoption(self, config, value):
        "sets ``value`` on the target option, through the context"
        context = config.cfgimpl_get_context()
        path = context.cfgimpl_get_description().get_path_by_opt(self.opt)
        setattr(context, path, value)

    def __getattr__(self, name):
        # only called when the regular lookup failed
        if name in ('_name', 'opt', 'setoption'):
            # own attributes are never delegated: looking up ``self.opt``
            # while it is unset would call __getattr__ again, endlessly
            return object.__getattribute__(self, name)
        return getattr(self.opt, name)


class IPOption(Option):
    "option accepting an IP given as a ``str``"
    __slots__ = ()
    opt_type = 'ip'

    def _validate(self, value):
        # by now the validation is nothing but a string, use IPy instead
        return isinstance(value, str)


class NetmaskOption(Option):
    "option accepting a netmask given as a ``str``"
    __slots__ = ()
    opt_type = 'netmask'

    def _validate(self, value):
        # by now the validation is nothing but a string, use IPy instead
        return isinstance(value, str)


class OptionDescription(BaseInformation):
    """Config's schema (organisation, group) and container of Options"""
    __slots__ = ('_name', '_requires', '_cache_paths', '_group_type',
                 '_properties', '_children')

    def __init__(self, name, doc, children, requires=None, properties=()):
        """
        :param name: the group's name
        :param doc: the group's description
        :param children: is a list of option descriptions (including
                         ``OptionDescription`` instances for nested
                         namespaces).
        :param requires: requirements, checked by ``validate_requires_arg``
        :param properties: tuple of properties ('hidden', 'disabled'...)
        :raises NameError: if ``name`` is not a valid name
        :raises ConflictConfigError: if two children share the same name
        :raises ConfigError: if ``properties`` is not a tuple
        """
        if not valid_name(name):
            raise NameError("invalid name: {0} for option descr".format(name))
        self._name = name
        self.informations = {}
        self.set_information('doc', doc)
        child_names = [child._name for child in children]
        # better performance like this: once sorted, duplicates are adjacent
        previous = None
        for child_name in sorted(child_names):
            if child_name == previous:
                raise ConflictConfigError('duplicate option name: '
                                          '{0}'.format(child_name))
            previous = child_name
        # two parallel tuples: (names, options)
        self._children = (tuple(child_names), tuple(children))
        validate_requires_arg(requires, self._name)
        self._requires = requires
        self._cache_paths = None
        if not isinstance(properties, tuple):
            raise ConfigError('invalid properties type {0} for {1},'
                              ' must be a tuple'.format(type(properties),
                                                        self._name))
        self._properties = properties  # 'hidden', 'disabled'...
        # the group_type is useful for filtering OptionDescriptions in a config
        self._group_type = groups.default

    def getdoc(self):
        "accesses the OptionDescription's doc"
        return self.get_information('doc')

    def __getattr__(self, name):
        # children are reachable as attributes, by name
        try:
            return self._children[1][self._children[0].index(name)]
        except ValueError:
            raise AttributeError('unknown Option {} in OptionDescription {}'
                                 ''.format(name, self._name))

    def getkey(self, config):
        "tuple of the children's keys, read from ``config``'s attributes"
        return tuple([child.getkey(getattr(config, child._name))
                      for child in self._children[1]])

    def getpaths(self, include_groups=False, _currpath=None):
        """returns a list of all paths in self, recursively
           _currpath should not be provided (helps with recursion)

        :param include_groups: if true, the paths of the nested
                               ``OptionDescription`` are listed too
        """
        #FIXME : cache
        if _currpath is None:
            _currpath = []
        paths = []
        for option in self._children[1]:
            attr = option._name
            if isinstance(option, OptionDescription):
                if include_groups:
                    paths.append('.'.join(_currpath + [attr]))
                paths += option.getpaths(include_groups=include_groups,
                                         _currpath=_currpath + [attr])
            else:
                paths.append('.'.join(_currpath + [attr]))
        return paths

    def getchildren(self):
        "returns the tuple of children"
        return self._children[1]

    def build_cache(self, cache_path=None, cache_option=None, _currpath=None):
        """builds ``_cache_paths``: parallel tuples (options, paths)

        Does nothing when called without ``_currpath`` while the cache
        already exists. The three parameters are accumulators used by the
        recursion and should not be provided by the caller.

        :raises ConflictConfigError: if the same option object appears twice
        """
        if _currpath is None and self._cache_paths is not None:
            return
        # only the outermost call stores the result
        save = _currpath is None
        if save:
            _currpath = []
        if cache_path is None:
            cache_path = [self._name]
            cache_option = [self]
        for option in self._children[1]:
            attr = option._name
            if attr.startswith('_cfgimpl'):
                continue
            cache_option.append(option)
            cache_path.append(str('.'.join(_currpath + [attr])))
            if isinstance(option, OptionDescription):
                _currpath.append(attr)
                option.build_cache(cache_path, cache_option, _currpath)
                _currpath.pop()
        if save:
            # valid no duplicated option: once sorted, the same object
            # would be adjacent to itself
            valid_child = copy(cache_option)
            valid_child.sort()
            old = None
            for child in valid_child:
                if child == old:
                    raise ConflictConfigError('duplicate option: '
                                              '{0}'.format(child))
                old = child
            self._cache_paths = (tuple(cache_option), tuple(cache_path))

    def get_opt_by_path(self, path):
        """returns the cached option stored at ``path``

        :raises NotFoundError: if ``path`` is not in the cache
        """
        try:
            return self._cache_paths[0][self._cache_paths[1].index(path)]
        except ValueError:
            raise NotFoundError('no option for path {}'.format(path))

    def get_path_by_opt(self, opt):
        """returns the cached path of ``opt``

        :raises NotFoundError: if ``opt`` is not in the cache
        """
        try:
            return self._cache_paths[1][self._cache_paths[0].index(opt)]
        except ValueError:
            raise NotFoundError('no option {} found'.format(opt))

    # ____________________________________________________________
    def set_group_type(self, group_type):
        """sets a given group object to an OptionDescription

        :param group_type: an instance of `GroupType` or `MasterGroupType`
                              that lives in `setting.groups`
        :raises ConfigError: if a group type has already been set, if
                             ``group_type`` is not a ``GroupType``, or if a
                             master group is malformed (subgroup, non-multi
                             child, no child named like the group)
        """
        if self._group_type != groups.default:
            # the exception used to be built without being raised
            raise ConfigError('cannot change group_type if already set '
                              '(old {}, new {})'.format(self._group_type,
                                                        group_type))
        if not isinstance(group_type, groups.GroupType):
            raise ConfigError('not allowed group_type : {0}'.format(group_type))
        self._group_type = group_type
        if isinstance(group_type, groups.MasterGroupType):
            # the master is the child named like the group,
            # every other child is a slave
            slaves = []
            master = None
            for child in self._children[1]:
                if isinstance(child, OptionDescription):
                    raise ConfigError("master group {} shall not have "
                                      "a subgroup".format(self._name))
                if not child.multi:
                    raise ConfigError("not allowed option {0} in group {1}"
                                      ": this option is not a multi"
                                      "".format(child._name, self._name))
                if child._name == self._name:
                    child.multitype = multitypes.master
                    master = child
                else:
                    slaves.append(child)
            if master is None:
                raise ConfigError('master group with wrong master name for {}'
                                  ''.format(self._name))
            master.master_slaves = tuple(slaves)
            for child in self._children[1]:
                if child != master:
                    child.master_slaves = master
                    child.multitype = multitypes.slave

    def get_group_type(self):
        "returns the group type"
        return self._group_type


def validate_requires_arg(requires, name):
    """check malformed requirements

    Each requirement is a tuple of 3 or 4 items; the third one is the action
    and the optional fourth one the ``inverse`` flag (``False`` if omitted).

    :param requires: iterable of requirements, or ``None``
    :param name: the option's name, used in the error messages
    :raises RequiresError: if a requirement is not a tuple, has a wrong
                           length, or if the same action is used with
                           different ``inverse`` flags
    """
    if requires is None:
        return
    config_action = {}
    for req in requires:
        if not isinstance(req, tuple):
            raise RequiresError("malformed requirements type for option:"
                                " {0}, must be a tuple".format(name))
        if len(req) == 3:
            action = req[2]
            inverse = False
        elif len(req) == 4:
            action = req[2]
            inverse = req[3]
        else:
            raise RequiresError("malformed requirements for option: {0}"
                                " invalid len".format(name))
        if action in config_action:
            if inverse != config_action[action]:
                raise RequiresError("inconsistency in action types for option: {0}"
                                    " action: {1}".format(name, action))
        else:
            config_action[action] = inverse


def apply_requires(opt, config):
    """carries out the jit (just in time) requirements between options

    For every action of ``opt``'s requirements, the action's property is
    added to (or, if ``inverse``, removed from) ``opt`` when a required option
    has the expected value; when no requirement of the action matched, the
    opposite is done.

    :param opt: the option or option description carrying ``_requires``
    :param config: the config used to reach the settings and the context
    :raises RequirementRecursionError: if a requirement's path starts with
                                       ``opt``'s own path
    :raises NotFoundError: if the required option's value cannot be read
    """
    def build_actions(requires):
        "action are hide, show, enable, disable..."
        trigger_actions = {}
        for require in requires:
            action = require[2]
            trigger_actions.setdefault(action, []).append(require)
        return trigger_actions

    # for symlink
    if hasattr(opt, '_requires') and opt._requires is not None:
        # filters the callbacks
        setting = config.cfgimpl_get_settings()
        trigger_actions = build_actions(opt._requires)
        if isinstance(opt, OptionDescription):
            optpath = config._cfgimpl_get_path() + '.' + opt._name
        else:
            descr = config.cfgimpl_get_context().cfgimpl_get_description()
            optpath = descr.get_path_by_opt(opt)
        for requires in trigger_actions.values():
            matches = False
            for require in requires:
                if len(require) == 3:
                    path, expected, action = require
                    inverse = False
                elif len(require) == 4:
                    path, expected, action, inverse = require
                if path.startswith(optpath):
                    raise RequirementRecursionError("malformed requirements "
                                                    "imbrication detected for option: '{0}' "
                                                    "with requirement on: '{1}'".format(optpath, path))
                try:
                    value = config.cfgimpl_get_context()._getattr(path, force_permissive=True)
                except PropertiesOptionError as err:
                    properties = err.proptype
                    raise NotFoundError("option '{0}' has requirement's property error: "
                                        "{1} {2}".format(opt._name, path, properties))
                except Exception:
                    raise NotFoundError("required option not found: "
                                        "{0}".format(path))
                if value == expected:
                    if inverse:
                        setting.del_property(action, opt)
                    else:
                        setting.add_property(action, opt)
                    matches = True
                    #FIXME optimisation: do a double break here? or even a
                    # return
            # no requirement has been triggered, then just reverse the action
            # (action and inverse are those of the group's last requirement)
            if not matches:
                if inverse:
                    setting.add_property(action, opt)
                else:
                    setting.del_property(action, opt)