from OpenSSL.crypto import load_certificate, load_privatekey, \ dump_certificate, dump_privatekey, \ PKey, X509, X509Extension, \ TYPE_RSA, FILETYPE_PEM from os import makedirs, unlink, listdir from os.path import join, isdir, isfile from datetime import datetime PKI_DIR = '/srv/tls/x509/autosign' def autosign_certif(cn, authority_cn, authority_name, type_, config, ): ca_name = gen_chain(cn, authority_cn, authority_name, config, ) cert_name, key_name = gen_certificate(cn, authority_cn, authority_name, type_, config, ) return ca_name, cert_name, key_name def gen_chain(cn: str, authority_cn: str, authority_name: str, config, ): week_number = datetime.now().isocalendar().week root_dir_name = join(PKI_DIR, authority_cn) ca_dir_name = join(root_dir_name, 'ca') key_ca_name = join(ca_dir_name, 'private.key') cert_ca_name = f'certificate_{week_number}.crt' cert_ca_filename = join(ca_dir_name, cert_ca_name) local_ca_dir_name = join(root_dir_name, 'certificats', cn, 'ca') if not isfile(cert_ca_filename): if not isdir(ca_dir_name): makedirs(ca_dir_name) _gen_cert(True, [authority_cn], root_dir_name, key_ca_name, cert_ca_filename, config, ) for filename in listdir(ca_dir_name): if not filename.endswith('.crt') or filename == cert_ca_name: continue unlink(join(ca_dir_name, filename)) return cert_ca_filename def _gen_cert(is_ca, common_names, root_dir_name, key_file, cert_file, config, type=None, ca_cert=None, ca_key=None, ): validity_end_in_seconds = 10*24*60*60 email_address = config['email'] country_name = config['country'] locality_name = config['locality'] state_or_province_name = config['state'] organization_name = config['org_name'] organization_unit_name = config['org_unit_name'] #can look at generated file using openssl: #openssl x509 -inform pem -in selfsigned.crt -noout -text # create a key pair if isfile(key_file): with open(key_file) as fh: filecontent = bytes(fh.read(), 'utf-8') key = load_privatekey(FILETYPE_PEM, filecontent) else: key = _gen_key_pair() cert = X509() cert.set_version(2) cert.get_subject().C = country_name cert.get_subject().ST = state_or_province_name cert.get_subject().L = locality_name cert.get_subject().O = organization_name cert.get_subject().OU = organization_unit_name cert.get_subject().CN = common_names[0] cert.get_subject().emailAddress = email_address cert_ext = [] if not is_ca: cert_ext.append(X509Extension(b'basicConstraints', False, b'CA:FALSE')) cert_ext.append(X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment')) cert_ext.append(X509Extension(b'subjectAltName', False, ", ".join([f'DNS:{common_name}' for common_name in common_names]).encode('ascii'))) if type == 'server': cert_ext.append(X509Extension(b'extendedKeyUsage', True, b'serverAuth')) else: cert_ext.append(X509Extension(b'extendedKeyUsage', True, b'clientAuth')) else: cert_ext.append(X509Extension(b'basicConstraints', False, b'CA:TRUE')) cert_ext.append(X509Extension(b"keyUsage", True, b'keyCertSign, cRLSign')) cert_ext.append(X509Extension(b'subjectAltName', False, f'email:{email_address}'.encode())) cert_ext.append(X509Extension(b'subjectKeyIdentifier', False, b"hash", subject=cert)) cert.add_extensions(cert_ext) sn_filename = join(root_dir_name, 'serial_number') if isfile(sn_filename): with open(sn_filename, 'r') as fh: serial_number = int(fh.read().strip()) + 1 else: serial_number = 0 cert.set_serial_number(serial_number) with open(sn_filename, 'w') as fh: fh.write(str(serial_number)) cert.gmtime_adj_notBefore(0) cert.gmtime_adj_notAfter(validity_end_in_seconds) if is_ca: ca_cert = cert ca_key = key else: with open(ca_cert) as fh: filecontent = bytes(fh.read(), 'utf-8') ca_cert = load_certificate(FILETYPE_PEM, filecontent) with open(ca_key) as fh: filecontent = bytes(fh.read(), 'utf-8') ca_key = load_privatekey(FILETYPE_PEM, filecontent) cert.set_issuer(ca_cert.get_subject()) cert.add_extensions([X509Extension(b"authorityKeyIdentifier", False, b'keyid:always', issuer=ca_cert)]) cert.set_pubkey(key) cert.sign(ca_key, "sha512") with open(cert_file, "wt") as f: f.write(dump_certificate(FILETYPE_PEM, cert).decode("utf-8")) if not is_ca: f.write(dump_certificate(FILETYPE_PEM, ca_cert).decode("utf-8")) with open(key_file, "wt") as f: f.write(dump_privatekey(FILETYPE_PEM, key).decode("utf-8")) def _gen_key_pair(): key = PKey() key.generate_key(TYPE_RSA, 4096) return key def gen_certificate(cn, authority_cn, authority_name, type, config, ): root_cert_dir_name = join(PKI_DIR, authority_cn, 'certificats', cn) week_number = datetime.now().isocalendar().week root_dir_name = join(PKI_DIR, authority_cn) ca_dir_name = join(root_dir_name, 'ca') key_ca_name = join(ca_dir_name, 'private.key') certificate_name = f'certificate_{week_number}.crt' cert_ca_name = join(ca_dir_name, certificate_name) cert_ca_external_name = join(root_cert_dir_name, 'ca', certificate_name) dir_name = join(root_cert_dir_name, type) key_name = join(dir_name, f'private.key') cert_name = join(dir_name, certificate_name) external = False if isfile(cert_ca_external_name): external = True elif not isfile(cert_ca_name): raise Exception(f'cannot find CA file "{cert_ca_name}" for "{cn}"') if not isfile(cert_name): if external: raise Exception(f"cannot find CA private key (\"{authority_cn}\") to sign certificat for \"{cn}\" ({key_ca_name}), is it sign with external authority (like Let's Encrypt certification)?") if not isdir(dir_name): makedirs(dir_name) common_names = [cn] _gen_cert(False, common_names, root_dir_name, key_name, cert_name, config, ca_cert=cert_ca_name, ca_key=key_ca_name, type=type, ) for filename in listdir(dir_name): if not filename.endswith('.crt') or filename == certificate_name: continue unlink(join(dir_name, filename)) return cert_name, key_name