from typing import List as _List from os.path import join as _join, isdir as _isdir, abspath as _abspath from datetime import datetime as _datetime from ipaddress import ip_network, ip_address from subprocess import run as _run from os import makedirs as _makedirs, unlink as _unlink from shutil import rmtree as _rmtree from glob import glob as _glob _PKI_DIR = _abspath('pki/dnssec') _ALGO = 'ECDSAP256SHA256' _ZSK_LEN = 512 _KSK_LEN = _ZSK_LEN def nsd_serial() -> str: return _datetime.now().strftime('%m%d%H%M%S') def value_in(value: str, values: _List[str], ) -> bool: for val in values: if value == val: return True return False def nsd_concat_lists(list1: _List[str], list2: _List[str], str1: str=None, ) -> _List[str]: ret = list1 + list2 if str1: ret.append(str1) return ret def get_reverse_name(network: str) -> str: if not network: return network_obj = ip_network(network) if network_obj.prefixlen != 24: raise ValueError('only netmask "255.255.255.0" is supported for DNS reverse name') o1, o2, o3, o4 = network.split('.') return f'{o3}.{o2}.{o1}.in-addr.arpa.' def _gen_key(cn:str, authority_cn: str, type: str, ) -> str: dir_name = _join(_PKI_DIR, cn, authority_cn, type) filename = None if _isdir(dir_name): filenames = _glob(_join(dir_name, f'K{authority_cn}.+*.key')) if filenames: filename = filenames[0].rsplit('.', 1)[0] if filename is None: if _isdir(dir_name): _rmtree(dir_name) _makedirs(dir_name) if type == 'zsk': cmd = ['ldns-keygen', '-a', _ALGO, '-b', str(_ZSK_LEN), authority_cn] else: cmd = ['ldns-keygen', '-a', _ALGO, '-b', str(_KSK_LEN), '-k', authority_cn] proc = _run(cmd, cwd=dir_name, capture_output=True, ) if proc.returncode != 0: raise Exception(f'cannot generate {type}: {proc.stdout.decode()}, {proc.stderr.decode()}') filename = _join(dir_name, proc.stdout.decode().strip()) return filename def _gen_keys(cn, authority_cn, ) -> str: zsk = _gen_key(cn, authority_cn, 'zsk') ksk = _gen_key(cn, authority_cn, 'ksk') return zsk, ksk def gen_cert(cn: str, authority_cn: str, ) -> str: zsk, ksk = _gen_keys(cn, authority_cn) with open(f'{ksk}.key') as fh: content = fh.read().strip() scontent = content.split() infos = ' '.join(scontent[3:6]) return f'"{authority_cn}." {infos} "{scontent[6]}";' def sign(zone_filename: str, cn: str, ) -> str: authority_cn = zone_filename.rsplit('/', 1)[-1].rsplit('.', 1)[0] zsk, ksk = _gen_keys(cn, authority_cn) cmd = ['ldns-signzone', '-n', zone_filename, zsk, ksk] proc = _run(cmd, capture_output=True) if proc.returncode != 0: raise Exception(f'cannot sign {zone_filename}: {proc.stdout.decode()}, {proc.stderr.decode()}') signed_filename = f'{zone_filename}.signed' with open(signed_filename) as fh: content = fh.read().strip() _unlink(signed_filename) return content