# -*- coding: utf-8 -*-
"pretty small and local configuration management tool"
# Copyright (C) 2012 Team tiramisu (see README for all contributors)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# The original `Config` design model is unproudly borrowed from
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy project is under MIT licence
# ____________________________________________________________
from error import (HiddenOptionError, ConfigError, NotFoundError,
                   AmbigousOptionError, ConflictConfigError,
                   NoMatchingOptionFound, SpecialOwnersError,
                   MandatoryError, MethodCallError, DisabledOptionError,
                   ModeOptionError)
from option import (OptionDescription, Option, SymLinkOption, group_types,
                    apply_requires, modes)
import autolib
from autolib import special_owners, special_owner_factory


# ____________________________________________________________
class Config(object):
    """Hold the values and value owners of the options of a description.

    A `Config` is built from a description object (`descr`); every
    `OptionDescription` child of that description gets its own nested
    `Config`.  Option values are read and written as attributes, either by
    plain name or by dotted path (``getattr(cfg, "group.option")``).

    All internal state lives in attributes prefixed with ``_cfgimpl_`` so
    that it cannot clash with option names.
    """
    # root-level flags, read through `_cfgimpl_toplevel` during validation
    _cfgimpl_hidden = True
    _cfgimpl_disabled = True
    _cfgimpl_mandatory = True
    _cfgimpl_frozen = False
    # owner recorded for values set through attribute assignment
    _cfgimpl_owner = "user"
    _cfgimpl_toplevel = None
    _cfgimpl_mode = 'normal'

    def __init__(self, descr, parent=None, **overrides):
        """Build the config for `descr`.

        :param descr: description object whose `_children` are walked
        :param parent: enclosing `Config`, or None for the root
        :param overrides: ``path=value`` pairs applied with owner 'default'
        """
        self._cfgimpl_descr = descr
        self._cfgimpl_value_owners = {}
        self._cfgimpl_parent = parent
        # `Config()` indeed takes care of the `Option()`'s values
        self._cfgimpl_values = {}
        self._cfgimpl_previous_values = {}
        # XXX warnings are a great idea, let's make up a better use of it
        self._cfgimpl_warnings = []
        self._cfgimpl_toplevel = self._cfgimpl_get_toplevel()
        # `freeze()` allows us to carry out this calculation again if necessary
        self._cfgimpl_frozen = self._cfgimpl_toplevel._cfgimpl_frozen
        self._cfgimpl_build(overrides)

    def _validate_duplicates(self, children):
        """Raise `ConflictConfigError` if two children share a `_name`."""
        seen = set()
        for child in children:
            if child._name in seen:
                raise ConflictConfigError('duplicate option name: '
                                          '{0}'.format(child._name))
            seen.add(child._name)

    def _cfgimpl_register_option(self, child):
        """Store the default value, previous value and owner of `child`.

        The owner is 'auto' (hidden) or 'fill' (visible) when the option
        has a callback, otherwise ['default'] for a multi option and
        'default' for a single-valued one.
        """
        name = child._name
        self._cfgimpl_values[name] = child.getdefault()
        self._cfgimpl_previous_values[name] = child.getdefault()
        if child.getcallback() is not None:
            if child._is_hidden():
                owner = 'auto'
            else:
                owner = 'fill'
        elif child.is_multi():
            owner = ['default']
        else:
            owner = 'default'
        self._cfgimpl_value_owners[name] = owner

    def _cfgimpl_build(self, overrides):
        """Populate values/owners from the description, then apply overrides."""
        self._validate_duplicates(self._cfgimpl_descr._children)
        for child in self._cfgimpl_descr._children:
            if isinstance(child, Option):
                self._cfgimpl_register_option(child)
            elif isinstance(child, OptionDescription):
                self._validate_duplicates(child._children)
                self._cfgimpl_values[child._name] = Config(child, parent=self)
        self.override(overrides)

    def cfgimpl_update(self):
        "dynamically adds `Option()` or `OptionDescription()`"
        # Nothing is static. Everything evolve.
        # FIXME this is an update for new options in the schema only
        # see the update_child() method of the descr object
        for child in self._cfgimpl_descr._children:
            if child._name in self._cfgimpl_values:
                continue  # already known, leave its value untouched
            if isinstance(child, Option):
                # same initialisation as at build time, so that multi
                # options and options with a callback get the right owner
                self._cfgimpl_register_option(child)
            elif isinstance(child, OptionDescription):
                self._cfgimpl_values[child._name] = Config(child, parent=self)

    def override(self, overrides):
        """Set each ``path: value`` of `overrides` with owner 'default'.

        :raises SpecialOwnersError: if the target option currently has a
            special owner
        """
        for name, value in overrides.items():
            homeconfig, name = self._cfgimpl_get_home_by_path(name)
            # if there are special_owners, impossible to override
            if homeconfig._cfgimpl_value_owners[name] in special_owners:
                raise SpecialOwnersError("cannot override option: {0} because "
                                         "of its special owner".format(name))
            homeconfig.setoption(name, value, 'default')

    def cfgimpl_set_owner(self, owner):
        """Set the current owner on this config and all nested configs."""
        self._cfgimpl_owner = owner
        for child in self._cfgimpl_descr._children:
            if isinstance(child, OptionDescription):
                self._cfgimpl_values[child._name].cfgimpl_set_owner(owner)

    # ____________________________________________________________
    def _cfgimpl_check_root(self, method_name):
        """Raise `MethodCallError` unless this config has no parent.

        The parent is tested by identity: `Config` overrides the equality
        operators, so an equality test against None would be dispatched
        to them.
        """
        if self._cfgimpl_parent is not None:
            raise MethodCallError("this method {0}() shall not be "
                                  "used with non-root Config() "
                                  "object".format(method_name))

    def cfgimpl_hide(self):
        """Turn on the root `_cfgimpl_hidden` flag (root config only)."""
        self._cfgimpl_check_root('cfgimpl_hide')
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_hidden = True

    def cfgimpl_show(self):
        """Turn off the root `_cfgimpl_hidden` flag (root config only)."""
        self._cfgimpl_check_root('cfgimpl_show')
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_hidden = False

    # ____________________________________________________________
    def cfgimpl_disable(self):
        """Turn on the root `_cfgimpl_disabled` flag (root config only)."""
        self._cfgimpl_check_root('cfgimpl_disable')
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_disabled = True

    def cfgimpl_enable(self):
        """Turn off the root `_cfgimpl_disabled` flag (root config only)."""
        self._cfgimpl_check_root('cfgimpl_enable')
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_disabled = False

    # ____________________________________________________________
    def __setattr__(self, name, value):
        """Set an option value by name or dotted path.

        ``_cfgimpl_*`` names are stored directly in the instance dict.

        :raises TypeError: when the config is frozen and the value differs
            from the current one
        """
        if '.' in name:
            homeconfig, name = self._cfgimpl_get_home_by_path(name)
            return setattr(homeconfig, name, value)
        if name.startswith('_cfgimpl_'):
            self.__dict__[name] = value
            return
        if self._cfgimpl_frozen and getattr(self, name) != value:
            raise TypeError("trying to change a value in a frozen config"
                            ": {0} {1}".format(name, value))
        if type(getattr(self._cfgimpl_descr, name)) != SymLinkOption:
            self._validate(name, getattr(self._cfgimpl_descr, name))
        self.setoption(name, value, self._cfgimpl_owner)

    def _validate(self, name, opt_or_descr):
        """Check that `opt_or_descr` may be accessed in the current context.

        :raises HiddenOptionError: hidden option/group while the root
            hidden flag is set
        :raises DisabledOptionError: disabled option/group while the root
            disabled flag is set
        :raises ModeOptionError: see the mode check below
        """
        # requirements of plain options are applied before the checks,
        # those of an OptionDescription after them
        if not type(opt_or_descr) == OptionDescription:
            apply_requires(opt_or_descr, self)
        # hidden options
        if self._cfgimpl_toplevel._cfgimpl_hidden and \
                (opt_or_descr._is_hidden() or
                 self._cfgimpl_descr._is_hidden()):
            raise HiddenOptionError("trying to access to a hidden option:"
                                    " {0}".format(name))
        # disabled options
        if self._cfgimpl_toplevel._cfgimpl_disabled and \
                (opt_or_descr._is_disabled() or
                 self._cfgimpl_descr._is_disabled()):
            raise DisabledOptionError("this option is disabled:"
                                      " {0}".format(name))
        # expert options
        # XXX currently doesn't look at the group, is it really necessary ?
        # NOTE(review): the check only fires when the root mode is *not*
        # 'normal' -- confirm this is the intended direction
        if self._cfgimpl_toplevel._cfgimpl_mode != 'normal':
            if opt_or_descr.get_mode() != 'normal':
                raise ModeOptionError("this option's mode is not normal:"
                                      " {0}".format(name))
        if type(opt_or_descr) == OptionDescription:
            apply_requires(opt_or_descr, self)

    def __getattr__(self, name):
        """Return the value of an option (or the nested config of a group).

        Only called when regular attribute lookup fails.

        :raises AttributeError: unknown name or missing ``_cfgimpl_*``
            attribute
        :raises ConfigError: invalid value returned by a callback
        :raises MandatoryError: mandatory option without value nor default
        """
        # attribute access by passing a path,
        # for instance getattr(self, "creole.general.family.adresse_ip_eth0")
        if '.' in name:
            homeconfig, name = self._cfgimpl_get_home_by_path(name)
            return getattr(homeconfig, name)
        # special attributes: if it were in __dict__ (or on the class) it
        # would have been found already.  This must be handled before
        # touching `self._cfgimpl_descr`, otherwise a config whose
        # description is not set yet would recurse forever.
        if name.startswith('_cfgimpl_'):
            raise AttributeError("%s object has no attribute %s" %
                                 (self.__class__, name))
        opt_or_descr = getattr(self._cfgimpl_descr, name)
        # symlink options
        if type(opt_or_descr) == SymLinkOption:
            return getattr(self, opt_or_descr.path)
        self._validate(name, opt_or_descr)
        if name not in self._cfgimpl_values:
            raise AttributeError("%s object has no attribute %s" %
                                 (self.__class__, name))
        if name in self._cfgimpl_value_owners:
            owner = self._cfgimpl_value_owners[name]
            # special owners
            if owner in special_owners:
                value = self._cfgimpl_values[name]
                # a 'fill' option that already has a complete value is
                # returned as is, without calling the callback
                if value is not None and owner == 'fill':
                    if opt_or_descr.is_multi():
                        if None not in value:
                            return value
                    else:
                        return value
                result = special_owner_factory(
                    name, owner,
                    value=value,
                    callback=opt_or_descr.getcallback(),
                    callback_params=opt_or_descr.getcallback_params(),
                    config=self._cfgimpl_get_toplevel())
                # this result **shall not** be a list
                # for example, [1, 2, 3, None] -> [1, 2, 3, result]
                if type(result) == list:
                    raise ConfigError('invalid calculated value returned'
                                      ' for option {0} : shall not be a '
                                      'list'.format(name))
                if result is not None and not opt_or_descr._validate(result):
                    raise ConfigError('invalid calculated value returned'
                                      ' for option {0}'.format(name))
                if not opt_or_descr.is_multi():
                    return result
                if value is None:
                    return [result]
                # every missing item of the multi value is replaced by
                # the calculated result
                filled = []
                for val in value:
                    if val is None:
                        val = result
                    filled.append(val)
                return filled
        # mandatory options
        if not isinstance(opt_or_descr, OptionDescription):
            homeconfig = self._cfgimpl_get_toplevel()
            mandatory = homeconfig._cfgimpl_mandatory
            if opt_or_descr.is_mandatory() and mandatory:
                if self._cfgimpl_values[name] is None \
                        and opt_or_descr.getdefault() is None:
                    raise MandatoryError("option: {0} is mandatory "
                                         "and shall have a value".format(name))
        return self._cfgimpl_values[name]

    def __dir__(self):
        """Return the sorted instance attributes plus the known value names."""
        #from_type = dir(type(self))
        from_dict = list(self.__dict__)
        extras = list(self._cfgimpl_values)
        return sorted(set(extras + from_dict))

    def unwrap_from_name(self, name):
        """Return the description object of the first accessible path
        that contains `name` as one of its components.

        :raises NotFoundError: if no accessible path contains `name`
        """
        # didn't have to stoop so low: `self.get()` must be the proper method
        # **and it is slow**: it recursively searches into the namespaces
        for path in self.getpaths():
            if name in path.split("."):
                # only the matching path is unwrapped
                return self.unwrap_from_path(path)
        raise NotFoundError("name: {0} not found".format(name))

    def unwrap_from_path(self, path):
        """Return the description object found at the dotted `path`."""
        # didn't have to stoop so low, `geattr(self, path)` is much better
        # **fast**: finds the option directly in the appropriate namespace
        if '.' in path:
            homeconfig, path = self._cfgimpl_get_home_by_path(path)
            return getattr(homeconfig._cfgimpl_descr, path)
        return getattr(self._cfgimpl_descr, path)

    def __delattr__(self, name):
        """Reset the option `name` to its default value and owner.

        :raises AttributeError: if `name` designates an option subgroup
        """
        # if you use delattr you are responsible for all bad things happening
        if name.startswith('_cfgimpl_'):
            del self.__dict__[name]
            return
        # look the option up and reject subgroups *before* touching any
        # state, so that a failed delete leaves the config unchanged
        opt = getattr(self._cfgimpl_descr, name)
        if isinstance(opt, OptionDescription):
            raise AttributeError("can't delete option subgroup")
        self._cfgimpl_value_owners[name] = 'default'
        self._cfgimpl_values[name] = getattr(opt, 'default', None)

    def setoption(self, name, value, who=None):
        """Set `value` on the option `name` on behalf of `who`.

        :param who: owner name; when None the current config owner is used
            for the owner bookkeeping
        :raises ConfigError: non-list value for a multi option
        :raises AttributeError: option unknown to this config
        :raises ConflictConfigError: 'auto' value overridden by 'auto'
        """
        #who is **not necessarily** a owner, because it cannot be a list
        child = getattr(self._cfgimpl_descr, name)
        if who is None:
            if child.is_multi():
                newowner = [self._cfgimpl_owner] * len(value)
            else:
                newowner = self._cfgimpl_owner
        else:
            if type(child) != SymLinkOption:
                if child.is_multi():
                    if type(value) != list:
                        raise ConfigError("invalid value for option:"
                                          " {0} that is set to "
                                          "multi".format(name))
                    newowner = [who] * len(value)
                else:
                    newowner = who
        if type(child) != SymLinkOption:
            if name not in self._cfgimpl_values:
                raise AttributeError('unknown option %s' % (name,))
            # special owners, a value with a owner *auto* cannot be changed
            oldowner = self._cfgimpl_value_owners[child._name]
            if oldowner == 'auto':
                if who == 'auto':
                    raise ConflictConfigError('cannot override value to %s '
                                              'for option %s' % (value, name))
            if oldowner == who:
                oldvalue = getattr(self, name)
                if oldvalue == value:  # or who in ("default",):
                    return
            child.setoption(self, value, who)
            # if the value owner is 'auto', set the option to hidden
            if who == 'auto':
                if not child._is_hidden():
                    child.hide()
            # an emptied value falls back to the default value and owner
            if (value is None and who != 'default' and not child.is_multi()):
                child.setowner(self, 'default')
                self._cfgimpl_values[name] = child.getdefault()
            elif (value == [] and who != 'default' and child.is_multi()):
                child.setowner(self, ['default'] * len(child.getdefault()))
                self._cfgimpl_values[name] = child.getdefault()
            else:
                child.setowner(self, newowner)
        else:
            # symlinks are resolved from the root config
            homeconfig = self._cfgimpl_get_toplevel()
            child.setoption(homeconfig, value, who)

    def set(self, **kwargs):
        """Set options designated by the (unambiguous) end of their path.

        :raises AmbigousOptionError: several paths end with the key
        :raises NoMatchingOptionFound: no path ends with the key
        """
        all_paths = [p.split(".") for p in self.getpaths(allpaths=True)]
        for key, value in kwargs.items():
            key_p = key.split('.')
            candidates = [p for p in all_paths if p[-len(key_p):] == key_p]
            if len(candidates) == 1:
                name = '.'.join(candidates[0])
                homeconfig, name = self._cfgimpl_get_home_by_path(name)
                # reading the option first runs the access checks; any
                # error other than a missing mandatory value (typically
                # HiddenOptionError or DisabledOptionError) propagates
                try:
                    getattr(homeconfig, name)
                except MandatoryError:
                    pass
                homeconfig.setoption(name, value, self._cfgimpl_owner)
            elif len(candidates) > 1:
                raise AmbigousOptionError(
                    'more than one option that ends with %s' % (key, ))
            else:
                raise NoMatchingOptionFound(
                    'there is no option that matches %s'
                    ' or the option is hidden or disabled' % (key, ))

    def get(self, name):
        """Return the value of the first option whose last path component
        is `name`; access errors propagate to the caller.

        :raises NotFoundError: if no path ends with `name`
        """
        for path in self.getpaths(allpaths=True):
            if path.split('.')[-1] == name:
                return getattr(self, path)
        raise NotFoundError("option {0} not found in config".format(name))

    def _cfgimpl_get_home_by_path(self, path):
        """returns tuple (config, name)"""
        steps = path.split('.')
        home = self
        for step in steps[:-1]:
            home = getattr(home, step)
        return home, steps[-1]

    def _cfgimpl_get_toplevel(self):
        """Return the root config, following `_cfgimpl_parent` links."""
        config = self
        while config._cfgimpl_parent is not None:
            config = config._cfgimpl_parent
        return config

    def cfgimpl_previous_value(self, path):
        """Return the recorded previous value of the option at `path`."""
        home, name = self._cfgimpl_get_home_by_path(path)
        return home._cfgimpl_previous_values[name]

    def get_previous_value(self, name):
        """Return the recorded previous value of the local option `name`."""
        return self._cfgimpl_previous_values[name]

    def add_warning(self, warning):
        """Append `warning` to the warning list of the root config."""
        self._cfgimpl_get_toplevel()._cfgimpl_warnings.append(warning)

    def get_warnings(self):
        """Return the warning list of the root config."""
        return self._cfgimpl_get_toplevel()._cfgimpl_warnings

    # ____________________________________________________________
    # freeze and read-write statuses
    def cfgimpl_freeze(self):
        """Set the frozen flag on the root config and on this config."""
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_frozen = True
        self._cfgimpl_frozen = True

    def cfgimpl_unfreeze(self):
        """Clear the frozen flag on the root config and on this config."""
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_frozen = False
        self._cfgimpl_frozen = False

    def is_frozen(self):
        """Return the frozen flag stored on the root config instance."""
        # it should be the same value as self._cfgimpl_frozen...
        rootconfig = self._cfgimpl_get_toplevel()
        return rootconfig.__dict__['_cfgimpl_frozen']

    def cfgimpl_read_only(self):
        """Freeze the config and set the root flags: hidden off,
        disabled on, mandatory on."""
        # hung up on freeze, hidden and disabled concepts
        self.cfgimpl_freeze()
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_hidden = False
        rootconfig._cfgimpl_disabled = True
        rootconfig._cfgimpl_mandatory = True

    def cfgimpl_set_mode(self, mode):
        """Set the mode of the root config.

        :raises ConfigError: if `mode` is not one of `modes`
        """
        # normal or expert mode
        rootconfig = self._cfgimpl_get_toplevel()
        if mode not in modes:
            raise ConfigError("mode {0} not available".format(mode))
        rootconfig._cfgimpl_mode = mode

    def cfgimpl_read_write(self):
        """Unfreeze the config and set the root flags: hidden on,
        disabled off, mandatory off."""
        # hung up on freeze, hidden and disabled concepts
        self.cfgimpl_unfreeze()
        rootconfig = self._cfgimpl_get_toplevel()
        rootconfig._cfgimpl_hidden = True
        rootconfig._cfgimpl_disabled = False
        rootconfig._cfgimpl_mandatory = False

    # ____________________________________________________________
    def getkey(self):
        """Return the key computed by the description for this config."""
        return self._cfgimpl_descr.getkey(self)

    def __hash__(self):
        return hash(self.getkey())

    def __eq__(self, other):
        """Two configs are equal when their keys are equal."""
        if not isinstance(other, Config):
            # let Python fall back to its default comparison instead of
            # failing on `other.getkey()` (e.g. when compared with None)
            return NotImplemented
        return self.getkey() == other.getkey()

    def __ne__(self, other):
        return not self == other

    def __iter__(self):
        """Yield ``(name, value)`` for every accessible local option."""
        # iteration only on Options (not OptionDescriptions)
        for child in self._cfgimpl_descr._children:
            if not isinstance(child, Option):
                continue
            # the `yield` is kept out of the `try` so that closing the
            # generator is never swallowed by the handler
            try:
                value = getattr(self, child._name)
            except Exception:
                continue  # hidden, disabled option group
            yield child._name, value

    def iter_groups(self, group_type=None):
        "iteration on OptionDescriptions"
        if group_type is None:
            groups = group_types
        else:
            if group_type not in group_types:
                raise TypeError("Unknown group_type: {0}".format(group_type))
            groups = [group_type]
        for child in self._cfgimpl_descr._children:
            if not isinstance(child, OptionDescription):
                continue
            # as in `__iter__`, the `yield` stays outside of the `try`
            try:
                if child.get_group_type() not in groups:
                    continue
                value = getattr(self, child._name)
            except Exception:
                continue  # hidden, disabled option
            yield child._name, value

    def __str__(self, indent=""):
        """Return an indented listing of the values whose owner is not
        'default'; nested configs without such values are omitted."""
        lines = []
        children = sorted(self._cfgimpl_descr._children,
                          key=lambda child: child._name)
        for child in children:
            name = child._name
            if self._cfgimpl_value_owners.get(name, None) == 'default':
                continue
            value = getattr(self, name)
            if isinstance(value, Config):
                substr = value.__str__(indent + " ")
            else:
                substr = "%s %s = %s" % (indent, name, value)
            if substr:
                lines.append(substr)
        if indent and not lines:
            return ''  # hide subgroups with all default values
        lines.insert(0, "%s[%s]" % (indent, self._cfgimpl_descr._name,))
        return '\n'.join(lines)

    def getpaths(self, include_groups=False, allpaths=False):
        """returns a list of all paths in self, recursively, taking care of
        the context (hidden/disabled)

        :param include_groups: passed through to the description's
            `getpaths()`
        :param allpaths: when true, paths whose access raises are kept too
        """
        paths = []
        for path in self._cfgimpl_descr.getpaths(
                include_groups=include_groups):
            try:
                getattr(self, path)
            except Exception:
                if allpaths:
                    paths.append(path)  # hidden or disabled option added
            else:
                paths.append(path)
        return paths


def make_dict(config, flatten=False):
    """Return a dict of the accessible option values of `config`.

    :param flatten: when true, keys are the last path component instead
        of the full dotted path
    """
    pathsvalues = []
    for path in config.getpaths():
        if flatten:
            pathname = path.split('.')[-1]
        else:
            pathname = path
        try:
            value = getattr(config, path)
        except Exception:
            continue  # this just a hidden or disabled option
        pathsvalues.append((pathname, value))
    return dict(pathsvalues)
# ____________________________________________________________