add CIDR support

This commit is contained in:
Emmanuel Garette 2019-02-24 15:11:08 +01:00
parent 7c710d50b9
commit af5da925e2
7 changed files with 216 additions and 81 deletions

View file

@ -3,8 +3,8 @@ do_autopath()
import warnings import warnings
from py.test import raises from py.test import raises
from tiramisu import Config ,IPOption, NetworkOption, NetmaskOption, \ from tiramisu import Config, IPOption, NetworkOption, NetmaskOption, \
PortOption, BroadcastOption, OptionDescription PortOption, BroadcastOption, OptionDescription
from tiramisu.error import ValueWarning from tiramisu.error import ValueWarning
from tiramisu.storage import list_sessions from tiramisu.storage import list_sessions
@ -38,6 +38,21 @@ def test_ip():
assert len(w) == 1 assert len(w) == 1
def test_ip_cidr():
b = IPOption('b', '', private_only=True, cidr=True)
c = IPOption('c', '', private_only=True)
warnings.simplefilter("always", ValueWarning)
od = OptionDescription('od', '', [b, c])
config = Config(od)
raises(ValueError, "config.option('b').value.set('192.168.1.1')")
config.option('b').value.set('192.168.1.1/24')
raises(ValueError, "config.option('b').value.set('192.168.1.1/32')")
#
config.option('c').value.set('192.168.1.1')
raises(ValueError, "config.option('c').value.set('192.168.1.1/24')")
raises(ValueError, "config.option('c').value.set('192.168.1.1/32')")
def test_ip_default(): def test_ip_default():
a = IPOption('a', '', '88.88.88.88') a = IPOption('a', '', '88.88.88.88')
od = OptionDescription('od', '', [a]) od = OptionDescription('od', '', [a])
@ -79,6 +94,18 @@ def test_network():
assert len(w) == 1 assert len(w) == 1
def test_network_cidr():
a = NetworkOption('a', '', cidr=True)
od = OptionDescription('od', '', [a])
cfg = Config(od)
cfg.option('a').value.set('192.168.1.1/32')
cfg.option('a').value.set('192.168.1.0/24')
cfg.option('a').value.set('88.88.88.88/32')
cfg.option('a').value.set('0.0.0.0/0')
raises(ValueError, "cfg.option('a').value.set('192.168.1.1')")
raises(ValueError, "cfg.option('a').value.set('192.168.1.1/24')")
def test_network_invalid(): def test_network_invalid():
raises(ValueError, "NetworkOption('a', '', default='toto')") raises(ValueError, "NetworkOption('a', '', default='toto')")

View file

@ -913,8 +913,9 @@ def test_consistency_ip_netmask_dyndescription():
cfg.option('dodval1.aval1').value.set('192.168.1.1') cfg.option('dodval1.aval1').value.set('192.168.1.1')
cfg.option('dodval1.bval1').value.set('255.255.255.0') cfg.option('dodval1.bval1').value.set('255.255.255.0')
cfg.option('dodval2.aval2').value.set('192.168.1.2') cfg.option('dodval2.aval2').value.set('192.168.1.2')
cfg.option('dodval2.bval2').value.set('255.255.255.255') cfg.option('dodval2.bval2').value.set('255.255.255.128')
cfg.option('dodval2.bval2').value.set('255.255.255.0') cfg.option('dodval2.bval2').value.set('255.255.255.0')
raises(ValueError, "cfg.option('dodval2.bval2').value.set('255.255.255.255')")
def test_consistency_ip_in_network_dyndescription(): def test_consistency_ip_in_network_dyndescription():

View file

@ -500,7 +500,7 @@ def test_consistency_ip_netmask():
api.option('a').value.set('192.168.1.1') api.option('a').value.set('192.168.1.1')
api.option('b').value.set('255.255.255.0') api.option('b').value.set('255.255.255.0')
api.option('a').value.set('192.168.1.2') api.option('a').value.set('192.168.1.2')
api.option('b').value.set('255.255.255.255') api.option('b').value.set('255.255.255.128')
api.option('b').value.set('255.255.255.0') api.option('b').value.set('255.255.255.0')
raises(ValueError, "api.option('a').value.set('192.168.1.0')") raises(ValueError, "api.option('a').value.set('192.168.1.0')")
raises(ValueError, "api.option('a').value.set('192.168.1.255')") raises(ValueError, "api.option('a').value.set('192.168.1.255')")
@ -567,6 +567,25 @@ def test_consistency_ip_in_network():
assert len(w) == 1 assert len(w) == 1
def test_consistency_ip_in_network_cidr():
a = NetworkOption('a', '', cidr=True)
c = IPOption('c', '')
d = IPOption('d', '')
od = OptionDescription('od', '', [a, c, d])
c.impl_add_consistency('in_network', a)
d.impl_add_consistency('in_network', a, warnings_only=True)
warnings.simplefilter("always", ValueWarning)
api = Config(od)
api.option('a').value.set('192.168.1.0/24')
api.option('c').value.set('192.168.1.1')
raises(ValueError, "api.option('c').value.set('192.168.2.1')")
raises(ValueError, "api.option('c').value.set('192.168.1.0')")
raises(ValueError, "api.option('c').value.set('192.168.1.255')")
with warnings.catch_warnings(record=True) as w:
api.option('d').value.set('192.168.2.1')
assert len(w) == 1
def test_consistency_ip_in_network_invalid(): def test_consistency_ip_in_network_invalid():
a = NetworkOption('a', '') a = NetworkOption('a', '')
b = NetmaskOption('b', '') b = NetmaskOption('b', '')
@ -593,7 +612,7 @@ def test_consistency_ip_netmask_multi():
api.option('a.a').value.set(['192.168.1.1']) api.option('a.a').value.set(['192.168.1.1'])
api.option('a.b', 0).value.set('255.255.255.0') api.option('a.b', 0).value.set('255.255.255.0')
api.option('a.a').value.set(['192.168.1.2']) api.option('a.a').value.set(['192.168.1.2'])
api.option('a.b', 0).value.set('255.255.255.255') api.option('a.b', 0).value.set('255.255.255.128')
api.option('a.b', 0).value.set('255.255.255.0') api.option('a.b', 0).value.set('255.255.255.0')
raises(ValueError, "api.option('a.a').value.set(['192.168.1.0'])") raises(ValueError, "api.option('a.a').value.set(['192.168.1.0'])")
# #
@ -722,7 +741,7 @@ def test_consistency_ip_netmask_multi_leader():
api.option('a.a').value.set(['192.168.1.1']) api.option('a.a').value.set(['192.168.1.1'])
api.option('a.b', 0).value.set('255.255.255.0') api.option('a.b', 0).value.set('255.255.255.0')
api.option('a.a').value.set(['192.168.1.2']) api.option('a.a').value.set(['192.168.1.2'])
api.option('a.b', 0).value.set('255.255.255.255') api.option('a.b', 0).value.set('255.255.255.128')
api.option('a.b', 0).value.set('255.255.255.0') api.option('a.b', 0).value.set('255.255.255.0')
raises(ValueError, "api.option('a.a').value.set(['192.168.1.0'])") raises(ValueError, "api.option('a.a').value.set(['192.168.1.0'])")
api.option('a.a').value.set(['192.168.1.128']) api.option('a.a').value.set(['192.168.1.128'])

View file

@ -18,13 +18,15 @@
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/ # the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy projet is under MIT licence # the whole pypy projet is under MIT licence
# ____________________________________________________________ # ____________________________________________________________
from ipaddress import ip_address, ip_network, IPv4Address from ipaddress import ip_address, ip_interface, ip_network, IPv4Address, IPv4Interface
from ..error import ConfigError from ..error import ConfigError
from ..setting import undefined, Undefined, OptionBag from ..setting import undefined, Undefined, OptionBag
from ..i18n import _ from ..i18n import _
from .option import Option from .option import Option
from .stroption import StrOption from .stroption import StrOption
from .netmaskoption import NetmaskOption
from .networkoption import NetworkOption
class IPOption(StrOption): class IPOption(StrOption):
@ -46,22 +48,24 @@ class IPOption(StrOption):
properties=None, properties=None,
private_only=False, private_only=False,
allow_reserved=False, allow_reserved=False,
warnings_only=False): warnings_only=False,
cidr=False):
extra = {'_private_only': private_only, extra = {'_private_only': private_only,
'_allow_reserved': allow_reserved} '_allow_reserved': allow_reserved,
super(IPOption, self).__init__(name, '_cidr': cidr}
doc, super().__init__(name,
default=default, doc,
default_multi=default_multi, default=default,
callback=callback, default_multi=default_multi,
callback_params=callback_params, callback=callback,
requires=requires, callback_params=callback_params,
multi=multi, requires=requires,
validator=validator, multi=multi,
validator_params=validator_params, validator=validator,
properties=properties, validator_params=validator_params,
warnings_only=warnings_only, properties=properties,
extra=extra) warnings_only=warnings_only,
extra=extra)
def _validate(self, def _validate(self,
value: str, value: str,
@ -73,32 +77,53 @@ class IPOption(StrOption):
raise ValueError(_('invalid string')) raise ValueError(_('invalid string'))
if value.count('.') != 3: if value.count('.') != 3:
raise ValueError() raise ValueError()
for val in value.split('.'): cidr = self.impl_get_extra('_cidr')
if cidr:
if '/' not in value:
raise ValueError(_('must use CIDR notation'))
value_ = value.split('/')[0]
else:
value_ = value
for val in value_.split('.'):
if val.startswith("0") and len(val) > 1: if val.startswith("0") and len(val) > 1:
raise ValueError() raise ValueError()
# 'standard' validation # 'standard' validation
try: try:
if not isinstance(ip_address(value), IPv4Address): if not cidr:
raise ValueError() if not isinstance(ip_address(value), IPv4Address):
raise ValueError()
else:
if not isinstance(ip_interface(value), IPv4Address):
raise ValueError()
except ValueError: except ValueError:
raise ValueError() raise ValueError()
def _second_level_validation(self, def _second_level_validation(self,
value, value,
warnings_only): warnings_only):
ip = ip_address(value) ip = ip_interface(value)
if not self.impl_get_extra('_allow_reserved') and ip.is_reserved: if not self.impl_get_extra('_allow_reserved') and ip.is_reserved:
if warnings_only: if warnings_only:
msg = _("shouldn't in reserved class") msg = _("shouldn't be reserved IP")
else: else:
msg = _("mustn't be in reserved class") msg = _("mustn't be reserved IP")
raise ValueError(msg) raise ValueError(msg)
if self.impl_get_extra('_private_only') and not ip.is_private: if self.impl_get_extra('_private_only') and not ip.is_private:
if warnings_only: if warnings_only:
msg = _("should be in private class") msg = _("should be private IP")
else: else:
msg = _("must be in private class") msg = _("must be private IP")
raise ValueError(msg) raise ValueError(msg)
if '/' in value:
net = NetmaskOption(self.impl_getname(),
self.impl_get_display_name(),
str(ip.netmask))
net._cons_ip_netmask(self,
(net, self),
(str(ip.netmask), str(ip.ip)),
warnings_only,
None,
True)
def _cons_in_network(self, def _cons_in_network(self,
current_opt, current_opt,
@ -106,22 +131,46 @@ class IPOption(StrOption):
vals, vals,
warnings_only, warnings_only,
context): context):
if len(vals) != 3 and context is undefined: if len(opts) == 2 and isinstance(opts[0], IPOption) and \
raise ConfigError(_('ip_network needs an IP, a network and a netmask')) opts[0].impl_get_extra('_cidr') == False and \
if len(vals) != 3 or None in vals: isinstance(opts[1], NetworkOption) and \
return opts[1].impl_get_extra('_cidr') == True:
ip, network, netmask = vals if None in vals:
if ip_address(ip) not in ip_network('{0}/{1}'.format(network, return
netmask)): ip, network = vals
msg = _('"{4}" is not in network "{0}"/"{1}" ("{2}"/"{3}")') network_obj = ip_network(network)
raise ValueError(msg.format(network, if ip_interface(ip) not in network_obj:
netmask, msg = _('"{0}" is not in network "{1}" ("{2}")')
opts[1].impl_get_display_name(), raise ValueError(msg.format(ip,
opts[2].impl_get_display_name(), network,
ip)) opts[1].impl_get_display_name()))
# test if ip is not network/broadcast IP # test if ip is not network/broadcast IP
opts[2]._cons_ip_netmask(current_opt, netmask = NetmaskOption(self.impl_getname(),
(opts[2], opts[0]), self.impl_get_display_name(),
(netmask, ip), str(network_obj.netmask))
warnings_only, netmask._cons_ip_netmask(self,
context) (netmask, self),
(str(network_obj.netmask), str(ip)),
warnings_only,
None,
True)
else:
if len(vals) != 3 and context is undefined:
raise ConfigError(_('ip_network needs an IP, a network and a netmask'))
if len(vals) != 3 or None in vals:
return
ip, network, netmask = vals
if ip_interface(ip) not in ip_network('{0}/{1}'.format(network,
netmask)):
msg = _('"{4}" is not in network "{0}"/"{1}" ("{2}"/"{3}")')
raise ValueError(msg.format(network,
netmask,
opts[1].impl_get_display_name(),
opts[2].impl_get_display_name(),
ip))
# test if ip is not network/broadcast IP
opts[2]._cons_ip_netmask(current_opt,
(opts[2], opts[0]),
(netmask, ip),
warnings_only,
context)

View file

@ -60,18 +60,18 @@ class NetmaskOption(StrOption):
if None in vals or len(vals) != 2: if None in vals or len(vals) != 2:
return return
msg = None msg = None
val_netmask, val_ipnetwork = vals val_netmask, val_network = vals
try: try:
ip_network('{0}/{1}'.format(val_ipnetwork, val_netmask)) ip_network('{0}/{1}'.format(val_network, val_netmask))
except ValueError: except ValueError:
if current_opt == opts[1]: if current_opt == opts[1]:
raise ValueError(_('with netmask "{0}" ("{1}")').format(val_netmask, opts[0].impl_get_display_name())) raise ValueError(_('with netmask "{0}" ("{1}")').format(val_netmask, opts[0].impl_get_display_name()))
else: else:
raise ValueError(_('with network "{0}" ("{1}")').format(val_ipnetwork, opts[1].impl_get_display_name())) raise ValueError(_('with network "{0}" ("{1}")').format(val_network, opts[1].impl_get_display_name()))
if msg is not None: if msg is not None:
self.raise_err(msg, self.raise_err(msg,
val_netmask, val_netmask,
val_ipnetwork, val_network,
current_opt, current_opt,
opts, opts,
'network') 'network')
@ -81,38 +81,41 @@ class NetmaskOption(StrOption):
opts, opts,
vals, vals,
warnings_only, warnings_only,
context): context,
#opts must be (netmask, ip) options _cidr=False):
# opts must be (netmask, ip) options
if context is undefined and len(vals) != 2: if context is undefined and len(vals) != 2:
raise ConfigError(_('ip_netmask needs an IP and a netmask')) raise ConfigError(_('ip_netmask needs an IP and a netmask'))
if None in vals or len(vals) != 2: if None in vals or len(vals) != 2:
return return
msg = None msg = None
val_netmask, val_ipnetwork = vals val_netmask, val_ip = vals
try: try:
ip = ip_interface('{0}/{1}'.format(val_ipnetwork, val_netmask)) ip = ip_interface('{0}/{1}'.format(val_ip, val_netmask))
network = ip.network network = ip.network
#if not ip same has network # if not ip same has network
if str(network.netmask) != '255.255.255.255': if ip.ip == network.network_address:
if ip.ip == network.network_address: if not _cidr and current_opt == opts[1]:
if current_opt == opts[1]: msg = _('this is a network with netmask "{0}" ("{1}")')
msg = _('this is a network with netmask "{0}" ("{1}")') else:
else: msg = _('{2} "{0}" ("{1}") is the network')
msg = _('this is a network with {2} "{0}" ("{1}")') elif ip.ip == network.broadcast_address:
elif ip.ip == network.broadcast_address: if not _cidr and current_opt == opts[1]:
if current_opt == opts[1]: msg = _('this is a broadcast with netmask "{0}" ("{1}")')
msg = _('this is a broadcast with netmask "{0}" ("{1}")') else:
else: msg = _('{2} "{0}" ("{1}") is the broadcast')
msg = _('this is a broadcast with {2} "{0}" ("{1}")')
except ValueError: except ValueError:
import traceback
traceback.print_exc()
pass pass
if msg is not None: if msg is not None:
self.raise_err(msg, self.raise_err(msg,
val_netmask, val_netmask,
val_ipnetwork, val_ip,
current_opt, current_opt,
opts, opts,
'IP') 'IP',
_cidr)
def raise_err(self, def raise_err(self,
@ -121,8 +124,9 @@ class NetmaskOption(StrOption):
val_ipnetwork, val_ipnetwork,
current_opt, current_opt,
opts, opts,
typ): typ,
if current_opt == opts[1]: _cidr=False):
if not _cidr and current_opt == opts[1]:
raise ValueError(msg.format(val_netmask, raise ValueError(msg.format(val_netmask,
opts[1].impl_get_display_name())) opts[1].impl_get_display_name()))
else: else:

View file

@ -18,7 +18,7 @@
# the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/ # the rough pypy's guys: http://codespeak.net/svn/pypy/dist/pypy/config/
# the whole pypy projet is under MIT licence # the whole pypy projet is under MIT licence
# ____________________________________________________________ # ____________________________________________________________
from ipaddress import ip_address, IPv4Address from ipaddress import ip_address, ip_network, IPv4Network
from ..setting import undefined from ..setting import undefined
from ..i18n import _ from ..i18n import _
@ -30,6 +30,35 @@ class NetworkOption(Option):
__slots__ = tuple() __slots__ = tuple()
_display_name = _('network address') _display_name = _('network address')
def __init__(self,
name,
doc,
default=None,
default_multi=None,
requires=None,
multi=False,
callback=None,
callback_params=None,
validator=None,
validator_params=None,
properties=None,
warnings_only=False,
cidr=False):
extra = {'_cidr': cidr}
super().__init__(name,
doc,
default=default,
default_multi=default_multi,
callback=callback,
callback_params=callback_params,
requires=requires,
multi=multi,
validator=validator,
validator_params=validator_params,
properties=properties,
warnings_only=warnings_only,
extra=extra)
def _validate(self, def _validate(self,
value, value,
*args, *args,
@ -38,11 +67,18 @@ class NetworkOption(Option):
raise ValueError(_('invalid string')) raise ValueError(_('invalid string'))
if value.count('.') != 3: if value.count('.') != 3:
raise ValueError() raise ValueError()
for val in value.split('.'): cidr = self.impl_get_extra('_cidr')
if cidr:
if '/' not in value:
raise ValueError(_('must use CIDR notation'))
value_ = value.split('/')[0]
else:
value_ = value
for val in value_.split('.'):
if val.startswith("0") and len(val) > 1: if val.startswith("0") and len(val) > 1:
raise ValueError() raise ValueError()
try: try:
if not isinstance(ip_address(value), IPv4Address): if not isinstance(ip_network(value), IPv4Network):
raise ValueError() raise ValueError()
except ValueError: except ValueError:
raise ValueError() raise ValueError()
@ -50,10 +86,9 @@ class NetworkOption(Option):
def _second_level_validation(self, def _second_level_validation(self,
value, value,
warnings_only): warnings_only):
ip = ip_address(value) if ip_network(value).network_address.is_reserved:
if ip.is_reserved:
if warnings_only: if warnings_only:
msg = _("shouldn't be in reserved class") msg = _("shouldn't be reserved network")
else: else:
msg = _("mustn't be in reserved class") msg = _("mustn't be reserved network")
raise ValueError(msg) raise ValueError(msg)

View file

@ -104,7 +104,7 @@ class CacheOptionDescription(BaseOption):
if func not in ALLOWED_CONST_LIST and is_multi: if func not in ALLOWED_CONST_LIST and is_multi:
if __debug__ and not option.impl_get_leadership(): if __debug__ and not option.impl_get_leadership():
raise ConfigError(_('malformed consistency option "{0}" ' raise ConfigError(_('malformed consistency option "{0}" '
'must be a leadership').format( 'must be in same leadership').format(
option.impl_getname())) option.impl_getname()))
leadership = option.impl_get_leadership() leadership = option.impl_get_leadership()
for weak_opt in all_cons_opts: for weak_opt in all_cons_opts: