# -*- coding: utf-8 -*-
"takes care of the option's values and multi values"
# Copyright (C) 2013-2014 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# ____________________________________________________________
from time import time
import sys
import weakref
from tiramisu.error import ConfigError, SlaveError, PropertiesOptionError
from tiramisu.setting import owners, expires_time, undefined
from tiramisu.autolib import carry_out_calculation
from tiramisu.i18n import _
from tiramisu.option import SymLinkOption, DynSymLinkOption, Option


class Values(object):
    """The `Config`'s root is indeed in charge of the `Option()`'s values,
    but the values are physically located here, in `Values`, which is also
    responsible for a caching utility.
    """
    __slots__ = ('context', '_p_', '__weakref__')

    def __init__(self, context, storage):
        """
        Initializes the values's dict.

        :param context: the context is the home config's values
        :param storage: backend object on which values, owners, cache and
                        informations are read and written (`self._p_`)
        """
        # only a weak reference is kept on the context
        self.context = weakref.ref(context)
        # the storage type is dictionary or sqlite3
        self._p_ = storage

    def _getcontext(self):
        """context could be None, we need to test it
        context is None only if all reference to `Config` object is deleted
        (for example we delete a `Config` and we manipulate a reference to
        old `SubConfig`, `Values`, `Multi` or `Settings`)

        :raises ConfigError: if the weak reference is dead
        """
        context = self.context()
        if context is None:  # pragma: optional cover
            raise ConfigError(_('the context does not exist anymore'))
        return context

    def _getvalue(self, opt, path, is_default, index=undefined,
                  with_meta=True, self_properties=undefined):
        """actually retrieves the value

        Sources are tried in this order: the stored value, the option's
        callback, the meta config (if `with_meta`), then the option's
        default value.

        :param opt: the `option.Option()` object
        :param path: path of the option
        :param is_default: True if the owner of the option is the default
                           owner (the stored value is then not read)
        :param index: if not `undefined`, only the item at this index is
                      returned
        :param with_meta: also look for the value in the meta config
        :param self_properties: properties of the option (calculated if
                                `undefined`)
        :returns: the option's value (or the default value if not set)
        """
        if opt.impl_is_optiondescription():  # pragma: optional cover
            raise ValueError(_('optiondescription has no value'))

        if self_properties is undefined:
            settings = self._getcontext().cfgimpl_get_settings()
            self_properties = settings._getproperties(opt, path,
                                                      read_write=False)
        force_default = 'frozen' in self_properties and \
            'force_default_on_freeze' in self_properties
        if not is_default and not force_default:
            value = self._p_.getvalue(path)
            if index is not undefined:
                try:
                    return value[index]
                except IndexError:
                    # value is smaller than expected
                    # so return default value
                    pass
            else:
                return value
        # so default value
        # if value has callback and is not set
        if opt.impl_has_callback():
            callback, callback_params = opt.impl_get_callback()
            value = carry_out_calculation(opt, context=self._getcontext(),
                                          callback=callback,
                                          callback_params=callback_params,
                                          index=index)
            try:
                if isinstance(value, list) and index is not undefined:
                    # if submulti and return a list of list, just return list
                    if opt.impl_is_submulti():
                        val = value[index]
                        if isinstance(val, list):
                            value = val
                    else:
                        value = value[index]
                return value
            except IndexError:
                pass
        if with_meta:
            meta = self._getcontext().cfgimpl_get_meta()
            if meta is not None:
                try:
                    value = meta.cfgimpl_get_values(
                    )._get_cached_item(opt, path)
                    if isinstance(value, Multi):
                        if index is not undefined:
                            value = value[index]
                        else:
                            value = list(value)
                    return value
                except PropertiesOptionError:
                    pass
        # now try to get default value
        value = opt.impl_getdefault()
        if opt.impl_is_multi() and index is not undefined:
            if value is None:
                value = opt.impl_getdefault_multi()
            else:
                try:
                    value = value[index]
                except IndexError:
                    value = opt.impl_getdefault_multi()
        return value

    def get_modified_values(self):
        """returns the modified values as known by the storage

        The owner of every option listed by `_get_force_store_value()` is
        requested first.
        """
        context = self._getcontext()
        if context._impl_descr is not None:
            for opt, path in context.cfgimpl_get_description(
            )._get_force_store_value():
                self._getowner(opt, path, force_permissive=True)
        return self._p_.get_modified_values()

    def __contains__(self, opt):
        """
        implements the 'in' keyword syntax in order provide a pythonic way
        to know if an option have a value

        :param opt: the `option.Option()` object
        """
        path = opt.impl_getpath(self._getcontext())
        return self._contains(path)

    def _contains(self, path):
        "True if the storage has a value for this path"
        return self._p_.hasvalue(path)

    def __delitem__(self, opt):
        """overrides the builtins `del()` instructions"""
        self.reset(opt)

    def reset(self, opt, path=None, validate=True):
        """removes the stored value of an option

        :param opt: the `option.Option()` object
        :param path: path of the option (calculated if None)
        :param validate: validate the properties and, if the 'validator'
                         property is set, validate the value obtained after
                         the reset in a fake context before really
                         resetting
        """
        if path is None:
            path = opt.impl_getpath(self._getcontext())
        context = self._getcontext()
        if validate:
            context.cfgimpl_get_settings().validate_properties(opt, False,
                                                               True, path)
        hasvalue = self._contains(path)

        setting = context.cfgimpl_get_settings()
        setting_properties = setting._getproperties(read_write=False)
        if 'validator' in setting_properties and validate and hasvalue:
            # try the reset in a fake context first
            fake_context = context._gen_fake_values()
            fake_value = fake_context.cfgimpl_get_values()
            fake_value.reset(opt, path, validate=False)
            opt.impl_validate(getattr(fake_context, path),
                              fake_context, 'validator' in setting_properties)
        context.cfgimpl_reset_cache()
        if opt.impl_is_master_slaves('master'):
            opt.impl_get_master_slaves().reset(opt, self)
        if hasvalue:
            self._p_.resetvalue(path)

    def _isempty(self, opt, value, force_allow_empty_list=False):
        "convenience method to know if an option is empty"
        if value is undefined:
            return False
        else:
            empty = opt._empty
            if opt.impl_is_multi():
                if force_allow_empty_list:
                    allow_empty_list = True
                else:
                    allow_empty_list = opt.impl_allow_empty_list()
                    if allow_empty_list is undefined:
                        # not specified: only a slave can be an empty list
                        if opt.impl_is_master_slaves('slave'):
                            allow_empty_list = True
                        else:
                            allow_empty_list = False
                isempty = (not allow_empty_list and value == []) or \
                    None in value or empty in value
            else:
                isempty = value is None or value == empty
            return isempty

    def __getitem__(self, opt):
        "enables us to use the pythonic dictionary-like access to values"
        return self.getitem(opt)

    def getitem(self, opt, validate=True, force_permissive=False):
        """returns the value of an option, using the cache

        :param opt: the `option.Option()` object
        :param validate: validate the value
        :param force_permissive: forwarded to `_get_cached_item`
        """
        return self._get_cached_item(opt, validate=validate,
                                     force_permissive=force_permissive)

    def _get_cached_item(self, opt, path=None, validate=True,
                         force_permissive=False, force_properties=None,
                         validate_properties=True,
                         setting_properties=undefined,
                         self_properties=undefined):
        """returns the value of an option from the cache if the 'cache'
        property is set and the storage has a cached value, otherwise
        calculates it with `_getitem`

        The calculated value is put in cache only if 'cache' is set,
        `validate` and `validate_properties` are true, `force_permissive`
        is False and `force_properties` is None.
        """
        untrusted_cached_properties = force_properties is None
        context = self._getcontext()
        if path is None:
            path = opt.impl_getpath(context)
        ntime = None
        if setting_properties is undefined:
            setting_properties = context.cfgimpl_get_settings(
            )._getproperties(read_write=False)
        if self_properties is undefined:
            self_properties = context.cfgimpl_get_settings()._getproperties(
                opt, path, read_write=False,
                setting_properties=setting_properties)
        if 'cache' in setting_properties and self._p_.hascache(path):
            if 'expire' in setting_properties:
                ntime = int(time())
            is_cached, value = self._p_.getcache(path, ntime)
            if is_cached:
                if opt.impl_is_multi() and not isinstance(value, Multi):
                    value = Multi(value, self.context, opt, path)
                if not untrusted_cached_properties:
                    # revalidate properties (because not default properties)
                    context.cfgimpl_get_settings().validate_properties(
                        opt, False, False, value=value,
                        path=path,
                        force_permissive=force_permissive,
                        force_properties=force_properties,
                        setting_properties=setting_properties,
                        self_properties=self_properties)
                return value
        val = self._getitem(opt, path, validate, force_permissive,
                            force_properties, validate_properties,
                            setting_properties,
                            self_properties=self_properties)
        if 'cache' in setting_properties and validate and \
                validate_properties and force_permissive is False and \
                force_properties is None:
            if 'expire' in setting_properties:
                if ntime is None:
                    ntime = int(time())
                ntime = ntime + expires_time
            self._p_.setcache(path, val, ntime)
        return val

    def _getitem(self, opt, path, validate, force_permissive,
                 force_properties, validate_properties,
                 setting_properties=undefined, self_properties=undefined):
        """returns the value of an option without using the cache

        A master/slaves option is delegated to its master/slaves object.
        """
        if opt.impl_is_master_slaves():
            return opt.impl_get_master_slaves().getitem(
                self, opt, path, validate, force_permissive,
                force_properties, validate_properties,
                setting_properties=setting_properties,
                self_properties=self_properties)
        else:
            return self._get_validated_value(
                opt, path, validate, force_permissive,
                force_properties, validate_properties,
                setting_properties=setting_properties,
                self_properties=self_properties)

    def _get_validated_value(self, opt, path, validate, force_permissive,
                             force_properties, validate_properties,
                             index=undefined, submulti_index=undefined,
                             with_meta=True, setting_properties=undefined,
                             self_properties=undefined):
        """same as getitem but doesn't touch the cache

        index is None for slave value: if the value returned is not a list,
        just return []

        :raises ConfigError: error raised while getting the value, re-raised
                             only once properties have been validated
        :raises ValueError: if the value is not valid
        """
        context = self._getcontext()
        setting = context.cfgimpl_get_settings()
        if setting_properties is undefined:
            setting_properties = setting._getproperties(read_write=False)
        if self_properties is undefined:
            self_properties = setting._getproperties(opt, path,
                                                     read_write=False)
        is_default = self._is_default_owner(opt, path,
                                            validate_properties=False,
                                            validate_meta=False,
                                            self_properties=self_properties)
        try:
            if index is None:
                gv_index = undefined
            else:
                gv_index = index
            value = self._getvalue(opt, path, is_default, index=gv_index,
                                   with_meta=with_meta,
                                   self_properties=self_properties)
            config_error = None
        except ConfigError as err:
            # For calculating properties, we need value (ie for mandatory
            # value).
            # If value is calculating with a PropertiesOptionError's option
            # _getvalue raise a ConfigError.
            # We can not raise ConfigError if this option should raise
            # PropertiesOptionError too. So we get config_error and raise
            # ConfigError if properties did not raise.
            # cannot assign config_err directly in python 3.3
            config_error = err
            # value is not set, for 'undefined' (cannot set None because of
            # mandatory property)
            value = undefined

        if config_error is None:
            if index is undefined:
                force_index = None
            else:
                force_index = index
            if opt.impl_is_multi():
                # for slave is a multi
                if index is None and not isinstance(value, list):
                    value = []
                if force_index is None:
                    value = Multi(value, self.context, opt, path)
                elif opt.impl_is_submulti() and submulti_index is undefined:
                    value = SubMulti(value, self.context, opt, path,
                                     force_index)

            if validate:
                if submulti_index is undefined:
                    force_submulti_index = None
                else:
                    force_submulti_index = submulti_index
                try:
                    opt.impl_validate(
                        value, context,
                        'validator' in setting_properties,
                        force_index=force_index,
                        force_submulti_index=force_submulti_index)
                except ValueError as err:
                    # kept to be raised after the properties validation
                    config_error = err
                    value = None

            if is_default and 'force_store_value' in self_properties:
                if isinstance(value, Multi):
                    item = list(value)
                else:
                    item = value
                self.setitem(opt, item, path, is_write=False,
                             force_permissive=force_permissive)
        if validate_properties:
            if config_error is not None:
                # should not raise PropertiesOptionError if option is
                # mandatory
                val_props = undefined
            else:
                val_props = value
            setting.validate_properties(opt, False, False, value=val_props,
                                        path=path,
                                        force_permissive=force_permissive,
                                        force_properties=force_properties,
                                        setting_properties=setting_properties,
                                        self_properties=self_properties)
        if config_error is not None:
            raise config_error
        return value

    def __setitem__(self, opt, value):  # pragma: optional cover
        raise ConfigError(_('you should only set value with config'))

    def setitem(self, opt, value, path, force_permissive=False,
                is_write=True):
        """sets the value of an option

        If the 'validator' property is set, the value is first set in a
        fake context and validated against it.

        :param is_write: is, for example, set to False with
                         "force_store_value": user didn't change value, so
                         not write
        """
        # valid opt
        context = self._getcontext()
        setting_properties = context.cfgimpl_get_settings()._getproperties(
            read_write=False)
        if 'validator' in setting_properties:
            fake_context = context._gen_fake_values()
            fake_context.cfgimpl_get_values()._setitem(opt, value, path,
                                                       force_permissive,
                                                       is_write,
                                                       setting_properties)
            opt.impl_validate(value, fake_context)
        self._setitem(opt, value, path, force_permissive, is_write,
                      setting_properties, validate_properties=False)

    def _setitem(self, opt, value, path, force_permissive, is_write,
                 setting_properties, validate_properties=True):
        """calls the master/slaves object of the option (if any) and then
        stores the value
        """
        if opt.impl_is_master_slaves():
            opt.impl_get_master_slaves().setitem(self, opt, value, path)
        self._setvalue(opt, path, value, force_permissive=force_permissive,
                       is_write=is_write,
                       setting_properties=setting_properties,
                       validate_properties=validate_properties)

    def _setvalue(self, opt, path, value, force_permissive=False,
                  is_write=True, validate_properties=True,
                  setting_properties=undefined):
        """resets the context's cache, optionally validates the properties
        and writes the value in the storage with the current owner

        A `Multi` is converted to a list before being stored (and, for a
        submulti option, each `SubMulti` item too).
        """
        context = self._getcontext()
        context.cfgimpl_reset_cache()
        if validate_properties:
            setting = context.cfgimpl_get_settings()
            setting.validate_properties(opt, False, is_write,
                                        value=value, path=path,
                                        force_permissive=force_permissive,
                                        setting_properties=setting_properties)
        if isinstance(value, Multi):
            value = list(value)
            if opt.impl_is_submulti():
                for idx, val in enumerate(value):
                    if isinstance(val, SubMulti):
                        value[idx] = list(val)
        owner = context.cfgimpl_get_settings().getowner()
        self._p_.setvalue(path, value, owner)

    def _is_meta(self, opt, path):
        """True if the context has a meta config and the option at `path`
        is neither forced to its default value (frozen with
        force_default_on_freeze) nor owned by a non-default owner
        """
        context = self._getcontext()
        setting = context.cfgimpl_get_settings()
        self_properties = setting._getproperties(opt, path, read_write=False)
        if 'frozen' in self_properties and \
                'force_default_on_freeze' in self_properties:
            return False
        if self._p_.getowner(path, owners.default) is not owners.default:
            return False
        if context.cfgimpl_get_meta() is not None:
            return True
        return False

    def getowner(self, opt, force_permissive=False):
        """
        retrieves the option's owner

        :param opt: the `option.Option` object
        :param force_permissive: behaves as if the permissive property
                                 was present
        :returns: a `setting.owners.Owner` object
        """
        # a symlink (but not a dynamic one) is replaced by its target
        if isinstance(opt, SymLinkOption) and \
                not isinstance(opt, DynSymLinkOption):
            opt = opt._impl_getopt()
        path = opt.impl_getpath(self._getcontext())
        return self._getowner(opt, path, force_permissive=force_permissive)

    def _getowner(self, opt, path, validate_properties=True,
                  force_permissive=False, validate_meta=undefined,
                  self_properties=undefined):
        """get owner of an option

        :param validate_properties: get the value first (with validation)
                                    so that properties are checked
        :param validate_meta: if the owner is the default one, ask the
                              meta config (if any); when `undefined` it is
                              True, except for a slave where it is
                              calculated with `_is_meta` on the master's
                              path
        :raises ConfigError: if `opt` is neither an `Option` nor a
                             `DynSymLinkOption`
        """
        if not isinstance(opt, Option) and \
                not isinstance(opt, DynSymLinkOption):
            raise ConfigError(_('owner only avalaible for an option'))
        context = self._getcontext()
        if self_properties is undefined:
            self_properties = context.cfgimpl_get_settings()._getproperties(
                opt, path, read_write=False)
        if 'frozen' in self_properties and \
                'force_default_on_freeze' in self_properties:
            return owners.default
        if validate_properties:
            self._getitem(opt, path, True, force_permissive, None, True,
                          self_properties=self_properties)
        owner = self._p_.getowner(path, owners.default)
        if validate_meta is undefined:
            if opt.impl_is_master_slaves('slave'):
                master = opt.impl_get_master_slaves().getmaster(opt)
                masterp = master.impl_getpath(context)
                validate_meta = self._is_meta(opt, masterp)
            else:
                validate_meta = True
        if validate_meta:
            meta = context.cfgimpl_get_meta()
            if owner is owners.default and meta is not None:
                owner = meta.cfgimpl_get_values()._getowner(opt, path)
        return owner

    def setowner(self, opt, owner):
        """
        sets a owner to an option

        :param opt: the `option.Option` object
        :param owner: a valid owner, that is a `setting.owners.Owner` object
        :raises TypeError: if `owner` is not an `owners.Owner`
        """
        if not isinstance(owner, owners.Owner):  # pragma: optional cover
            raise TypeError(_("invalid generic owner {0}").format(str(owner)))

        path = opt.impl_getpath(self._getcontext())
        self._setowner(opt, path, owner)

    def _setowner(self, opt, path, owner):
        """sets the owner in the storage after properties validation

        :raises ConfigError: if the option has no stored value
        """
        if not self._p_.hasvalue(path):  # pragma: optional cover
            raise ConfigError(_('no value for {0} cannot change owner to {1}'
                                '').format(path, owner))
        self._getcontext().cfgimpl_get_settings().validate_properties(
            opt, False, True, path)

        self._p_.setowner(path, owner)

    def is_default_owner(self, opt, validate_properties=True,
                         validate_meta=True):
        """
        :param opt: the `option.Option` object
        :return: boolean, True if the owner of the option is
                 `owners.default`
        """
        path = opt.impl_getpath(self._getcontext())
        return self._is_default_owner(opt, path,
                                      validate_properties=validate_properties,
                                      validate_meta=validate_meta)

    def _is_default_owner(self, opt, path, validate_properties=True,
                          validate_meta=True, self_properties=undefined):
        "True if `_getowner` returns `owners.default`"
        return self._getowner(opt, path, validate_properties,
                              validate_meta=validate_meta,
                              self_properties=self_properties) == \
            owners.default

    def reset_cache(self, only_expired):
        """
        clears the cache if necessary

        :param only_expired: only ask the storage to reset the expired
                             cache (with the current time), otherwise
                             reset all the cache
        """
        if only_expired:
            self._p_.reset_expired_cache(int(time()))
        else:
            self._p_.reset_all_cache()

    # information
    def set_information(self, key, value):
        """updates the information's attribute

        :param key: information's key (ex: "help", "doc"
        :param value: information's value (ex: "the help string")
        """
        self._p_.set_information(key, value)

    def get_information(self, key, default=undefined):
        """retrieves one information's item

        :param key: the item string (ex: "help")
        :param default: value returned if the storage raises ValueError
                        for this key
        :raises ValueError: if the storage raises ValueError and no
                            `default` is given
        """
        try:
            return self._p_.get_information(key)
        except ValueError:  # pragma: optional cover
            if default is not undefined:
                return default
            else:
                raise ValueError(_("information's item"
                                   " not found: {0}").format(key))

    def mandatory_warnings(self, force_permissive=False, validate=True):
        """convenience function to trace Options that are mandatory and
        where no value has been set

        :param force_permissive: do raise with permissives properties
        :type force_permissive: `bool`
        :param validate: validate value when calculating properties
        :type validate: `bool`

        :returns: generator of mandatory Option's path
        """
        context = self._getcontext()
        settings = context.cfgimpl_get_settings()
        setting_properties = context.cfgimpl_get_settings()._getproperties(
            read_write=False)
        # root description, also used to find the path of symlinks' target
        descr = context.cfgimpl_get_description()

        def _mandatory_warnings(description, currpath=None):
            if currpath is None:
                currpath = []
            for opt in description._impl_getchildren(context=context):
                name = opt.impl_getname()
                path = '.'.join(currpath + [name])

                if opt.impl_is_optiondescription():
                    try:
                        settings.validate_properties(
                            opt, True, False, path=path,
                            force_permissive=force_permissive,
                            setting_properties=setting_properties)
                    except PropertiesOptionError:
                        # the children of this optiondescription are
                        # ignored
                        pass
                    else:
                        for path in _mandatory_warnings(opt,
                                                        currpath + [name]):
                            yield path
                else:
                    if isinstance(opt, SymLinkOption) and \
                            not isinstance(opt, DynSymLinkOption):
                        true_opt = opt._impl_getopt()
                        true_path = descr.impl_get_path_by_opt(true_opt)
                    else:
                        true_opt = opt
                        true_path = path
                    # FIXME: careful, this is reused so it is never
                    # complete??
                    self_properties = settings._getproperties(
                        true_opt, true_path,
                        read_write=False,
                        setting_properties=setting_properties)
                    if 'mandatory' in self_properties:
                        try:
                            self._get_cached_item(
                                true_opt, path=true_path,
                                force_properties=frozenset(('mandatory',)),
                                force_permissive=force_permissive,
                                setting_properties=setting_properties,
                                self_properties=self_properties,
                                validate=validate)
                        except PropertiesOptionError as err:
                            if err.proptype == ['mandatory']:
                                yield path
                        except ConfigError:
                            if validate:
                                raise
                            else:
                                # assume that uncalculated value is an
                                # empty value
                                yield path

        for path in _mandatory_warnings(descr):
            yield path

    def force_cache(self):
        """parse all option to force data in cache

        :raises ConfigError: if the context does not exist anymore or if
                             'cache' is not in the context's settings
        """
        # _getcontext() raises ConfigError if the weak reference is dead
        context = self._getcontext()
        if 'cache' not in context.cfgimpl_get_settings():
            raise ConfigError(_('can force cache only if cache '
                                'is actived in config'))
        # remove all cached properties and value to update "expired" time
        context.cfgimpl_reset_cache()
        for path in context.cfgimpl_get_description().impl_getpaths(
                include_groups=True):
            try:
                context.getattr(path)
            except PropertiesOptionError:
                pass

    def __getstate__(self):
        # the weak reference to the context is not part of the state
        return {'_p_': self._p_}

    def _impl_setstate(self, storage):
        self._p_._storage = storage

    def __setstate__(self, states):
        self._p_ = states['_p_']


# ____________________________________________________________
# multi types
class Multi(list):
    """multi options values container
    that support item notation for the values of multi options"""
    __slots__ = ('opt', 'path', 'context', '__weakref__')

    def __init__(self, value, context, opt, path):
        """
        :param value: the Multi wraps a list value
        :param context: the home config that has the values (must be a
                        weak reference)
        :param opt: the option object that have this Multi value
        :param path: path of the option
        """
        if value is None:
            value = []
        if not opt.impl_is_submulti() and \
                isinstance(value, Multi):  # pragma: optional cover
            raise ValueError(_('{0} is already a Multi ').format(
                opt.impl_getname()))
        self.opt = opt
        self.path = path
        if not isinstance(context,
                          weakref.ReferenceType):  # pragma: optional cover
            raise ValueError('context must be a Weakref')
        self.context = context
        # '_index' is in __slots__ only for SubMulti: tests on it below
        # are true only for a Multi which is not a SubMulti
        if not isinstance(value, list):
            if '_index' not in self.__slots__ and opt.impl_is_submulti():
                value = [[value]]
            else:
                value = [value]
        elif value != [] and '_index' not in self.__slots__ and \
                opt.impl_is_submulti() and not isinstance(value[0], list):
            value = [value]
        super(Multi, self).__init__(value)
        if opt.impl_is_submulti():
            if '_index' not in self.__slots__:
                for idx, val in enumerate(self):
                    if not isinstance(val, SubMulti):
                        super(Multi, self).__setitem__(
                            idx, SubMulti(val, context, opt, path, idx))
                    self[idx].submulti = weakref.ref(self)

    def _getcontext(self):
        """context could be None, we need to test it
        context is None only if all reference to `Config` object is deleted
        (for example we delete a `Config` and we manipulate a reference to
        old `SubConfig`, `Values`, `Multi` or `Settings`)

        :raises ConfigError: if the weak reference is dead
        """
        context = self.context()
        if context is None:  # pragma: optional cover
            raise ConfigError(_('the context does not exist anymore'))
        return context

    def __setitem__(self, index, value):
        self._setitem(index, value)

    def _setitem(self, index, value, validate=True):
        """sets an item and stores the whole value

        If the 'validator' property is set and `validate` is true, the
        item is first set in a fake context and validated against it.
        """
        context = self._getcontext()
        setting = context.cfgimpl_get_settings()
        setting_properties = setting._getproperties(read_write=False)
        if 'validator' in setting_properties and validate:
            fake_context = context._gen_fake_values()
            fake_multi = fake_context.cfgimpl_get_values()._get_cached_item(
                self.opt, path=self.path, validate=False)
            fake_multi._setitem(index, value, validate=False)
            self._validate(value, fake_context, index, True)
        # assume not checking mandatory property
        super(Multi, self).__setitem__(index, value)
        context.cfgimpl_get_values()._setvalue(
            self.opt, self.path, self,
            setting_properties=setting_properties)

    def _get_validated_value(self, index):
        "validated value of the option at this index"
        values = self._getcontext().cfgimpl_get_values()
        return values._get_validated_value(self.opt, self.path,
                                           True, False, None, True,
                                           index=index)

    def append(self, value=undefined, force=False, setitem=True,
               validate=True):
        """the list value can be updated (appended)
        only if the option is a master

        :param value: value to append; if `undefined`, the validated value
                      of the option at the new index is used (None if it
                      raises IndexError)
        :param force: allow to append on a slave and store without
                      properties validation
        :param setitem: store the value after the append
        :param validate: validate the value in a fake context (only if
                         the 'validator' property is set)
        :raises SlaveError: if the option is a slave and not `force`
        """
        if not force and \
                self.opt.impl_is_master_slaves(
                    'slave'):  # pragma: optional cover
            raise SlaveError(_("cannot append a value on a multi option {0}"
                               " which is a slave").format(
                                   self.opt.impl_getname()))
        index = self.__len__()
        if value is undefined:
            try:
                value = self._get_validated_value(index)
            except IndexError:
                value = None
        context = self._getcontext()
        setting = context.cfgimpl_get_settings()
        setting_properties = setting._getproperties(read_write=False)
        if 'validator' in setting_properties and validate and \
                value not in [None, undefined]:
            fake_context = context._gen_fake_values()
            fake_multi = fake_context.cfgimpl_get_values()._get_cached_item(
                self.opt, path=self.path, validate=False)
            fake_multi.append(value, validate=False, force=True)
            self._validate(value, fake_context, index, True)
        if '_index' not in self.__slots__ and self.opt.impl_is_submulti():
            if not isinstance(value, SubMulti):
                value = SubMulti(value, self.context, self.opt, self.path,
                                 index)
            value.submulti = weakref.ref(self)
        super(Multi, self).append(value)
        if setitem:
            self._store(force)

    def sort(self, cmp=None, key=None, reverse=False):
        """sorts the list in place and stores it

        :raises SlaveError: if the option is a master or a slave
        :raises ValueError: if `cmp` is given with python 3 or greater
        """
        if self.opt.impl_is_master_slaves():
            raise SlaveError(_("cannot sort multi option {0} if master or "
                               "slave").format(self.opt.impl_getname()))
        if sys.version_info[0] >= 3:
            if cmp is not None:
                raise ValueError(_('cmp is not permitted in python v3 or '
                                   'greater'))
            super(Multi, self).sort(key=key, reverse=reverse)
        else:
            super(Multi, self).sort(cmp=cmp, key=key, reverse=reverse)
        self._store()

    def reverse(self):
        """reverses the list in place and stores it

        :raises SlaveError: if the option is a master or a slave
        """
        if self.opt.impl_is_master_slaves():
            raise SlaveError(_("cannot reverse multi option {0} if master or "
                               "slave").format(self.opt.impl_getname()))
        super(Multi, self).reverse()
        self._store()

    def insert(self, index, value, validate=True):
        """inserts a value at this index and stores the list

        :raises SlaveError: if the option is a master or a slave
        """
        # FIXME value should be undefined
        if self.opt.impl_is_master_slaves():
            raise SlaveError(_("cannot insert multi option {0} if master or "
                               "slave").format(self.opt.impl_getname()))
        context = self._getcontext()
        setting = context.cfgimpl_get_settings()
        setting_properties = setting._getproperties(read_write=False)
        if 'validator' in setting_properties and validate and \
                value is not None:
            fake_context = context._gen_fake_values()
            fake_multi = fake_context.cfgimpl_get_values()._get_cached_item(
                self.opt, path=self.path, validate=False)
            fake_multi.insert(index, value, validate=False)
            self._validate(value, fake_context, index, True)
        super(Multi, self).insert(index, value)
        self._store()

    def extend(self, iterable, validate=True):
        """extends the list with the items of `iterable` and stores it

        :raises SlaveError: if the option is a master or a slave
        """
        if self.opt.impl_is_master_slaves():
            raise SlaveError(_("cannot extend multi option {0} if master or "
                               "slave").format(self.opt.impl_getname()))
        # only SubMulti has an `_index` slot
        index = getattr(self, '_index', None)
        context = self._getcontext()
        setting = context.cfgimpl_get_settings()
        setting_properties = setting._getproperties(read_write=False)
        if 'validator' in setting_properties and validate:
            fake_context = context._gen_fake_values()
            fake_multi = fake_context.cfgimpl_get_values()._get_cached_item(
                self.opt, path=self.path, validate=False)
            fake_multi.extend(iterable, validate=False)
            self._validate(iterable, fake_context, index)
        super(Multi, self).extend(iterable)
        self._store()

    def _validate(self, value, fake_context, force_index, submulti=False):
        # `submulti` is only used by SubMulti._validate
        self.opt.impl_validate(value, context=fake_context,
                               force_index=force_index)

    def pop(self, index, force=False):
        """the list value can be updated (poped)
        only if the option is a master

        :param index: remove item a index
        :type index: int
        :param force: force pop item (without check master/slave)
        :type force: boolean
        :returns: item at index
        """
        context = self._getcontext()
        if not force:
            if self.opt.impl_is_master_slaves(
                    'slave'):  # pragma: optional cover
                raise SlaveError(_("cannot pop a value on a multi option {0}"
                                   " which is a slave").format(
                                       self.opt.impl_getname()))
            if self.opt.impl_is_master_slaves('master'):
                self.opt.impl_get_master_slaves().pop(
                    self.opt, context.cfgimpl_get_values(), index)
        # set value without valid properties
        ret = super(Multi, self).pop(index)
        self._store(force)
        return ret

    def _store(self, force=False):
        "stores the list (properties are not validated if `force`)"
        self._getcontext().cfgimpl_get_values()._setvalue(
            self.opt, self.path, self, validate_properties=not force)


class SubMulti(Multi):
    """item of a `Multi` for a submulti option

    `submulti` is not set here: `Multi` assigns to it a weak reference to
    the parent `Multi`.
    """
    __slots__ = ('_index', 'submulti')

    def __init__(self, value, context, opt, path, index):
        """
        :param index: index (only for slave with submulti)
        :type index: `int`
        """
        # set before Multi.__init__
        self._index = index
        super(SubMulti, self).__init__(value, context, opt, path)

    def append(self, value=undefined):
        super(SubMulti, self).append(value, force=True)

    def pop(self, index):
        return super(SubMulti, self).pop(index, force=True)

    def __setitem__(self, index, value):
        self._setitem(index, value)

    def _store(self, force=False):
        # force is unused here
        # the value stored is the parent Multi, not this SubMulti
        self._getcontext().cfgimpl_get_values()._setvalue(self.opt,
                                                          self.path,
                                                          self.submulti())

    def _validate(self, value, fake_context, force_index, submulti=False):
        if value is not None:
            if submulti is False:
                super(SubMulti, self)._validate(value, fake_context,
                                                force_index, submulti)
            else:
                self.opt.impl_validate(value, context=fake_context,
                                       force_index=self._index,
                                       force_submulti_index=force_index)

    def _get_validated_value(self, index):
        values = self._getcontext().cfgimpl_get_values()
        return values._get_validated_value(self.opt, self.path,
                                           True, False, None, True,
                                           index=index,
                                           submulti_index=self._index)