dataset/seed/tls/manual/image/postinstall/x509.py

71 lines
2.9 KiB
Python
Raw Permalink Normal View History

2023-02-14 14:24:16 +01:00
from yaml import load, SafeLoader
from os.path import join, isdir, splitext, basename, dirname
from os import makedirs
from shutil import rmtree, copyfile
from .autosign import autosign_certif
from .letsencrypt import letsencrypt_certif
# Root directory under which gen_certificates() creates one sub-directory per
# server name and copies that server's certificate files.
MACHINES_DIR = '/srv/tls/machines'
# YAML file loaded by gen_certificates() and passed as-is to the certificate
# provider functions.
CONFIG_FILE = '/etc/risotto/configuration.yml'
# YAML file mapping each server name to the list of certificates to generate.
CERTS_FILE = '/etc/risotto/certificates.yml'
def _path_under(root_dir, path):
    """Return *path* re-rooted below *root_dir*.

    Leading slashes are stripped because ``os.path.join`` discards
    ``root_dir`` entirely when the following component is absolute.
    """
    return join(root_dir, path.lstrip('/'))


def gen_certificates():
    """Generate every certificate listed in CERTS_FILE and install it per server.

    For each server name found in CERTS_FILE, the directory
    ``MACHINES_DIR/<server_name>`` is removed if it exists and then repopulated
    with the files produced by the certificate provider selected by the
    entry's ``provider`` key (``self-signed`` or ``letsencrypt``).

    Each certificate entry must provide the keys ``domain``, ``authority``,
    ``provider``, ``type`` and ``certificate``. ``authority_server`` is
    optional and defaults to the domain. When ``private`` is present the key
    is written to its own file; otherwise key and certificate are
    concatenated (key first) into the ``certificate`` file.

    Raises:
        Exception: if the authority name contains a dot, or if the provider
            is unknown.
        KeyError: if a certificate entry lacks one of the required keys.
    """
    with open(CONFIG_FILE) as config_fh:
        config = load(config_fh, Loader=SafeLoader)
    with open(CERTS_FILE) as certs_fh:
        certificates_config = load(certs_fh, Loader=SafeLoader)

    providers = {
        'self-signed': autosign_certif,
        'letsencrypt': letsencrypt_certif,
    }

    # An empty YAML document loads as None: treat it as "no server".
    for server_name, certificates in (certificates_config or {}).items():
        root_dir = join(MACHINES_DIR, server_name)
        # Start from a clean directory so stale files never survive.
        if isdir(root_dir):
            rmtree(root_dir)
        # A server declared without any entry also loads as None.
        for certificate in certificates or ():
            cn = certificate['domain']
            # Authority name = file name of the authority path, without extension.
            authority_name = splitext(basename(certificate['authority']))[0]
            if '.' in authority_name:
                raise Exception(f'dot is not allowed in authority_name "{authority_name}"')
            # A missing or empty authority server falls back to the domain.
            authority_server = certificate.get('authority_server') or cn
            authority_cn = authority_name + '+' + authority_server

            func = providers.get(certificate['provider'])
            if func is None:
                raise Exception(f'unknown certificate provider "{certificate["provider"]}"')

            print(f'Get {cn} with autority {authority_cn} with type {certificate["type"]}')
            ca_name, cert_name, key_name = func(
                cn,
                authority_cn,
                authority_name,
                certificate['type'],
                config,
            )
            for cert in ca_name, cert_name, key_name:
                if cert:
                    print(f' - {cert}')

            # The provider may return no CA file; copy it only when present.
            if ca_name:
                copy_file(ca_name, _path_under(root_dir, certificate['authority']))

            dest_cert_filename = _path_under(root_dir, certificate['certificate'])
            if 'private' in certificate:
                copy_file(cert_name, dest_cert_filename)
                copy_file(key_name, _path_under(root_dir, certificate['private']))
            else:
                # Single-file layout: the key comes first, then the
                # certificate is appended to the same file.
                copy_file(key_name, dest_cert_filename)
                with open(dest_cert_filename, 'a') as fh, open(cert_name, 'r') as source_fh:
                    fh.write(source_fh.read())
def copy_file(src, dest):
    """Copy the file *src* to *dest*, creating the destination directory.

    Args:
        src: path of the file to copy.
        dest: path of the copy; missing parent directories are created.
    """
    d_dest = dirname(dest)
    # dirname() is '' for a bare file name: there is nothing to create then,
    # and makedirs('') would raise FileNotFoundError.
    if d_dest:
        # exist_ok avoids the check-then-create race of isdir() + makedirs().
        makedirs(d_dest, exist_ok=True)
    copyfile(src, dest)