dataset/seed/nsd/funcs/funcs.py

211 lines
6.3 KiB
Python
Raw Normal View History

2022-03-08 19:42:28 +01:00
from typing import List as _List
2022-05-04 10:29:03 +02:00
from os.path import join as _join, isfile as _isfile, isdir as _isdir, abspath as _abspath, basename as _basename
2022-03-08 19:42:28 +01:00
from datetime import datetime as _datetime
from ipaddress import ip_network, ip_address
from subprocess import run as _run
2022-05-04 10:29:03 +02:00
from os import makedirs as _makedirs
from shutil import rmtree as _rmtree, copy2 as _copy2
2022-03-08 19:42:28 +01:00
from glob import glob as _glob
2022-05-04 10:29:03 +02:00
from filecmp import cmp as _cmp
2022-03-08 19:42:28 +01:00
2023-06-23 08:12:05 +02:00
from tiramisu import DomainnameOption
from risotto.utils import multi_function as _multi_function
2022-03-08 19:42:28 +01:00
# Directory holding the DNSSEC key material; the relative path is made
# absolute against the working directory in effect when this module is imported.
_PKI_DIR = _abspath('pki/dnssec')
# Algorithm name handed to ``ldns-keygen -a``.
_ALGO = 'ECDSAP256SHA256'
# Sizes handed to ``ldns-keygen -b`` for the zone-signing key and the
# key-signing key respectively (the KSK reuses the ZSK size).
_ZSK_LEN = 512
_KSK_LEN = _ZSK_LEN
def nsd_serial() -> str:
    """Return the current local time rendered as ``MMDDHHMMSS``.

    The result is a 10-character string made of the zero-padded month, day,
    hour, minute and second; the year is not part of it.
    """
    now = _datetime.now()
    return now.strftime('%m%d%H%M%S')
def value_in(value: str,
             values: _List[str],
             ) -> bool:
    """Tell whether *value* equals at least one entry of *values*.

    Returns ``False`` when *values* is empty.
    """
    return any(value == candidate for candidate in values)
2023-06-23 08:12:05 +02:00
def nsd_concat_lists(*args,
                     ip: str=None,
                     cidr: bool=False
                     ) -> _List[str]:
    """Merge several address lists into one sorted list without duplicates.

    Args:
        *args: iterables of address strings to merge.
        ip: optional extra address added to the result when truthy.
        cidr: when true, every address that has no ``/`` in it gets a ``/32``
            suffix; addresses that already carry a prefix are kept as they are.

    Returns:
        The distinct addresses, sorted as plain strings.
    """
    def _as_cidr(address: str) -> str:
        # Never stack a second prefix on an address that already has one;
        # the same rule applies to list entries and to ``ip``.
        return address if '/' in address else f'{address}/32'

    merged = set()
    for lst in args:
        merged.update(map(_as_cidr, lst) if cidr else lst)
    if ip:
        merged.add(_as_cidr(ip) if cidr else ip)
    return sorted(merged)
def get_reverse_name(network: str) -> str:
    """Build the ``in-addr.arpa.`` reverse zone name for an IPv4 network.

    The name is made of the first three octets of the network address in
    reverse order, so a network narrower than /24 maps to its enclosing /24
    reverse zone.

    Args:
        network: network in any notation accepted by ``ipaddress.ip_network``
            (``a.b.c.d/len``, ``a.b.c.d/netmask`` or a bare address).

    Returns:
        The reverse zone name with a trailing dot, or ``None`` when *network*
        is empty.

    Raises:
        ValueError: if *network* cannot be parsed, is not IPv4, or has a
            prefix length shorter than 24.
    """
    if not network:
        return None
    network_obj = ip_network(network)
    if network_obj.version != 4:
        raise ValueError(f'only IPv4 networks are supported for DNS reverse name, got "{network}"')
    if network_obj.prefixlen < 24:
        raise ValueError('only netmask greater than 24 is supported for DNS reverse name')
    # Take the octets from the parsed network rather than from the raw string,
    # so that netmask notation ("/255.255.255.0") does not break the unpacking.
    o1, o2, o3, _ = str(network_obj.network_address).split('.')
    return f'{o3}.{o2}.{o1}.in-addr.arpa.'
def _gen_key(cn:str,
             authority_cn: str,
             type: str,
             ) -> str:
    """Return the base path of a zone key, generating the key when it is missing.

    Keys live in ``_PKI_DIR/<cn>/<authority_cn>/<type>``. When that directory
    already holds a file matching ``K<authority_cn>.+*.key``, the path of the
    first match without its extension is returned. Otherwise the directory is
    recreated from scratch and ``ldns-keygen`` is run inside it.

    Args:
        cn: first path component under ``_PKI_DIR``.
        authority_cn: zone name given to ``ldns-keygen``.
        type: ``'zsk'`` for a zone-signing key; any other value adds the
            ``-k`` flag to the ``ldns-keygen`` command line.

    Returns:
        Path of the key without extension.

    Raises:
        Exception: if ``ldns-keygen`` exits with a non-zero status.
    """
    key_dir = _join(_PKI_DIR, cn, authority_cn, type)
    if _isdir(key_dir):
        existing = _glob(_join(key_dir, f'K{authority_cn}.+*.key'))
        if existing:
            return existing[0].rsplit('.', 1)[0]
        # The directory exists but holds no key: start over from an empty one.
        _rmtree(key_dir)
    _makedirs(key_dir)
    if type == 'zsk':
        bits, extra = _ZSK_LEN, []
    else:
        bits, extra = _KSK_LEN, ['-k']
    cmd = ['ldns-keygen', '-a', _ALGO, '-b', str(bits), *extra, authority_cn]
    proc = _run(cmd, cwd=key_dir, capture_output=True)
    if proc.returncode != 0:
        raise Exception(f'cannot generate {type}: {proc.stdout.decode()}, {proc.stderr.decode()}')
    # The command's standard output is used as the key base name.
    return _join(key_dir, proc.stdout.decode().strip())
def _gen_keys(cn: str,
              authority_cn: str,
              ) -> tuple:
    """Return the base paths of the ZSK and the KSK of a zone, creating them if needed.

    Args:
        cn: first path component under the PKI directory.
        authority_cn: zone name the keys belong to.

    Returns:
        A ``(zsk, ksk)`` pair of key paths without extension.
    """
    # The ZSK is requested first, then the KSK.
    zsk = _gen_key(cn, authority_cn, 'zsk')
    ksk = _gen_key(cn, authority_cn, 'ksk')
    return zsk, ksk
def gen_cert(cn: str,
             authority_cn: str,
             ) -> str:
    """Build a one-line key statement for a zone from its KSK ``.key`` file.

    The keys are created on demand. The KSK ``.key`` file is split on
    whitespace; fields 4 to 6 are copied verbatim and field 7 is emitted
    between double quotes.

    Args:
        cn: first path component under the PKI directory.
        authority_cn: zone name; it is emitted quoted with a trailing dot.

    Returns:
        A string of the form ``"<authority_cn>." <f4> <f5> <f6> "<f7>";``.
    """
    _, ksk_base = _gen_keys(cn, authority_cn)
    with open(f'{ksk_base}.key') as key_file:
        fields = key_file.read().strip().split()
    # NOTE(review): presumably the DNSKEY flags, protocol and algorithm,
    # followed by the public key -- confirm against the ldns-keygen file format.
    middle = ' '.join(fields[3:6])
    quoted = fields[6]
    return f'"{authority_cn}." {middle} "{quoted}";'
def sign(zone_filename: str,
         cn: str,
         ) -> str:
    """Return the signed content of a zone file, re-signing it only when needed.

    The zone name is the file name without its last extension. A copy of the
    last successfully signed zone file and its signed output are kept in
    ``_PKI_DIR/<cn>/<zone name>/``. ``ldns-signzone`` runs again when either
    cached file is missing or when the zone file differs from the cached copy.

    Args:
        zone_filename: path of the unsigned zone file.
        cn: first path component under ``_PKI_DIR``.

    Returns:
        The stripped content of the cached signed zone.

    Raises:
        Exception: if ``ldns-signzone`` exits with a non-zero status.
    """
    authority_cn = zone_filename.rsplit('/', 1)[-1].rsplit('.', 1)[0]
    copy_file = _join(_PKI_DIR, cn, authority_cn, _basename(zone_filename))
    signed_filename = f'{copy_file}.signed'
    up_to_date = (_isfile(copy_file)
                  and _isfile(signed_filename)
                  and _cmp(zone_filename, copy_file))
    if not up_to_date:
        zsk, ksk = _gen_keys(cn, authority_cn)
        cmd = ['ldns-signzone', '-n', zone_filename, zsk, ksk]
        proc = _run(cmd, capture_output=True)
        if proc.returncode != 0:
            raise Exception(f'cannot sign {zone_filename}: {proc.stdout.decode()}, {proc.stderr.decode()}')
        new_signed_filename = f'{zone_filename}.signed'
        with open(new_signed_filename) as fh:
            content = fh.read().strip()
        # NOTE(review): the previous code called
        # content.replace('0000000000', nsd_serial()) here and discarded the
        # result, so the signed zone has always been stored unmodified. That
        # behaviour is kept: editing the serial after ldns-signzone has run
        # would presumably invalidate the signature covering the SOA record.
        # Confirm how the serial is meant to be handled.
        with open(signed_filename, 'w') as fh:
            fh.write(content)
        # Refresh the cached copy only once the signed output is in place, so
        # a failed signing run cannot make a stale signed zone look current.
        _copy2(zone_filename, copy_file)
    with open(signed_filename) as fh:
        content = fh.read().strip()
    return content
2022-08-18 10:19:43 +02:00
2022-12-25 17:08:52 +01:00
def get_internal_info_in_zone(zones: dict,
                              domain_name: str,
                              type: str,
                              index: int=None,
                              ) -> _List[str]:
    """Look up host information in the zone whose domain name is *domain_name*.

    Args:
        zones: mapping whose values are zone dictionaries with the keys
            ``'domain_name'``, ``'hosts'`` and ``'host_ip'``.
        domain_name: domain name of the zone to look for.
        type: ``'host'`` to get the host names; any other value to get an
            address.
        index: position in the list returned for ``type == 'host'``; ``0`` or
            ``None`` selects the ``'host'`` entry itself.

    Returns:
        ``[]`` when no zone matches. For ``type == 'host'``, the list
        ``['host', *hosts keys]``. Otherwise ``zone['host_ip']`` when *index*
        is falsy, else the ``index``-th value of ``zone['hosts']`` (1-based).
    """
    zone = next(
        (candidate for candidate in zones.values()
         if domain_name == candidate['domain_name']),
        None,
    )
    if zone is None:
        return []
    if type == 'host':
        return ['host', *zone['hosts']]
    if not index:
        return zone['host_ip']
    # Index 0 is the 'host' entry, so entries of ``hosts`` start at index 1.
    return list(zone['hosts'].values())[index - 1]
2022-12-25 17:08:52 +01:00
2023-06-23 08:12:05 +02:00
def get_internal_zones(zones) -> _List[str]:
    """Return the ``'domain_name'`` of every zone in *zones*, in mapping order."""
    domain_names = []
    for zone in zones.values():
        domain_names.append(zone['domain_name'])
    return domain_names
@_multi_function
def calc_reverse_names(names):
    """Return the entries of *names* without duplicates, keeping first-seen order."""
    unique = []
    for candidate in names:
        if candidate not in unique:
            unique.append(candidate)
    return unique
@_multi_function
def calc_reverse_networks(names):
    """Return the distinct values of *names* with their last dotted part removed.

    Each entry is cut at its last ``.`` (an entry without a dot is kept
    whole); duplicates are dropped and first-seen order is preserved.
    """
    networks = []
    for entry in names:
        prefix, *_ = entry.rsplit('.', 1)
        if prefix not in networks:
            networks.append(prefix)
    return networks
2023-07-31 15:30:32 +02:00
def valid_dns_hostname(hostname,
                       domainname,
                       zone_names,
                       ):
    """Validate a host name that is about to be declared in a zone.

    Args:
        hostname: host name to check; ``'*'`` skips the syntax check.
        domainname: domain the host belongs to.
        zone_names: collection of existing zone names.

    Raises:
        ValueError: if ``DomainnameOption`` rejects *hostname*, or if
            ``<hostname>.<domainname>`` is already a zone name.
    """
    if hostname != '*':
        try:
            # Building the option is only done for its validation side effect.
            DomainnameOption('a', '', hostname, type='hostname', allow_ip=False)
        except ValueError as err:
            # Blank the prefix carried by the error, then let the very same
            # exception propagate (no self-referential ``from`` chaining).
            err.prefix = ''
            raise
    if hostname + '.' + domainname in zone_names:
        raise ValueError(f'"{hostname}.{domainname}" is also a zone name')
@_multi_function
def get_dnssec_ds(cn: str,
                  names: str,
                  ) -> str:
    """Collect the content of the KSK ``.ds`` file of each zone in *names*.

    *names* is iterated entry by entry and a list is returned, whatever the
    annotations say. For each entry, a trailing dot is removed when the name
    ends with ``arpa.``; the first file matching ``K<name>.+*.ds`` in
    ``_PKI_DIR/<cn>/<name>/ksk`` is then read. Zones without such a file are
    skipped.

    Args:
        cn: first path component under ``_PKI_DIR``.
        names: zone names to look up.

    Returns:
        The stripped content of every ``.ds`` file found, in input order.
    """
    records = []
    for name in names:
        zone = name[:-1] if name.endswith('arpa.') else name
        ksk_dir = _join(_PKI_DIR, cn, zone, 'ksk')
        if not _isdir(ksk_dir):
            continue
        matches = _glob(_join(ksk_dir, f'K{zone}.+*.ds'))
        if not matches:
            continue
        with open(matches[0]) as ds_file:
            records.append(ds_file.read().strip())
    return records