dataset/seed/mailman/tests/test_mailman.py

397 lines
13 KiB
Python
Raw Normal View History

2022-08-21 18:59:02 +02:00
from yaml import load, SafeLoader
from os import environ
from os.path import join, isdir
2023-07-31 15:30:32 +02:00
from revprox_client import Authentication
2022-08-21 18:59:02 +02:00
from execute import run
from re import search
from time import sleep
from imaplib2 import IMAP4_SSL
from smtplib import SMTP
from email import message_from_bytes
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
# Cache for the parsed mailman.yml configuration; filled by get_data().
DATA = None
# Cache for the parsed imap.yml configuration; filled by get_imap_data().
DATA_IMAP = None
# IMAP connection of the main test user; opened on demand by check_mail().
IMAP = None
# IMAP connection of the "family" test user; opened on demand by check_mail().
IMAP_FAMILY = None
def get_data():
    """Return the mailman test configuration, parsing it on first use.

    The configuration is read from ``mailman.yml`` in the directory named by
    the ``MACHINE_TEST_DIR`` environment variable and kept in the
    module-level ``DATA`` so later calls reuse the parsed result.

    Raises:
        KeyError: if ``MACHINE_TEST_DIR`` is not set.
        OSError: if the configuration file cannot be opened.
    """
    global DATA
    if DATA:
        return DATA
    conf_file = f'{environ["MACHINE_TEST_DIR"]}/mailman.yml'
    with open(conf_file) as stream:
        DATA = load(stream, Loader=SafeLoader)
    return DATA
def get_imap_data():
    """Return the IMAP test configuration, or ``None`` without a local server.

    The dovecot test directory is derived from ``MACHINE_TEST_DIR`` and its
    ``imap.yml`` is parsed once, then kept in the module-level ``DATA_IMAP``.

    Returns:
        The parsed ``imap.yml`` content, or ``None`` (after printing a
        warning) when the derived dovecot directory does not exist.

    Raises:
        KeyError: if ``MACHINE_TEST_DIR`` is not set.
        OSError: if ``imap.yml`` cannot be opened.
    """
    global DATA_IMAP
    if DATA_IMAP:
        return DATA_IMAP
    # FIXME: the dovecot directory is guessed by replacing every occurrence
    # of 'mailman' in the mailman test path; this breaks if 'mailman' also
    # appears elsewhere in the path.
    machine_test_dir = environ['MACHINE_TEST_DIR'].replace('mailman', 'dovecot')
    if not isdir(machine_test_dir):
        print('!!! No local IMAP server found !!!')
        return None
    conf_file = f'{machine_test_dir}/imap.yml'
    with open(conf_file) as stream:
        DATA_IMAP = load(stream, Loader=SafeLoader)
    return DATA_IMAP
def get_authentication(data, family=False):
    """Build an ``Authentication`` client for one of the two test users.

    Args:
        data: mailman test configuration (mapping) providing the URLs, the
            reverse-proxy address, the domain name and both sets of
            credentials.
        family: when true, use the ``*_family`` credentials instead of the
            main ones.

    Returns:
        The ``Authentication`` object created from the configuration.
    """
    username = data['username_family'] if family else data['username']
    password = data['password_family'] if family else data['password']
    # NOTE(review): presumably the page fragment Authentication expects to
    # find after login -- confirm against revprox_client. The line breaks
    # inside the literal are part of the value, so its continuation lines
    # must stay unindented.
    expected_title = f"""<title>
Listes - {data["domain_name"]}
</title>"""
    return Authentication(
        data['auth_url'],
        data['auth_server'],
        data['revprox_ip'],
        username,
        password,
        expected_title,
    )
def check_mail(subject, family=False, reply=False):
    """Check the newest mail in a test mailbox, then delete it.

    Does nothing when no IMAP configuration is available. The IMAP
    connection of each user is opened on first use, stored in the
    module-level ``IMAP`` / ``IMAP_FAMILY`` and left open for later calls.

    Args:
        subject: expected raw ``Subject`` header of the newest message, or
            ``None`` to skip the comparison.
        family: read the "family" user's mailbox instead of the main one.
        reply: when true, answer the message through ``reply_message``
            before deleting it.

    Raises:
        Exception: ``'pas de mail'`` when the mailbox is empty.
        AssertionError: when an IMAP command does not answer ``OK`` or the
            subject differs from the expected one.
    """
    global IMAP, IMAP_FAMILY
    data = get_imap_data()
    if data is None:
        return
    if family:
        if IMAP_FAMILY is None:
            IMAP_FAMILY = IMAP4_SSL(data['address'])
            # NOTE(review): the configured family password is suffixed with
            # '2' before use; the reason is not visible in this file.
            IMAP_FAMILY.LOGIN(data['username_family'], data['password_family'] + '2')
        imap = IMAP_FAMILY
    else:
        if IMAP is None:
            IMAP = IMAP4_SSL(data['address'])
            IMAP.LOGIN(data['username'], data['password'])
        imap = IMAP
    imap.SELECT(readonly=False)
    status, found = imap.SEARCH(None, 'ALL')
    assert status == 'OK'
    message_ids = found[0].decode()
    if not message_ids:
        raise Exception('pas de mail')
    # Only the last listed message is examined.
    last_id = message_ids.split()[-1]
    fetched = imap.FETCH(last_id, '(RFC822)')
    assert fetched[0] == 'OK'
    msg = message_from_bytes(fetched[1][-2][-1])
    if subject is not None:
        assert subject == msg['Subject']
    if reply:
        reply_message(msg)
    # Remove the checked message so the next call sees the following one.
    ret = imap.store(last_id, '+FLAGS', '\\Deleted')
    assert ret[0] == 'OK', f'error when deleting mail: {ret}'
    imap.expunge()
def reply_message(msg):
    """Send a short reply to ``msg`` from the "family" test account.

    The reply swaps the sender and recipient of ``msg``, prefixes the
    subject with ``Re: `` and carries the text ``resend``. It is submitted
    over SMTP with STARTTLS on port 587 of the configured mail server.

    Args:
        msg: the received ``email.message.Message`` to answer.
    """
    data = get_imap_data()
    body = MIMEText('resend')
    message = MIMEMultipart()
    message["to"] = msg["From"]
    message['from'] = msg["To"]
    message['subject'] = f'Re: {msg["Subject"]}'
    message.add_header('reply-to', msg["From"])
    message.attach(body)
    # The context manager issues QUIT and closes the socket even when
    # starttls/login/sendmail raises, so the connection is never leaked.
    with SMTP(data['address'], '587') as smtp:
        smtp.starttls()
        # NOTE(review): the configured family password is suffixed with '2'
        # before use; the reason is not visible in this file.
        smtp.login(data['username_family'], data['password_family'] + '2')
        smtp.sendmail(
            msg["To"],
            msg["From"],
            message.as_string(),
        )
def send_mail(subject, data, data_mm, family=False, add_mail=''):
    """Send a minimal plain-text mail to the ``test`` mailing list.

    Args:
        subject: value of the ``Subject`` header.
        data: IMAP/SMTP test configuration (server address and credentials).
        data_mm: mailman test configuration (provides ``mailman_domain``).
        family: send as the "family" user instead of the main user.
        add_mail: suffix appended to the list local part, e.g. ``'-join'``
            or ``'-leave'``.
    """
    list_name = 'test'
    # The context manager issues QUIT and closes the socket even when
    # starttls/login/sendmail raises, so the connection is never leaked.
    with SMTP(data['address'], '587') as smtp:
        smtp.starttls()
        if not family:
            sender = data['username']
            smtp.login(sender, data['password'])
        else:
            sender = data['username_family']
            # NOTE(review): the configured family password is suffixed with
            # '2' before use; the reason is not visible in this file.
            smtp.login(sender, data['password_family'] + '2')
        list_addr = f'{list_name}{add_mail}@{data_mm["mailman_domain"]}'
        # Headers, an empty line, then the body, all CRLF-separated.
        msg = (
            f'From: {sender}\r\n'
            f'To: {list_addr}\r\n'
            f'Subject: {subject}\r\n'
            '\r\n'
            'MESSAGE'
        )
        smtp.sendmail(sender, list_addr, msg)
def test_mailman():
    """Smoke test: an Authentication object can be built for the main user."""
    get_authentication(get_data())
def test_mailman_login():
    """The page at ``auth_url`` confirms the sign-in of the main user."""
    data = get_data()
    page = get_authentication(data).get(data['auth_url'])
    # The banner shows only the part of the address before the '@'.
    login = data['username'].split('@')[0]
    expected_banner = f'Successfully signed in as {login}.'
    assert expected_banner in page
def test_mailman_list():
    """The ``test`` list is listed in the web UI (and created on first run).

    When ``FIRST_RUN`` is set in the environment, the list must not exist
    yet and is created with the ``mailman3`` command; in every case it must
    be linked from the page at ``auth_url`` afterwards.
    """
    data = get_data()
    authentication = get_authentication(data)
    list_name = 'test'
    search_list = f'href="/mailman/postorius/lists/{list_name}.{data["mailman_domain"]}/"'
    if 'FIRST_RUN' in environ:
        page_before = authentication.get(data['auth_url'])
        assert search_list not in page_before
        create_cmd = [
            '/usr/bin/mailman3', 'create',
            '--language', 'fr',
            '-o', data['username'],
            '-N',
            '-d', f'{list_name}@{data["mailman_domain"]}',
        ]
        result = run(data['internal_address'], create_cmd, 'mailman')
        assert list(result) == [f'Liste de diffusion créée : {list_name}@{data["mailman_domain"]}']
    page_after = authentication.get(data['auth_url'])
    assert search_list in page_after
def test_mailman_inscription():
    """The main user is subscribed to the ``test`` list.

    When ``FIRST_RUN`` is set in the environment, the user must not be
    subscribed yet; the test then posts the subscription form and waits for
    the welcome mail. In every case the list page must show the
    subscription afterwards.
    """
    data = get_data()
    authentication = get_authentication(data)
    list_name = 'test'
    search_inscription = f'<td>Adresse principale ({data["username"]})</td>'
    url = join(data['base_url'], f'postorius/lists/{list_name}.{data["mailman_domain"]}/')
    if 'FIRST_RUN' in environ:
        content = authentication.get(url)
        assert search_inscription not in content
        # The form token and the subscriber id are scraped from the page.
        pattern_csrf = r'name="csrfmiddlewaretoken" value="([a-zA-Z0-9\-\_=]+)"'
        pattern_sub = r'<option value="([a-zA-Z0-9\-\_=]+)">Adresse principale'
        csrf = search(pattern_csrf, content)[1]
        subscriber = search(pattern_sub, content)[1]
        headers = {'Referer': url}
        form = {
            'csrfmiddlewaretoken': csrf,
            'subscriber': subscriber,
            'delivery_mode': 'regular',
            'display_name': '',
        }
        authentication.post(url + 'subscribe', form, headers=headers)
        subject = f'=?utf-8?q?Bienvenue_sur_la_liste_de_diffusion_=22{list_name.capitalize()}=22?='
        # Poll the mailbox: up to 10 attempts, one second apart.
        idx = 0
        while True:
            try:
                check_mail(subject)
                break
            except Exception as err:
                # 'except Exception' (not a bare except) so that Ctrl-C and
                # SystemExit still abort the test run immediately.
                idx += 1
                if idx == 10:
                    raise Exception('mail not arrived') from err
                sleep(1)
    content = authentication.get(url)
    assert search_inscription in content
def test_send_mail():
    """A mail posted by the main user comes back through the list.

    Skipped silently (plain return) when no IMAP configuration is available.
    """
    data = get_imap_data()
    if data is None:
        return
    data_mm = get_data()
    subject = 'TEST MAILMAN'
    send_mail(subject, data, data_mm)
    # Poll the mailbox: up to 10 attempts, one second apart.
    idx = 0
    while True:
        try:
            check_mail("=?utf-8?b?W1Rlc3Rd?= TEST MAILMAN")
            break
        except Exception as err:
            # 'except Exception' (not a bare except) so that Ctrl-C and
            # SystemExit still abort the test run immediately.
            idx += 1
            if idx == 10:
                raise Exception('mail not arrived') from err
            sleep(1)
def test_send_wrong_mail():
    """A mail from the "family" user is held, then rejected by the owner.

    Steps: check that the held-messages page does not show the subject,
    post as the family user, wait for the "awaiting moderation" notice
    (family mailbox) and the moderation request (main mailbox), reject the
    message through the web form, and wait for the rejection notice.

    Skipped silently (plain return) when no IMAP configuration is available.
    """
    data = get_imap_data()
    if data is None:
        return
    data_mm = get_data()
    url = f'{data_mm["base_url"]}postorius/lists/test.{data_mm["mailman_domain"]}/held_messages'
    # no moderated mail
    authentication = get_authentication(data_mm)
    content = authentication.get(url)
    subject = 'TEST MAILMAN'
    assert subject not in content
    # send mail as the family user; send_mail() builds exactly the message
    # this test used to assemble by hand and closes the SMTP connection
    # even on error.
    send_mail(subject, data, data_mm, family=True)
    # The expected subjects are compared in their raw encoded form, hence
    # the manual quoted-printable escaping of '.', '@' and '_'.
    mailman_domain = data_mm['mailman_domain'].replace('.', '=2E')
    held_subject = f'=?utf-8?q?Votre_message_=C3=A0_test=40{mailman_domain}_attend_la_validation_d=27un_mod=C3=A9rateur?='
    # One retry counter (10 failed attempts in total) is shared by the
    # three waits below; it is intentionally not reset between them.
    idx = 0
    while True:
        try:
            check_mail(held_subject, family=True)
            break
        except Exception as err:
            # 'except Exception' (not a bare except) so that Ctrl-C and
            # SystemExit still abort the test run immediately.
            idx += 1
            if idx == 10:
                raise Exception('mail not arrived...') from err
            sleep(1)
    #
    mail = data['username_family'].replace('@', '=40').replace('.', '=2E').replace('_', '=5F')
    moderation_subject = f'=?utf-8?q?Le_message_test=40{mailman_domain}_de_{mail}_n=C3=A9cessite_une_validation?='
    while True:
        try:
            check_mail(moderation_subject)
            break
        except Exception as err:
            idx += 1
            if idx == 10:
                raise Exception('mail 2 not arrived...') from err
            sleep(1)
    # reject mail
    content = authentication.get(url)
    assert subject in content
    pattern_csrf = r'name="csrfmiddlewaretoken" value="([a-zA-Z0-9\-\_=]+)"'
    csrf = search(pattern_csrf, content)[1]
    pattern_msg_id = r'class="message-checkbox" name="choices" value="([a-zA-Z0-9\-\_=]+)"'
    msg_id = search(pattern_msg_id, content)[1]
    #
    headers = {'Referer': url}
    form = {'csrfmiddlewaretoken': csrf, 'reject': 'Rejeter', 'choices': msg_id}
    authentication.post(url, form, headers=headers)
    content = authentication.get(url)
    assert subject not in content
    while True:
        try:
            check_mail('=?utf-8?q?Requ=C3=AAte_pour_la_liste_de_diffusion_=22Test=22_rejet=C3=A9e?=', family=True)
            break
        except Exception as err:
            idx += 1
            if idx == 10:
                raise Exception('mail reject not arrived...') from err
            sleep(1)
def test_mailman_create_delete():
    """A temporary list can be created and removed with ``mailman3``.

    The list link must be absent from the page at ``auth_url`` before the
    creation, present after it, and absent again after the removal.
    """
    data = get_data()
    authentication = get_authentication(data)
    list_name = 'test_tmp'
    search_list = f'href="/mailman/postorius/lists/{list_name}.{data["mailman_domain"]}/"'

    # The list must not exist yet.
    assert search_list not in authentication.get(data['auth_url'])

    # Create it and check that the web UI lists it.
    create_cmd = [
        '/usr/bin/mailman3', 'create',
        '--language', 'fr',
        '-o', data['username'],
        '-N',
        '-d', f'{list_name}@{data["mailman_domain"]}',
    ]
    created = run(data['internal_address'], create_cmd, 'mailman')
    assert list(created) == [f'Liste de diffusion créée : {list_name}@{data["mailman_domain"]}']
    assert search_list in authentication.get(data['auth_url'])

    # Remove it and check that it disappeared.
    remove_cmd = ['/usr/bin/mailman3', 'remove', f'{list_name}@{data["mailman_domain"]}']
    removed = run(data['internal_address'], remove_cmd, 'mailman')
    assert list(removed) == [f'Liste supprimée : {list_name}@{data["mailman_domain"]}']
    assert search_list not in authentication.get(data['auth_url'])
def test_mailman_inscription_desinscription_mail():
    """The "family" user subscribes and unsubscribes by mail.

    Steps: send a mail to the ``-join`` address and answer the confirmation
    request, wait for the welcome mail, post to the list and check that
    both users receive the message, then send a mail to the ``-leave``
    address, answer the confirmation request and wait for the final notice.

    Every wait polls the mailbox once per second and gives up after 10
    failed attempts. Skipped silently (plain return) when no IMAP
    configuration is available.
    """
    list_name = 'test'
    data = get_imap_data()
    if data is None:
        return
    data_mm = get_data()
    send_mail('subscribe', data, data_mm, family=True, add_mail='-join')
    idx = 0
    # inscription + validation
    # The expected subjects are compared in their raw encoded form, hence
    # the manual quoted-printable escaping of the dots.
    mailman_domain = data_mm['mailman_domain'].replace('.', '=2E')
    join_subject = f'=?utf-8?q?Votre_validation_est_n=C3=A9cessaire_pour_vous_abonner_de_la_liste_de_diffusion_test=40{mailman_domain}?='
    while True:
        try:
            check_mail(join_subject, family=True, reply=True)
            break
        except Exception as err:
            # 'except Exception' (not a bare except) so that Ctrl-C and
            # SystemExit still abort the test run immediately.
            idx += 1
            if idx == 10:
                raise Exception('mail not arrived...') from err
            sleep(1)
    idx = 0
    welcome_subject = f'=?utf-8?q?Bienvenue_sur_la_liste_de_diffusion_=22{list_name.capitalize()}=22?='
    while True:
        try:
            check_mail(welcome_subject, family=True)
            break
        except Exception as err:
            idx += 1
            if idx == 10:
                raise Exception('confirmation not arrived...') from err
            sleep(1)
    # send mail and check
    subject = 'TEST MAILMAN'
    send_mail(subject, data, data_mm)
    idx = 0
    while True:
        try:
            check_mail("=?utf-8?b?W1Rlc3Rd?= TEST MAILMAN")
            break
        except Exception as err:
            idx += 1
            if idx == 10:
                raise Exception('mail not arrived') from err
            sleep(1)
    # The counter is not reset here: the family copy of the same message
    # shares the retry budget of the previous wait.
    while True:
        try:
            check_mail("=?utf-8?b?W1Rlc3Rd?= TEST MAILMAN", family=True)
            break
        except Exception as err:
            idx += 1
            if idx == 10:
                raise Exception('mail not arrived') from err
            sleep(1)
    # unsubscribe
    send_mail('unsubscribe', data, data_mm, family=True, add_mail="-leave")
    idx = 0
    leave_subject = f'=?utf-8?q?Votre_validation_est_n=C3=A9cessaire_pour_vous_d=C3=A9sabonner_de_la_liste_de_diffusion_test=40{mailman_domain}?='
    while True:
        try:
            check_mail(leave_subject, family=True, reply=True)
            break
        except Exception as err:
            idx += 1
            if idx == 10:
                raise Exception('unsubscribe not arrived...') from err
            sleep(1)
    idx = 0
    while True:
        try:
            check_mail('=?utf-8?q?Vous_avez_=C3=A9t=C3=A9_d=C3=A9sinscrits_de_la_liste_de_diffusion_Test?=', family=True)
            break
        except Exception as err:
            idx += 1
            if idx == 10:
                raise Exception('unsubscribe confirmation not arrived...') from err
            sleep(1)