from yaml import load, SafeLoader from os import environ from os.path import join, isdir from revprox import Authentication from execute import run from re import search from time import sleep from imaplib2 import IMAP4_SSL from smtplib import SMTP from email import message_from_bytes from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText DATA = None DATA_IMAP = None IMAP = None IMAP_FAMILY = None def get_data(): global DATA if not DATA: conf_file = f'{environ["MACHINE_TEST_DIR"]}/mailman.yml' with open(conf_file) as yaml: DATA = load(yaml, Loader=SafeLoader) return DATA def get_imap_data(): global DATA_IMAP if not DATA_IMAP: machine_test_dir = environ['MACHINE_TEST_DIR'] print("FIXME") machine_test_dir = machine_test_dir.replace('mailman', 'dovecot') if not isdir(machine_test_dir): print('!!! No local IMAP server found !!!') return conf_file = f'{machine_test_dir}/imap.yml' with open(conf_file) as yaml: DATA_IMAP = load(yaml, Loader=SafeLoader) return DATA_IMAP def get_authentication(data, family=False): if family: username = data['username_family'] password = data['password_family'] else: username = data['username'] password = data['password'] return Authentication(data['auth_url'], data['auth_server'], data['revprox_ip'], username, password, f"""<title> Listes - {data["domain_name"]} </title>""", ) def check_mail(subject, family=False, reply=False): global IMAP, IMAP_FAMILY data = get_imap_data() if data is None: return if family: if IMAP_FAMILY is None: IMAP_FAMILY = IMAP4_SSL(data['address']) IMAP_FAMILY.LOGIN(data['username_family'], data['password_family'] + '2') imap = IMAP_FAMILY else: if IMAP is None: IMAP = IMAP4_SSL(data['address']) IMAP.LOGIN(data['username'], data['password']) imap = IMAP imap.SELECT(readonly=False) typ, req = imap.SEARCH(None, 'ALL') assert typ == 'OK' if not req[0].decode(): raise Exception('pas de mail') num = req[0].decode().split()[-1] field = imap.FETCH(num, '(RFC822)') assert field[0] == 'OK' msg = message_from_bytes(field[1][-2][-1]) #if msg.is_multipart(): # for part in msg.walk(): # # extract content type of email # try: # print(part.get_payload(decode=True).decode()) # except: # pass #else: # print(msg.get_payload(decode=True).decode()) if subject is not None: assert subject == msg['Subject'] if reply: reply_message(msg) ret = imap.store(num, '+FLAGS', '\\Deleted') assert ret[0] == 'OK', f'error when deleting mail: {ret}' imap.expunge() # imap.CLOSE() # imap.LOGOUT() def reply_message(msg): data = get_imap_data() body = MIMEText('resend') message = MIMEMultipart() message["to"] = msg["From"] message['from'] = msg["To"] message['subject'] = f'Re: {msg["Subject"]}' message.add_header('reply-to', msg["From"]) message.attach(body) # smtp = SMTP(data['address'], '587') smtp.starttls() smtp.login(data['username_family'], data['password_family'] + '2') smtp.sendmail(msg["To"], msg["From"], message.as_string(), ) smtp.quit() def send_mail(subject, data, data_mm, family=False, add_mail=''): list_name = 'test' smtp = SMTP(data['address'], '587') smtp.starttls() if not family: sender = data['username'] smtp.login(sender, data['password']) else: sender = data['username_family'] smtp.login(sender, data['password_family'] + '2') list_addr = f'{list_name}{add_mail}@{data_mm["mailman_domain"]}' msg = f"""From: {sender}\r\n\ To: {list_addr}\r\n\ Subject: {subject}\r\n\ \r\n\ MESSAGE""" smtp.sendmail(sender, list_addr, msg, ) smtp.quit() def test_mailman(): data = get_data() get_authentication(data) def test_mailman_login(): data = get_data() authentication = get_authentication(data) content = authentication.get(data['auth_url']) login = data['username'].split('@')[0] assert f'Successfully signed in as {login}.' in content def test_mailman_list(): data = get_data() authentication = get_authentication(data) list_name = 'test' search_list = f'href="/mailman/postorius/lists/{list_name}.{data["mailman_domain"]}/"' if 'FIRST_RUN' in environ: content = authentication.get(data['auth_url']) assert search_list not in content result = run(data['internal_address'], ['/usr/bin/mailman3', 'create', '--language', 'fr', '-o', data['username'], '-N', '-d', f'{list_name}@{data["mailman_domain"]}'], 'mailman', ) assert list(result) == [f'Liste de diffusion créée : {list_name}@{data["mailman_domain"]}'] content = authentication.get(data['auth_url']) assert search_list in content def test_mailman_inscription(): data = get_data() authentication = get_authentication(data) list_name = 'test' search_inscription = f'<td>Adresse principale ({data["username"]})</td>' url = join(data['base_url'], f'postorius/lists/{list_name}.{data["mailman_domain"]}/') if 'FIRST_RUN' in environ: content = authentication.get(url) assert search_inscription not in content pattern_csrf = r'name="csrfmiddlewaretoken" value="([a-zA-Z0-9\-\_=]+)"' pattern_sub = r'<option value="([a-zA-Z0-9\-\_=]+)">Adresse principale' csrf = search(pattern_csrf, content)[1] subscriber = search(pattern_sub, content)[1] headers = {'Referer': url} authentication.post(url + 'subscribe', {'csrfmiddlewaretoken': csrf, 'subscriber': subscriber, 'delivery_mode': 'regular', 'display_name': ''}, headers=headers) subject = f'=?utf-8?q?Bienvenue_sur_la_liste_de_diffusion_=22{list_name.capitalize()}=22?=' idx = 0 while True: try: check_mail(subject) break except: idx += 1 if idx == 10: raise Exception('mail not arrived') sleep(1) content = authentication.get(url) assert search_inscription in content def test_send_mail(): data = get_imap_data() if data is None: return data_mm = get_data() subject = 'TEST MAILMAN' send_mail(subject, data, data_mm) idx = 0 while True: try: check_mail("=?utf-8?b?W1Rlc3Rd?= TEST MAILMAN") break except: idx += 1 if idx == 10: raise Exception('mail not arrived') sleep(1) def test_send_wrong_mail(): data = get_imap_data() if data is None: return data_mm = get_data() url = f'{data_mm["base_url"]}postorius/lists/test.{data_mm["mailman_domain"]}/held_messages' # no moderated mail authentication = get_authentication(data_mm) content = authentication.get(url) subject = 'TEST MAILMAN' assert subject not in content # send mail smtp = SMTP(data['address'], '587') smtp.starttls() smtp.login(data['username_family'], data['password_family'] + '2') list_name = 'test' list_addr = f'{list_name}@{data_mm["mailman_domain"]}' msg = f"""From: {data["username_family"]}\r\n\ To: {list_addr}\r\n\ Subject: {subject}\r\n\ \r\n\ MESSAGE""" smtp.sendmail(data['username_family'], list_addr, msg) smtp.quit() idx = 0 mailman_domain = data_mm['mailman_domain'].replace('.', '=2E') while True: try: check_mail(f'=?utf-8?q?Votre_message_=C3=A0_test=40{mailman_domain}_attend_la_validation_d=27un_mod=C3=A9rateur?=', family=True) break except: idx += 1 if idx == 10: raise Exception('mail not arrived...') sleep(1) # mail = data['username_family'].replace('@', '=40').replace('.', '=2E').replace('_', '=5F') while True: try: check_mail(f'=?utf-8?q?Le_message_test=40{mailman_domain}_de_{mail}_n=C3=A9cessite_une_validation?=') break except: idx += 1 if idx == 10: raise Exception('mail 2 not arrived...') sleep(1) # reject mail content = authentication.get(url) assert subject in content pattern_csrf = r'name="csrfmiddlewaretoken" value="([a-zA-Z0-9\-\_=]+)"' csrf = search(pattern_csrf, content)[1] pattern_msg_id = r'class="message-checkbox" name="choices" value="([a-zA-Z0-9\-\_=]+)"' msg_id = search(pattern_msg_id, content)[1] # headers = {'Referer': url} authentication.post(url, {'csrfmiddlewaretoken': csrf, 'reject': 'Rejeter', 'choices': msg_id}, headers=headers) content = authentication.get(url) assert subject not in content while True: try: check_mail('=?utf-8?q?Requ=C3=AAte_pour_la_liste_de_diffusion_=22Test=22_rejet=C3=A9e?=', family=True) break except: idx += 1 if idx == 10: raise Exception('mail reject not arrived...') sleep(1) def test_mailman_create_delete(): data = get_data() authentication = get_authentication(data) list_name = 'test_tmp' search_list = f'href="/mailman/postorius/lists/{list_name}.{data["mailman_domain"]}/"' # content = authentication.get(data['auth_url']) assert search_list not in content # result = run(data['internal_address'], ['/usr/bin/mailman3', 'create', '--language', 'fr', '-o', data['username'], '-N', '-d', f'{list_name}@{data["mailman_domain"]}'], 'mailman', ) assert list(result) == [f'Liste de diffusion créée : {list_name}@{data["mailman_domain"]}'] content = authentication.get(data['auth_url']) assert search_list in content # result = run(data['internal_address'], ['/usr/bin/mailman3', 'remove', f'{list_name}@{data["mailman_domain"]}'], 'mailman', ) assert list(result) == [f'Liste supprimée : {list_name}@{data["mailman_domain"]}'] content = authentication.get(data['auth_url']) assert search_list not in content def test_mailman_inscription_desinscription_mail(): list_name = 'test' data = get_imap_data() if data is None: return data_mm = get_data() send_mail('subscribe', data, data_mm, family=True, add_mail='-join') idx = 0 # inscription + validation mailman_domain = data_mm['mailman_domain'].replace('.', '=2E') while True: try: check_mail(f'=?utf-8?q?Votre_validation_est_n=C3=A9cessaire_pour_vous_abonner_de_la_liste_de_diffusion_test=40{mailman_domain}?=', family=True, reply=True) break except: idx += 1 if idx == 10: raise Exception('mail not arrived...') sleep(1) idx = 0 while True: try: check_mail(f'=?utf-8?q?Bienvenue_sur_la_liste_de_diffusion_=22{list_name.capitalize()}=22?=', family=True) break except: idx += 1 if idx == 10: raise Exception('confirmation not arrived...') sleep(1) # send mail and check subject = 'TEST MAILMAN' send_mail(subject, data, data_mm) idx = 0 while True: try: check_mail("=?utf-8?b?W1Rlc3Rd?= TEST MAILMAN") break except: idx += 1 if idx == 10: raise Exception('mail not arrived') sleep(1) while True: try: check_mail("=?utf-8?b?W1Rlc3Rd?= TEST MAILMAN", family=True) break except: idx += 1 if idx == 10: raise Exception('mail not arrived') sleep(1) # unsubscribe send_mail('unsubscribe', data, data_mm, family=True, add_mail="-leave") idx = 0 while True: try: check_mail(f'=?utf-8?q?Votre_validation_est_n=C3=A9cessaire_pour_vous_d=C3=A9sabonner_de_la_liste_de_diffusion_test=40{mailman_domain}?=', family=True, reply=True) break except: idx += 1 if idx == 10: raise Exception('unsubscribe not arrived...') sleep(1) idx = 0 while True: try: check_mail(f'=?utf-8?q?Vous_avez_=C3=A9t=C3=A9_d=C3=A9sinscrits_de_la_liste_de_diffusion_Test?=', family=True) break except: idx += 1 if idx == 10: raise Exception('unsubscribe confirmation not arrived...') sleep(1)