from yaml import load, SafeLoader from os import environ from os.path import join, isdir from revprox_client import Authentication from execute import run from re import search from time import sleep from imaplib2 import IMAP4_SSL from smtplib import SMTP from email import message_from_bytes from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText DATA = None DATA_IMAP = None IMAP = None IMAP_FAMILY = None def get_data(): global DATA if not DATA: conf_file = f'{environ["MACHINE_TEST_DIR"]}/mailman.yml' with open(conf_file) as yaml: DATA = load(yaml, Loader=SafeLoader) return DATA def get_imap_data(): global DATA_IMAP if not DATA_IMAP: machine_test_dir = environ['MACHINE_TEST_DIR'] print("FIXME") machine_test_dir = machine_test_dir.replace('mailman', 'dovecot') if not isdir(machine_test_dir): print('!!! No local IMAP server found !!!') return conf_file = f'{machine_test_dir}/imap.yml' with open(conf_file) as yaml: DATA_IMAP = load(yaml, Loader=SafeLoader) return DATA_IMAP def get_authentication(data, family=False): if family: username = data['username_family'] password = data['password_family'] else: username = data['username'] password = data['password'] return Authentication(data['auth_url'], data['auth_server'], data['revprox_ip'], username, password, f""" Listes - {data["domain_name"]} """, ) def check_mail(subject, family=False, reply=False): global IMAP, IMAP_FAMILY data = get_imap_data() if data is None: return if family: if IMAP_FAMILY is None: IMAP_FAMILY = IMAP4_SSL(data['address']) IMAP_FAMILY.LOGIN(data['username_family'], data['password_family'] + '2') imap = IMAP_FAMILY else: if IMAP is None: IMAP = IMAP4_SSL(data['address']) IMAP.LOGIN(data['username'], data['password']) imap = IMAP imap.SELECT(readonly=False) typ, req = imap.SEARCH(None, 'ALL') assert typ == 'OK' if not req[0].decode(): raise Exception('pas de mail') num = req[0].decode().split()[-1] field = imap.FETCH(num, '(RFC822)') assert field[0] == 'OK' msg = message_from_bytes(field[1][-2][-1]) #if msg.is_multipart(): # for part in msg.walk(): # # extract content type of email # try: # print(part.get_payload(decode=True).decode()) # except: # pass #else: # print(msg.get_payload(decode=True).decode()) if subject is not None: assert subject == msg['Subject'] if reply: reply_message(msg) ret =, '+FLAGS', '\\Deleted') assert ret[0] == 'OK', f'error when deleting mail: {ret}' imap.expunge() # imap.CLOSE() # imap.LOGOUT() def reply_message(msg): data = get_imap_data() body = MIMEText('resend') message = MIMEMultipart() message["to"] = msg["From"] message['from'] = msg["To"] message['subject'] = f'Re: {msg["Subject"]}' message.add_header('reply-to', msg["From"]) message.attach(body) # smtp = SMTP(data['address'], '587') smtp.starttls() smtp.login(data['username_family'], data['password_family'] + '2') smtp.sendmail(msg["To"], msg["From"], message.as_string(), ) smtp.quit() def send_mail(subject, data, data_mm, family=False, add_mail=''): list_name = 'test' smtp = SMTP(data['address'], '587') smtp.starttls() if not family: sender = data['username'] smtp.login(sender, data['password']) else: sender = data['username_family'] smtp.login(sender, data['password_family'] + '2') list_addr = f'{list_name}{add_mail}@{data_mm["mailman_domain"]}' msg = f"""From: {sender}\r\n\ To: {list_addr}\r\n\ Subject: {subject}\r\n\ \r\n\ MESSAGE""" smtp.sendmail(sender, list_addr, msg, ) smtp.quit() def test_mailman(): data = get_data() get_authentication(data) def test_mailman_login(): data = get_data() authentication = get_authentication(data) content = authentication.get(data['auth_url']) login = data['username'].split('@')[0] assert f'Successfully signed in as {login}.' in content def test_mailman_list(): data = get_data() authentication = get_authentication(data) list_name = 'test' search_list = f'href="/mailman/postorius/lists/{list_name}.{data["mailman_domain"]}/"' if 'FIRST_RUN' in environ: content = authentication.get(data['auth_url']) assert search_list not in content result = run(data['internal_address'], ['/usr/bin/mailman3', 'create', '--language', 'fr', '-o', data['username'], '-N', '-d', f'{list_name}@{data["mailman_domain"]}'], 'mailman', ) assert list(result) == [f'Liste de diffusion créée : {list_name}@{data["mailman_domain"]}'] content = authentication.get(data['auth_url']) assert search_list in content def test_mailman_inscription(): data = get_data() authentication = get_authentication(data) list_name = 'test' search_inscription = f'Adresse principale ({data["username"]})' url = join(data['base_url'], f'postorius/lists/{list_name}.{data["mailman_domain"]}/') if 'FIRST_RUN' in environ: content = authentication.get(url) assert search_inscription not in content pattern_csrf = r'name="csrfmiddlewaretoken" value="([a-zA-Z0-9\-\_=]+)"' pattern_sub = r'