from OpenSSL.crypto import load_certificate, load_privatekey, dump_certificate, dump_privatekey, dump_publickey, PKey, X509, X509Extension, TYPE_RSA, FILETYPE_PEM from os import makedirs, symlink from os.path import join, isdir, isfile, exists #from shutil import rmtree from datetime import datetime PKI_DIR = 'pki/x509' #FIXME EMAIL = 'gnunux@gnunux.info' COUNTRY = 'FR' LOCALITY = 'Dijon' STATE = 'France' ORG_NAME = 'Cadoles' ORG_UNIT_NAME = 'CSS' def _gen_key_pair(): key = PKey() key.generate_key(TYPE_RSA, 4096) return key def _gen_cert(is_ca, common_names, serial_number, validity_end_in_seconds, key_file, cert_file, type=None, ca_cert=None, ca_key=None, email_address=None, country_name=None, locality_name=None, state_or_province_name=None, organization_name=None, organization_unit_name=None, ): #can look at generated file using openssl: #openssl x509 -inform pem -in selfsigned.crt -noout -text # create a key pair if isfile(key_file): with open(key_file) as fh: filecontent = bytes(fh.read(), 'utf-8') key = load_privatekey(FILETYPE_PEM, filecontent) else: key = _gen_key_pair() cert = X509() cert.set_version(2) cert.get_subject().C = country_name cert.get_subject().ST = state_or_province_name cert.get_subject().L = locality_name cert.get_subject().O = organization_name cert.get_subject().OU = organization_unit_name cert.get_subject().CN = common_names[0] cert.get_subject().emailAddress = email_address cert_ext = [] if not is_ca: cert_ext.append(X509Extension(b'basicConstraints', False, b'CA:FALSE')) cert_ext.append(X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment')) cert_ext.append(X509Extension(b'subjectAltName', False, ", ".join([f'DNS:{common_name}' for common_name in common_names]).encode('ascii'))) if type == 'server': cert_ext.append(X509Extension(b'extendedKeyUsage', True, b'serverAuth')) else: cert_ext.append(X509Extension(b'extendedKeyUsage', True, b'clientAuth')) else: cert_ext.append(X509Extension(b'basicConstraints', False, b'CA:TRUE')) cert_ext.append(X509Extension(b"keyUsage", True, b'keyCertSign, cRLSign')) cert_ext.append(X509Extension(b'subjectAltName', False, f'email:{email_address}'.encode())) cert_ext.append(X509Extension(b'subjectKeyIdentifier', False, b"hash", subject=cert)) cert.add_extensions(cert_ext) cert.set_serial_number(serial_number) cert.gmtime_adj_notBefore(0) cert.gmtime_adj_notAfter(validity_end_in_seconds) if is_ca: ca_cert = cert ca_key = key else: with open(ca_cert) as fh: filecontent = bytes(fh.read(), 'utf-8') ca_cert = load_certificate(FILETYPE_PEM, filecontent) with open(ca_key) as fh: filecontent = bytes(fh.read(), 'utf-8') ca_key = load_privatekey(FILETYPE_PEM, filecontent) cert.set_issuer(ca_cert.get_subject()) cert.add_extensions([X509Extension(b"authorityKeyIdentifier", False, b'keyid:always', issuer=ca_cert)]) cert.set_pubkey(key) cert.sign(ca_key, "sha512") with open(cert_file, "wt") as f: f.write(dump_certificate(FILETYPE_PEM, cert).decode("utf-8")) with open(key_file, "wt") as f: f.write(dump_privatekey(FILETYPE_PEM, key).decode("utf-8")) def gen_ca(authority_dns, authority_name, base_dir, ): authority_cn = authority_name + '+' + authority_dns week_number = datetime.now().isocalendar().week root_dir_name = join(base_dir, PKI_DIR, authority_cn) ca_dir_name = join(root_dir_name, 'ca') sn_ca_name = join(ca_dir_name, 'serial_number') key_ca_name = join(ca_dir_name, 'private.key') cert_ca_name = join(ca_dir_name, f'certificate_{week_number}.crt') if not isfile(cert_ca_name): if not isdir(ca_dir_name): # rmtree(ca_dir_name) makedirs(ca_dir_name) if isfile(sn_ca_name): with open(sn_ca_name, 'r') as fh: serial_number = int(fh.read().strip()) + 1 else: serial_number = 0 _gen_cert(True, [authority_cn], serial_number, 10*24*60*60, key_ca_name, cert_ca_name, email_address=EMAIL, country_name=COUNTRY, locality_name=LOCALITY, state_or_province_name=STATE, organization_name=ORG_NAME, organization_unit_name=ORG_UNIT_NAME, ) with open(sn_ca_name, 'w') as fh: fh.write(str(serial_number)) with open(cert_ca_name, 'r') as fh: return fh.read().strip() def gen_cert_iter(cn, extra_domainnames, authority_cn, authority_name, type, base_dir, dir_name, ): week_number = datetime.now().isocalendar().week root_dir_name = join(base_dir, PKI_DIR, authority_cn) ca_dir_name = join(root_dir_name, 'ca') key_ca_name = join(ca_dir_name, 'private.key') cert_ca_name = join(ca_dir_name, f'certificate_{week_number}.crt') sn_name = join(dir_name, f'serial_number') key_name = join(dir_name, f'private.key') cert_name = join(dir_name, f'certificate_{week_number}.crt') if not isfile(cert_ca_name): raise Exception(f'cannot find CA file "{cert_ca_name}"') if not isfile(cert_name): if not isdir(dir_name): makedirs(dir_name) if isfile(sn_name): with open(sn_name, 'r') as fh: serial_number = int(fh.read().strip()) + 1 else: serial_number = 0 common_names = [cn] common_names.extend(extra_domainnames) _gen_cert(False, common_names, serial_number, 10*24*60*60, key_name, cert_name, ca_cert=cert_ca_name, ca_key=key_ca_name, type=type, email_address=EMAIL, country_name=COUNTRY, locality_name=LOCALITY, state_or_province_name=STATE, organization_name=ORG_NAME, organization_unit_name=ORG_UNIT_NAME, ) with open(sn_name, 'w') as fh: fh.write(str(serial_number)) for extra in extra_domainnames: extra_dir_name = join(base_dir, PKI_DIR, authority_name + '+' + extra) if not exists(extra_dir_name): symlink(root_dir_name, extra_dir_name) for extra in extra_domainnames: extra_dir_name = join(base_dir, PKI_DIR, authority_name + '+' + extra) if not exists(extra_dir_name): raise Exception(f'file {extra_dir_name} not already exists that means subjectAltName is not set in certificat, please remove {cert_name}') return cert_name def gen_cert(cn, extra_domainnames, authority_cn, authority_name, type, file_type, base_dir, ): if '.' in authority_name: raise Exception(f'dot is not allowed in authority_name "{authority_name}"') if type == 'server' and authority_cn is None: authority_cn = cn if authority_cn is None: raise Exception(f'authority_cn is mandatory when authority type is client') if extra_domainnames is None: extra_domainnames = [] auth_cn = authority_name + '+' + authority_cn dir_name = join(base_dir, PKI_DIR, auth_cn, 'certificats', cn, type) if file_type == 'crt': filename = gen_cert_iter(cn, extra_domainnames, auth_cn, authority_name, type, base_dir, dir_name, ) else: filename = join(dir_name, f'private.key') with open(filename, 'r') as fh: return fh.read().strip() def has_pub(cn, base_dir, ): dir_name = join(base_dir, PKI_DIR, 'public', cn) cert_name = join(dir_name, f'public.pub') return isfile(cert_name) def gen_pub(cn, file_type, base_dir, ): dir_name = join(base_dir, PKI_DIR, 'public', cn) key_name = join(dir_name, f'private.key') if file_type == 'pub': pub_name = join(dir_name, f'public.pub') if not isfile(pub_name): if not isdir(dir_name): makedirs(dir_name) key = _gen_key_pair() with open(pub_name, "wt") as f: f.write(dump_publickey(FILETYPE_PEM, key).decode("utf-8")) with open(key_name, "wt") as f: f.write(dump_privatekey(FILETYPE_PEM, key).decode("utf-8")) filename = pub_name else: filename = key_name with open(filename, 'r') as fh: return fh.read().strip()