forked from stove/risotto
remove old scripts
This commit is contained in:
parent
5bec5dffed
commit
f79e486371
3 changed files with 0 additions and 528 deletions
107
bootstrap.py
107
bootstrap.py
|
@ -1,107 +0,0 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
from asyncio import run
|
|
||||||
from os import listdir, link, makedirs
|
|
||||||
from os.path import isdir, join
|
|
||||||
from shutil import rmtree
|
|
||||||
from copy import copy
|
|
||||||
|
|
||||||
from risotto.utils import RISOTTO_CONFIG, SERVERS
|
|
||||||
#from risotto.image import load
|
|
||||||
from risotto.machine import templates, load, ROUGAIL_NAMESPACE
|
|
||||||
from rougail.utils import normalize_family
|
|
||||||
|
|
||||||
|
|
||||||
INSTALL_DIR = RISOTTO_CONFIG['directories']['dest']
|
|
||||||
CONFIG_DEST_DIR = 'configurations'
|
|
||||||
CONFIG_DIFF_DIR = 'diff'
|
|
||||||
CONFIG_ORI_DIR = 'ori'
|
|
||||||
SRV_DEST_DIR = 'srv'
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
|
||||||
if isdir(INSTALL_DIR):
|
|
||||||
rmtree(INSTALL_DIR)
|
|
||||||
makedirs(INSTALL_DIR)
|
|
||||||
try:
|
|
||||||
module_infos, rougailconfig, config = await load('a.py',
|
|
||||||
'n.py',
|
|
||||||
clean_directories=True,
|
|
||||||
copy_manual_dir=True,
|
|
||||||
copy_tests=True,
|
|
||||||
)
|
|
||||||
except Exception as err:
|
|
||||||
# import traceback
|
|
||||||
# traceback.print_exc()
|
|
||||||
print(err)
|
|
||||||
exit(1)
|
|
||||||
modules_done = []
|
|
||||||
for server_name in SERVERS:
|
|
||||||
module_name = SERVERS[server_name]['module']
|
|
||||||
module_info = module_infos[module_name]
|
|
||||||
subconfig = config.option(normalize_family(server_name))
|
|
||||||
try:
|
|
||||||
add_srv = await subconfig.option('machine.add_srv').value.get()
|
|
||||||
except AttributeError:
|
|
||||||
add_srv = False
|
|
||||||
rougailconfig['tmp_dir'] = 'tmp'
|
|
||||||
rougailconfig['destinations_dir'] = join(INSTALL_DIR, module_name, CONFIG_DEST_DIR, server_name)
|
|
||||||
rougailconfig['templates_dir'] = module_info['infos'].templates_dir
|
|
||||||
if module_name == 'host':
|
|
||||||
tmpfile = await subconfig.option(f'{ROUGAIL_NAMESPACE}.host_install_dir').value.get()
|
|
||||||
rougailconfig['tmpfile_dest_dir'] = f'{tmpfile}/host/configurations/{server_name}'
|
|
||||||
rougailconfig['default_systemd_directory'] = '/usr/local/lib/systemd'
|
|
||||||
else:
|
|
||||||
rougailconfig['tmpfile_dest_dir'] = '/usr/local/lib'
|
|
||||||
rougailconfig['default_systemd_directory'] = '/systemd'
|
|
||||||
# cfg['templates_dir'] = module_info['infos'].templates_dir
|
|
||||||
if isdir('tmp'):
|
|
||||||
rmtree('tmp')
|
|
||||||
makedirs(rougailconfig['tmp_dir'])
|
|
||||||
makedirs(rougailconfig['destinations_dir'])
|
|
||||||
if add_srv:
|
|
||||||
srv = join(INSTALL_DIR, SRV_DEST_DIR, server_name)
|
|
||||||
else:
|
|
||||||
srv = None
|
|
||||||
await templates(server_name,
|
|
||||||
subconfig,
|
|
||||||
rougailconfig,
|
|
||||||
srv=srv,
|
|
||||||
)
|
|
||||||
#
|
|
||||||
await config.property.read_write()
|
|
||||||
try:
|
|
||||||
await subconfig.option('general.hide_secret').value.set(True)
|
|
||||||
except AttributeError as err:
|
|
||||||
# print(err)
|
|
||||||
pass
|
|
||||||
await config.property.read_only()
|
|
||||||
rougailconfig['destinations_dir'] = join(INSTALL_DIR, module_name, CONFIG_DIFF_DIR, server_name)
|
|
||||||
rmtree('tmp')
|
|
||||||
makedirs(rougailconfig['tmp_dir'])
|
|
||||||
makedirs(rougailconfig['destinations_dir'])
|
|
||||||
await templates(server_name,
|
|
||||||
subconfig,
|
|
||||||
rougailconfig,
|
|
||||||
)
|
|
||||||
await config.property.read_write()
|
|
||||||
try:
|
|
||||||
await subconfig.option('general.hide_secret').value.set(False)
|
|
||||||
except AttributeError as err:
|
|
||||||
pass
|
|
||||||
await config.property.read_only()
|
|
||||||
#
|
|
||||||
if module_name not in modules_done:
|
|
||||||
rougailconfig['destinations_dir'] = join(INSTALL_DIR, module_name, CONFIG_ORI_DIR)
|
|
||||||
rmtree('tmp')
|
|
||||||
makedirs(rougailconfig['tmp_dir'])
|
|
||||||
makedirs(rougailconfig['destinations_dir'])
|
|
||||||
await templates(server_name,
|
|
||||||
subconfig,
|
|
||||||
rougailconfig,
|
|
||||||
just_copy=True,
|
|
||||||
)
|
|
||||||
modules_done.append(module_name)
|
|
||||||
with open(join(INSTALL_DIR, module_name, 'install_machines'), 'w') as fh:
|
|
||||||
fh.write(f'./install_machine {module_name} {server_name}\n')
|
|
||||||
run(main())
|
|
163
funcs.py
163
funcs.py
|
@ -1,163 +0,0 @@
|
||||||
from tiramisu import valid_network_netmask, valid_ip_netmask, valid_broadcast, valid_in_network, valid_not_equal as valid_differ, valid_not_equal, calc_value
|
|
||||||
from os.path import dirname, abspath, join as _join, isdir as _isdir, isfile as _isfile
|
|
||||||
from typing import List
|
|
||||||
from secrets import token_urlsafe as _token_urlsafe
|
|
||||||
|
|
||||||
from rougail.utils import normalize_family
|
|
||||||
|
|
||||||
from risotto.utils import multi_function, DOMAINS, ZONES, load_zones, load_zones_server, load_domains, SERVERS_JSON
|
|
||||||
from risotto.x509 import gen_cert as _x509_gen_cert, gen_ca as _x509_gen_ca, gen_pub as _x509_gen_pub, has_pub as _x509_has_pub
|
|
||||||
|
|
||||||
|
|
||||||
HERE = dirname(abspath(__file__))
|
|
||||||
|
|
||||||
|
|
||||||
@multi_function
|
|
||||||
def get_chain(authority_cn: str,
|
|
||||||
authority_name: str,
|
|
||||||
hide: bool,
|
|
||||||
):
|
|
||||||
if hide:
|
|
||||||
return "XXXXX"
|
|
||||||
if not authority_cn or not authority_name or authority_name is None:
|
|
||||||
if isinstance(authority_name, list):
|
|
||||||
return []
|
|
||||||
return
|
|
||||||
if not isinstance(authority_cn, list):
|
|
||||||
is_list = False
|
|
||||||
authority_cn = [authority_cn]
|
|
||||||
else:
|
|
||||||
is_list = True
|
|
||||||
authorities = []
|
|
||||||
|
|
||||||
for auth_cn in authority_cn:
|
|
||||||
ret = _x509_gen_ca(auth_cn,
|
|
||||||
authority_name,
|
|
||||||
HERE,
|
|
||||||
)
|
|
||||||
if not is_list:
|
|
||||||
return ret
|
|
||||||
authorities.append(ret)
|
|
||||||
return authorities
|
|
||||||
|
|
||||||
|
|
||||||
@multi_function
|
|
||||||
def get_certificate(cn,
|
|
||||||
authority_name: str,
|
|
||||||
hide: bool,
|
|
||||||
authority_cn: str=None,
|
|
||||||
extra_domainnames: list=[],
|
|
||||||
type: str='server',
|
|
||||||
):
|
|
||||||
if hide:
|
|
||||||
return "XXXXX"
|
|
||||||
if isinstance(cn, list) and extra_domainnames:
|
|
||||||
raise Exception('cn cannot be a list with extra_domainnames set')
|
|
||||||
if not cn or authority_name is None:
|
|
||||||
if isinstance(cn, list):
|
|
||||||
return []
|
|
||||||
return
|
|
||||||
return _x509_gen_cert(cn,
|
|
||||||
extra_domainnames,
|
|
||||||
authority_cn,
|
|
||||||
authority_name,
|
|
||||||
type,
|
|
||||||
'crt',
|
|
||||||
HERE,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@multi_function
|
|
||||||
def get_private_key(cn: str,
|
|
||||||
hide: bool,
|
|
||||||
authority_name: str=None,
|
|
||||||
authority_cn: str=None,
|
|
||||||
type: str='server',
|
|
||||||
):
|
|
||||||
if hide:
|
|
||||||
return "XXXXX"
|
|
||||||
if not cn:
|
|
||||||
if isinstance(cn, list):
|
|
||||||
return []
|
|
||||||
return
|
|
||||||
if authority_name is None:
|
|
||||||
if _x509_has_pub(cn, HERE):
|
|
||||||
return _x509_gen_pub(cn,
|
|
||||||
'key',
|
|
||||||
HERE,
|
|
||||||
)
|
|
||||||
if isinstance(cn, list):
|
|
||||||
return []
|
|
||||||
return
|
|
||||||
return _x509_gen_cert(cn,
|
|
||||||
[],
|
|
||||||
authority_cn,
|
|
||||||
authority_name,
|
|
||||||
type,
|
|
||||||
'key',
|
|
||||||
HERE,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def get_public_key(cn: str,
|
|
||||||
hide: bool,
|
|
||||||
):
|
|
||||||
if hide:
|
|
||||||
return "XXXXX"
|
|
||||||
if not cn:
|
|
||||||
return
|
|
||||||
return _x509_gen_pub(cn,
|
|
||||||
'pub',
|
|
||||||
HERE,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def zone_information(zone_name: str,
|
|
||||||
type: str,
|
|
||||||
multi: bool=False,
|
|
||||||
index: int=None,
|
|
||||||
) -> str:
|
|
||||||
if not zone_name:
|
|
||||||
return
|
|
||||||
if type == 'gateway' and index != 0:
|
|
||||||
return
|
|
||||||
load_zones()
|
|
||||||
if zone_name not in ZONES:
|
|
||||||
raise ValueError(f"cannot get zone informations in unknown zone '{zone_name}'")
|
|
||||||
zone = ZONES[zone_name]
|
|
||||||
if type not in zone:
|
|
||||||
raise ValueError(f"unknown type '{type}' in zone '{zone_name}'")
|
|
||||||
value = zone[type]
|
|
||||||
if multi:
|
|
||||||
value = [value]
|
|
||||||
return value
|
|
||||||
|
|
||||||
|
|
||||||
def get_internal_zones() -> List[str]:
|
|
||||||
load_domains()
|
|
||||||
return list(DOMAINS.keys())
|
|
||||||
|
|
||||||
|
|
||||||
@multi_function
|
|
||||||
def get_zones_info(type: str) -> str:
|
|
||||||
load_zones_server()
|
|
||||||
ret = []
|
|
||||||
for data in SERVERS_JSON['zones'].values():
|
|
||||||
ret.append(data[type])
|
|
||||||
return ret
|
|
||||||
|
|
||||||
@multi_function
|
|
||||||
def get_internal_zone_names() -> List[str]:
|
|
||||||
load_zones()
|
|
||||||
return list(ZONES.keys())
|
|
||||||
|
|
||||||
|
|
||||||
def get_internal_zone_information(zone: str,
|
|
||||||
info: str,
|
|
||||||
) -> str:
|
|
||||||
load_domains()
|
|
||||||
if info == 'cidr':
|
|
||||||
return ZONES[zone]['gateway'] + '/' + ZONES[zone]['network'].split('/')[-1]
|
|
||||||
return ZONES[zone][info]
|
|
||||||
|
|
||||||
# =============================================================
|
|
|
@ -1,258 +0,0 @@
|
||||||
from OpenSSL.crypto import load_certificate, load_privatekey, dump_certificate, dump_privatekey, dump_publickey, PKey, X509, X509Extension, TYPE_RSA, FILETYPE_PEM
|
|
||||||
from os import makedirs, symlink
|
|
||||||
from os.path import join, isdir, isfile, exists
|
|
||||||
#from shutil import rmtree
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
|
|
||||||
PKI_DIR = 'pki/x509'
|
|
||||||
#FIXME
|
|
||||||
EMAIL = 'gnunux@gnunux.info'
|
|
||||||
COUNTRY = 'FR'
|
|
||||||
LOCALITY = 'Dijon'
|
|
||||||
STATE = 'France'
|
|
||||||
ORG_NAME = 'Cadoles'
|
|
||||||
ORG_UNIT_NAME = 'CSS'
|
|
||||||
|
|
||||||
|
|
||||||
def _gen_key_pair():
|
|
||||||
key = PKey()
|
|
||||||
key.generate_key(TYPE_RSA, 4096)
|
|
||||||
return key
|
|
||||||
|
|
||||||
|
|
||||||
def _gen_cert(is_ca,
|
|
||||||
common_names,
|
|
||||||
serial_number,
|
|
||||||
validity_end_in_seconds,
|
|
||||||
key_file,
|
|
||||||
cert_file,
|
|
||||||
type=None,
|
|
||||||
ca_cert=None,
|
|
||||||
ca_key=None,
|
|
||||||
email_address=None,
|
|
||||||
country_name=None,
|
|
||||||
locality_name=None,
|
|
||||||
state_or_province_name=None,
|
|
||||||
organization_name=None,
|
|
||||||
organization_unit_name=None,
|
|
||||||
):
|
|
||||||
#can look at generated file using openssl:
|
|
||||||
#openssl x509 -inform pem -in selfsigned.crt -noout -text
|
|
||||||
# create a key pair
|
|
||||||
if isfile(key_file):
|
|
||||||
with open(key_file) as fh:
|
|
||||||
filecontent = bytes(fh.read(), 'utf-8')
|
|
||||||
key = load_privatekey(FILETYPE_PEM, filecontent)
|
|
||||||
else:
|
|
||||||
key = _gen_key_pair()
|
|
||||||
cert = X509()
|
|
||||||
cert.set_version(2)
|
|
||||||
cert.get_subject().C = country_name
|
|
||||||
cert.get_subject().ST = state_or_province_name
|
|
||||||
cert.get_subject().L = locality_name
|
|
||||||
cert.get_subject().O = organization_name
|
|
||||||
cert.get_subject().OU = organization_unit_name
|
|
||||||
cert.get_subject().CN = common_names[0]
|
|
||||||
cert.get_subject().emailAddress = email_address
|
|
||||||
cert_ext = []
|
|
||||||
if not is_ca:
|
|
||||||
cert_ext.append(X509Extension(b'basicConstraints', False, b'CA:FALSE'))
|
|
||||||
cert_ext.append(X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment'))
|
|
||||||
cert_ext.append(X509Extension(b'subjectAltName', False, ", ".join([f'DNS:{common_name}' for common_name in common_names]).encode('ascii')))
|
|
||||||
if type == 'server':
|
|
||||||
cert_ext.append(X509Extension(b'extendedKeyUsage', True, b'serverAuth'))
|
|
||||||
else:
|
|
||||||
cert_ext.append(X509Extension(b'extendedKeyUsage', True, b'clientAuth'))
|
|
||||||
else:
|
|
||||||
cert_ext.append(X509Extension(b'basicConstraints', False, b'CA:TRUE'))
|
|
||||||
cert_ext.append(X509Extension(b"keyUsage", True, b'keyCertSign, cRLSign'))
|
|
||||||
cert_ext.append(X509Extension(b'subjectAltName', False, f'email:{email_address}'.encode()))
|
|
||||||
cert_ext.append(X509Extension(b'subjectKeyIdentifier', False, b"hash", subject=cert))
|
|
||||||
cert.add_extensions(cert_ext)
|
|
||||||
cert.set_serial_number(serial_number)
|
|
||||||
cert.gmtime_adj_notBefore(0)
|
|
||||||
cert.gmtime_adj_notAfter(validity_end_in_seconds)
|
|
||||||
if is_ca:
|
|
||||||
ca_cert = cert
|
|
||||||
ca_key = key
|
|
||||||
else:
|
|
||||||
with open(ca_cert) as fh:
|
|
||||||
filecontent = bytes(fh.read(), 'utf-8')
|
|
||||||
ca_cert = load_certificate(FILETYPE_PEM, filecontent)
|
|
||||||
with open(ca_key) as fh:
|
|
||||||
filecontent = bytes(fh.read(), 'utf-8')
|
|
||||||
ca_key = load_privatekey(FILETYPE_PEM, filecontent)
|
|
||||||
cert.set_issuer(ca_cert.get_subject())
|
|
||||||
cert.add_extensions([X509Extension(b"authorityKeyIdentifier", False, b'keyid:always', issuer=ca_cert)])
|
|
||||||
cert.set_pubkey(key)
|
|
||||||
cert.sign(ca_key, "sha512")
|
|
||||||
|
|
||||||
with open(cert_file, "wt") as f:
|
|
||||||
f.write(dump_certificate(FILETYPE_PEM, cert).decode("utf-8"))
|
|
||||||
if not is_ca:
|
|
||||||
f.write(dump_certificate(FILETYPE_PEM, ca_cert).decode("utf-8"))
|
|
||||||
with open(key_file, "wt") as f:
|
|
||||||
f.write(dump_privatekey(FILETYPE_PEM, key).decode("utf-8"))
|
|
||||||
|
|
||||||
|
|
||||||
def gen_ca(authority_dns,
|
|
||||||
authority_name,
|
|
||||||
base_dir,
|
|
||||||
):
|
|
||||||
authority_cn = authority_name + '+' + authority_dns
|
|
||||||
week_number = datetime.now().isocalendar().week
|
|
||||||
root_dir_name = join(base_dir, PKI_DIR, authority_cn)
|
|
||||||
ca_dir_name = join(root_dir_name, 'ca')
|
|
||||||
sn_ca_name = join(ca_dir_name, 'serial_number')
|
|
||||||
key_ca_name = join(ca_dir_name, 'private.key')
|
|
||||||
cert_ca_name = join(ca_dir_name, f'certificate_{week_number}.crt')
|
|
||||||
if not isfile(cert_ca_name):
|
|
||||||
if not isdir(ca_dir_name):
|
|
||||||
# rmtree(ca_dir_name)
|
|
||||||
makedirs(ca_dir_name)
|
|
||||||
if isfile(sn_ca_name):
|
|
||||||
with open(sn_ca_name, 'r') as fh:
|
|
||||||
serial_number = int(fh.read().strip()) + 1
|
|
||||||
else:
|
|
||||||
serial_number = 0
|
|
||||||
_gen_cert(True,
|
|
||||||
[authority_cn],
|
|
||||||
serial_number,
|
|
||||||
10*24*60*60,
|
|
||||||
key_ca_name,
|
|
||||||
cert_ca_name,
|
|
||||||
email_address=EMAIL,
|
|
||||||
country_name=COUNTRY,
|
|
||||||
locality_name=LOCALITY,
|
|
||||||
state_or_province_name=STATE,
|
|
||||||
organization_name=ORG_NAME,
|
|
||||||
organization_unit_name=ORG_UNIT_NAME,
|
|
||||||
)
|
|
||||||
with open(sn_ca_name, 'w') as fh:
|
|
||||||
fh.write(str(serial_number))
|
|
||||||
with open(cert_ca_name, 'r') as fh:
|
|
||||||
return fh.read().strip()
|
|
||||||
|
|
||||||
|
|
||||||
def gen_cert_iter(cn,
|
|
||||||
extra_domainnames,
|
|
||||||
authority_cn,
|
|
||||||
authority_name,
|
|
||||||
type,
|
|
||||||
base_dir,
|
|
||||||
dir_name,
|
|
||||||
):
|
|
||||||
week_number = datetime.now().isocalendar().week
|
|
||||||
root_dir_name = join(base_dir, PKI_DIR, authority_cn)
|
|
||||||
ca_dir_name = join(root_dir_name, 'ca')
|
|
||||||
key_ca_name = join(ca_dir_name, 'private.key')
|
|
||||||
cert_ca_name = join(ca_dir_name, f'certificate_{week_number}.crt')
|
|
||||||
sn_name = join(dir_name, f'serial_number')
|
|
||||||
key_name = join(dir_name, f'private.key')
|
|
||||||
cert_name = join(dir_name, f'certificate_{week_number}.crt')
|
|
||||||
if not isfile(cert_ca_name):
|
|
||||||
raise Exception(f'cannot find CA file "{cert_ca_name}"')
|
|
||||||
if not isfile(cert_name):
|
|
||||||
if not isfile(key_ca_name):
|
|
||||||
raise Exception(f"cannot find CA private key (\"{authority_cn}\") to sign certificat for \"{cn}\" ({key_ca_name}), is it a Let's Encrypt certification?")
|
|
||||||
if not isdir(dir_name):
|
|
||||||
makedirs(dir_name)
|
|
||||||
if isfile(sn_name):
|
|
||||||
with open(sn_name, 'r') as fh:
|
|
||||||
serial_number = int(fh.read().strip()) + 1
|
|
||||||
else:
|
|
||||||
serial_number = 0
|
|
||||||
common_names = [cn]
|
|
||||||
common_names.extend(extra_domainnames)
|
|
||||||
_gen_cert(False,
|
|
||||||
common_names,
|
|
||||||
serial_number,
|
|
||||||
10*24*60*60,
|
|
||||||
key_name,
|
|
||||||
cert_name,
|
|
||||||
ca_cert=cert_ca_name,
|
|
||||||
ca_key=key_ca_name,
|
|
||||||
type=type,
|
|
||||||
email_address=EMAIL,
|
|
||||||
country_name=COUNTRY,
|
|
||||||
locality_name=LOCALITY,
|
|
||||||
state_or_province_name=STATE,
|
|
||||||
organization_name=ORG_NAME,
|
|
||||||
organization_unit_name=ORG_UNIT_NAME,
|
|
||||||
)
|
|
||||||
with open(sn_name, 'w') as fh:
|
|
||||||
fh.write(str(serial_number))
|
|
||||||
for extra in extra_domainnames:
|
|
||||||
extra_dir_name = join(base_dir, PKI_DIR, authority_name + '+' + extra)
|
|
||||||
if not exists(extra_dir_name):
|
|
||||||
symlink(root_dir_name, extra_dir_name)
|
|
||||||
for extra in extra_domainnames:
|
|
||||||
extra_dir_name = join(base_dir, PKI_DIR, authority_name + '+' + extra)
|
|
||||||
if not exists(extra_dir_name):
|
|
||||||
raise Exception(f'file {extra_dir_name} not already exists that means subjectAltName is not set in certificat, please remove {cert_name}')
|
|
||||||
return cert_name
|
|
||||||
|
|
||||||
|
|
||||||
def gen_cert(cn,
|
|
||||||
extra_domainnames,
|
|
||||||
authority_cn,
|
|
||||||
authority_name,
|
|
||||||
type,
|
|
||||||
file_type,
|
|
||||||
base_dir,
|
|
||||||
):
|
|
||||||
if '.' in authority_name:
|
|
||||||
raise Exception(f'dot is not allowed in authority_name "{authority_name}"')
|
|
||||||
if type == 'server' and authority_cn is None:
|
|
||||||
authority_cn = cn
|
|
||||||
if authority_cn is None:
|
|
||||||
raise Exception(f'authority_cn is mandatory when authority type is client')
|
|
||||||
if extra_domainnames is None:
|
|
||||||
extra_domainnames = []
|
|
||||||
auth_cn = authority_name + '+' + authority_cn
|
|
||||||
dir_name = join(base_dir, PKI_DIR, auth_cn, 'certificats', cn, type)
|
|
||||||
if file_type == 'crt':
|
|
||||||
filename = gen_cert_iter(cn,
|
|
||||||
extra_domainnames,
|
|
||||||
auth_cn,
|
|
||||||
authority_name,
|
|
||||||
type,
|
|
||||||
base_dir,
|
|
||||||
dir_name,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
filename = join(dir_name, f'private.key')
|
|
||||||
with open(filename, 'r') as fh:
|
|
||||||
return fh.read().strip()
|
|
||||||
|
|
||||||
|
|
||||||
def has_pub(cn,
|
|
||||||
base_dir,
|
|
||||||
):
|
|
||||||
dir_name = join(base_dir, PKI_DIR, 'public', cn)
|
|
||||||
cert_name = join(dir_name, f'public.pub')
|
|
||||||
return isfile(cert_name)
|
|
||||||
|
|
||||||
|
|
||||||
def gen_pub(cn,
|
|
||||||
file_type,
|
|
||||||
base_dir,
|
|
||||||
):
|
|
||||||
dir_name = join(base_dir, PKI_DIR, 'public', cn)
|
|
||||||
key_name = join(dir_name, f'private.key')
|
|
||||||
if file_type == 'pub':
|
|
||||||
pub_name = join(dir_name, f'public.pub')
|
|
||||||
if not isfile(pub_name):
|
|
||||||
if not isdir(dir_name):
|
|
||||||
makedirs(dir_name)
|
|
||||||
key = _gen_key_pair()
|
|
||||||
with open(pub_name, "wt") as f:
|
|
||||||
f.write(dump_publickey(FILETYPE_PEM, key).decode("utf-8"))
|
|
||||||
with open(key_name, "wt") as f:
|
|
||||||
f.write(dump_privatekey(FILETYPE_PEM, key).decode("utf-8"))
|
|
||||||
filename = pub_name
|
|
||||||
else:
|
|
||||||
filename = key_name
|
|
||||||
with open(filename, 'r') as fh:
|
|
||||||
return fh.read().strip()
|
|
Loading…
Reference in a new issue