add mailman test

This commit is contained in:
Emmanuel Garette 2022-08-21 18:59:02 +02:00
parent 6ce90c4580
commit 29c09a3799
7 changed files with 428 additions and 17 deletions

View file

@ -2,12 +2,14 @@ from os import fdopen
from dbus import SystemBus, Array from dbus import SystemBus, Array
def run(host, cmd): def run(host, cmd, user=None):
bus = SystemBus() bus = SystemBus()
remote_object = bus.get_object('org.freedesktop.machine1', remote_object = bus.get_object('org.freedesktop.machine1',
'/org/freedesktop/machine1', '/org/freedesktop/machine1',
False, False,
) )
if user is not None:
cmd = ['/bin/su', '-', user, '-s', '/bin/bash', '-c', ' '.join(cmd)]
res = remote_object.OpenMachineShell(host, res = remote_object.OpenMachineShell(host,
'', '',
cmd[0], cmd[0],

View file

@ -13,6 +13,7 @@
<file engine="none" source="sysuser-postorius.conf">/sysusers.d/0postorius.conf</file> <file engine="none" source="sysuser-postorius.conf">/sysusers.d/0postorius.conf</file>
<file source="config-nginx.conf">/etc/nginx/default.d/postorius.conf</file> <file source="config-nginx.conf">/etc/nginx/default.d/postorius.conf</file>
<file source="postorius-settings.py">/etc/mailman3.d/postorius.py</file> <file source="postorius-settings.py">/etc/mailman3.d/postorius.py</file>
<file>/tests/mailman.yml</file>
</service> </service>
<service name="postgresqlclient" target="multi-user" engine="creole"> <service name="postgresqlclient" target="multi-user" engine="creole">
<file owner="postorius" mode="400">/etc/pki/tls/private/postgresql_postorius.key</file> <file owner="postorius" mode="400">/etc/pki/tls/private/postgresql_postorius.key</file>

View file

@ -3,7 +3,7 @@
<variables> <variables>
<family name="list_" description="Listes du domaine " dynamic="mailman_domains"> <family name="list_" description="Listes du domaine " dynamic="mailman_domains">
<variable name="name_" description="Nom des listes" type="unix_user" multi="True" mandatory="True"/> <variable name="name_" description="Nom des listes" type="unix_user" multi="True" mandatory="True"/>
<variable name="names_" description="Address names" type="string" multi="True" mandatory="True" hidden="True"/> <variable name="names_" description="Address names" type="string" mandatory="True" hidden="True"/>
</family> </family>
<variable name="names_" description="Address names" type="string" multi="True" mandatory="True" hidden="True" supplier="LMTP:criteria"/> <variable name="names_" description="Address names" type="string" multi="True" mandatory="True" hidden="True" supplier="LMTP:criteria"/>
</variables> </variables>

View file

@ -1,20 +1,18 @@
from risotto.utils import multi_function as _multi_function from risotto.utils import multi_function as _multi_function
from itertools import chain
@_multi_function
def mailman_emails(lists, domain): def mailman_emails(lists, domain):
ret = [] return '.*@' + domain
for lst in lists: # ret = []
for suffix in [None, 'bounces(\+.*)?', 'confirm(\+.*)?', 'join', 'leave', 'owner', 'request', 'subscribe', 'unsubscribe']: # for lst in lists:
if suffix: # for suffix in [None, 'bounces(\+.*)?', 'confirm(\+.*)?', 'join', 'leave', 'owner', 'request', 'subscribe', 'unsubscribe']:
lst_name = lst + '-' + suffix # if suffix:
else: # lst_name = lst + '-' + suffix
lst_name = lst # else:
ret.append(lst_name + '@' + domain) # lst_name = lst
return ret # ret.append(lst_name + '@' + domain)
# return ret
@_multi_function @_multi_function
def mailman_concat(lists): def mailman_concat(lists):
# list of lists to a single list return lists
return list(chain(*lists))

View file

@ -0,0 +1,12 @@
%set %%username="rougail_test@silique.fr"
ip: %%ip_eth0
revprox_ip: %%revprox_client_server_ip
%set %%domain = %%revprox_client_external_domainnames[0]
domain_name: %%domain
base_url: https://%%domain%%domain.revprox_client_location
auth_url: %%oauth2_client_external[0]
auth_server: %%oauth2_server_domainname
username: %%username
password: %%get_password(server_name='test', username=%%username, description='test', type="cleartext", hide=%%hide_secret, temporary=True)
mailman_domain: %%mailman_domains[0]
internal_address: %%domain_name_eth0

View file

@ -0,0 +1,396 @@
from yaml import load, SafeLoader
from os import environ
from os.path import join, isdir
from revprox import Authentication
from execute import run
from re import search
from time import sleep
from imaplib2 import IMAP4_SSL
from smtplib import SMTP
from email import message_from_bytes
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
DATA = None
DATA_IMAP = None
IMAP = None
IMAP_FAMILY = None
def get_data():
global DATA
if not DATA:
conf_file = f'{environ["MACHINE_TEST_DIR"]}/mailman.yml'
with open(conf_file) as yaml:
DATA = load(yaml, Loader=SafeLoader)
return DATA
def get_imap_data():
global DATA_IMAP
if not DATA_IMAP:
machine_test_dir = environ['MACHINE_TEST_DIR']
print("FIXME")
machine_test_dir = machine_test_dir.replace('mailman', 'dovecot')
if not isdir(machine_test_dir):
print('!!! No local IMAP server found !!!')
return
conf_file = f'{machine_test_dir}/imap.yml'
with open(conf_file) as yaml:
DATA_IMAP = load(yaml, Loader=SafeLoader)
return DATA_IMAP
def get_authentication(data, family=False):
if family:
username = data['username_family']
password = data['password_family']
else:
username = data['username']
password = data['password']
return Authentication(data['auth_url'],
data['auth_server'],
data['revprox_ip'],
username,
password,
f"""<title>
Listes - {data["domain_name"]}
</title>""",
)
def check_mail(subject, family=False, reply=False):
global IMAP, IMAP_FAMILY
data = get_imap_data()
if data is None:
return
if family:
if IMAP_FAMILY is None:
IMAP_FAMILY = IMAP4_SSL(data['address'])
IMAP_FAMILY.LOGIN(data['username_family'], data['password_family'] + '2')
imap = IMAP_FAMILY
else:
if IMAP is None:
IMAP = IMAP4_SSL(data['address'])
IMAP.LOGIN(data['username'], data['password'])
imap = IMAP
imap.SELECT(readonly=False)
typ, req = imap.SEARCH(None, 'ALL')
assert typ == 'OK'
if not req[0].decode():
raise Exception('pas de mail')
num = req[0].decode().split()[-1]
field = imap.FETCH(num, '(RFC822)')
assert field[0] == 'OK'
msg = message_from_bytes(field[1][-2][-1])
#if msg.is_multipart():
# for part in msg.walk():
# # extract content type of email
# try:
# print(part.get_payload(decode=True).decode())
# except:
# pass
#else:
# print(msg.get_payload(decode=True).decode())
if subject is not None:
assert subject == msg['Subject']
if reply:
reply_message(msg)
ret = imap.store(num, '+FLAGS', '\\Deleted')
assert ret[0] == 'OK', f'error when deleting mail: {ret}'
imap.expunge()
# imap.CLOSE()
# imap.LOGOUT()
def reply_message(msg):
data = get_imap_data()
body = MIMEText('resend')
message = MIMEMultipart()
message["to"] = msg["From"]
message['from'] = msg["To"]
message['subject'] = f'Re: {msg["Subject"]}'
message.add_header('reply-to', msg["From"])
message.attach(body)
#
smtp = SMTP(data['address'], '587')
smtp.starttls()
smtp.login(data['username_family'], data['password_family'] + '2')
smtp.sendmail(msg["To"],
msg["From"],
message.as_string(),
)
smtp.quit()
def send_mail(subject, data, data_mm, family=False, add_mail=''):
list_name = 'test'
smtp = SMTP(data['address'], '587')
smtp.starttls()
if not family:
sender = data['username']
smtp.login(sender, data['password'])
else:
sender = data['username_family']
smtp.login(sender, data['password_family'] + '2')
list_addr = f'{list_name}{add_mail}@{data_mm["mailman_domain"]}'
msg = f"""From: {sender}\r\n\
To: {list_addr}\r\n\
Subject: {subject}\r\n\
\r\n\
MESSAGE"""
smtp.sendmail(sender,
list_addr,
msg,
)
smtp.quit()
def test_mailman():
data = get_data()
get_authentication(data)
def test_mailman_login():
data = get_data()
authentication = get_authentication(data)
content = authentication.get(data['auth_url'])
login = data['username'].split('@')[0]
assert f'Successfully signed in as {login}.' in content
def test_mailman_list():
data = get_data()
authentication = get_authentication(data)
list_name = 'test'
search_list = f'href="/mailman/postorius/lists/{list_name}.{data["mailman_domain"]}/"'
if 'FIRST_RUN' in environ:
content = authentication.get(data['auth_url'])
assert search_list not in content
result = run(data['internal_address'],
['/usr/bin/mailman3', 'create', '--language', 'fr', '-o', data['username'], '-N', '-d', f'{list_name}@{data["mailman_domain"]}'],
'mailman',
)
assert list(result) == [f'Liste de diffusion créée : {list_name}@{data["mailman_domain"]}']
content = authentication.get(data['auth_url'])
assert search_list in content
def test_mailman_inscription():
data = get_data()
authentication = get_authentication(data)
list_name = 'test'
search_inscription = f'<td>Adresse principale ({data["username"]})</td>'
url = join(data['base_url'], f'postorius/lists/{list_name}.{data["mailman_domain"]}/')
if 'FIRST_RUN' in environ:
content = authentication.get(url)
assert search_inscription not in content
pattern_csrf = r'name="csrfmiddlewaretoken" value="([a-zA-Z0-9\-\_=]+)"'
pattern_sub = r'<option value="([a-zA-Z0-9\-\_=]+)">Adresse principale'
csrf = search(pattern_csrf, content)[1]
subscriber = search(pattern_sub, content)[1]
headers = {'Referer': url}
authentication.post(url + 'subscribe', {'csrfmiddlewaretoken': csrf, 'subscriber': subscriber, 'delivery_mode': 'regular', 'display_name': ''}, headers=headers)
subject = f'=?utf-8?q?Bienvenue_sur_la_liste_de_diffusion_=22{list_name.capitalize()}=22?='
idx = 0
while True:
try:
check_mail(subject)
break
except:
idx += 1
if idx == 10:
raise Exception('mail not arrived')
sleep(1)
content = authentication.get(url)
assert search_inscription in content
def test_send_mail():
data = get_imap_data()
if data is None:
return
data_mm = get_data()
subject = 'TEST MAILMAN'
send_mail(subject, data, data_mm)
idx = 0
while True:
try:
check_mail("=?utf-8?b?W1Rlc3Rd?= TEST MAILMAN")
break
except:
idx += 1
if idx == 10:
raise Exception('mail not arrived')
sleep(1)
def test_send_wrong_mail():
data = get_imap_data()
if data is None:
return
data_mm = get_data()
url = f'{data_mm["base_url"]}postorius/lists/test.{data_mm["mailman_domain"]}/held_messages'
# no moderated mail
authentication = get_authentication(data_mm)
content = authentication.get(url)
subject = 'TEST MAILMAN'
assert subject not in content
# send mail
smtp = SMTP(data['address'], '587')
smtp.starttls()
smtp.login(data['username_family'], data['password_family'] + '2')
list_name = 'test'
list_addr = f'{list_name}@{data_mm["mailman_domain"]}'
msg = f"""From: {data["username_family"]}\r\n\
To: {list_addr}\r\n\
Subject: {subject}\r\n\
\r\n\
MESSAGE"""
smtp.sendmail(data['username_family'], list_addr, msg)
smtp.quit()
idx = 0
mailman_domain = data_mm['mailman_domain'].replace('.', '=2E')
while True:
try:
check_mail(f'=?utf-8?q?Votre_message_=C3=A0_test=40{mailman_domain}_attend_la_validation_d=27un_mod=C3=A9rateur?=', family=True)
break
except:
idx += 1
if idx == 10:
raise Exception('mail not arrived...')
sleep(1)
#
mail = data['username_family'].replace('@', '=40').replace('.', '=2E').replace('_', '=5F')
while True:
try:
check_mail(f'=?utf-8?q?Le_message_test=40{mailman_domain}_de_{mail}_n=C3=A9cessite_une_validation?=')
break
except:
idx += 1
if idx == 10:
raise Exception('mail 2 not arrived...')
sleep(1)
# reject mail
content = authentication.get(url)
assert subject in content
pattern_csrf = r'name="csrfmiddlewaretoken" value="([a-zA-Z0-9\-\_=]+)"'
csrf = search(pattern_csrf, content)[1]
pattern_msg_id = r'class="message-checkbox" name="choices" value="([a-zA-Z0-9\-\_=]+)"'
msg_id = search(pattern_msg_id, content)[1]
#
headers = {'Referer': url}
authentication.post(url, {'csrfmiddlewaretoken': csrf, 'reject': 'Rejeter', 'choices': msg_id}, headers=headers)
content = authentication.get(url)
assert subject not in content
while True:
try:
check_mail('=?utf-8?q?Requ=C3=AAte_pour_la_liste_de_diffusion_=22Test=22_rejet=C3=A9e?=', family=True)
break
except:
idx += 1
if idx == 10:
raise Exception('mail reject not arrived...')
sleep(1)
def test_mailman_create_delete():
data = get_data()
authentication = get_authentication(data)
list_name = 'test_tmp'
search_list = f'href="/mailman/postorius/lists/{list_name}.{data["mailman_domain"]}/"'
#
content = authentication.get(data['auth_url'])
assert search_list not in content
#
result = run(data['internal_address'],
['/usr/bin/mailman3', 'create', '--language', 'fr', '-o', data['username'], '-N', '-d', f'{list_name}@{data["mailman_domain"]}'],
'mailman',
)
assert list(result) == [f'Liste de diffusion créée : {list_name}@{data["mailman_domain"]}']
content = authentication.get(data['auth_url'])
assert search_list in content
#
result = run(data['internal_address'],
['/usr/bin/mailman3', 'remove', f'{list_name}@{data["mailman_domain"]}'],
'mailman',
)
assert list(result) == [f'Liste supprimée : {list_name}@{data["mailman_domain"]}']
content = authentication.get(data['auth_url'])
assert search_list not in content
def test_mailman_inscription_desinscription_mail():
list_name = 'test'
data = get_imap_data()
if data is None:
return
data_mm = get_data()
send_mail('subscribe', data, data_mm, family=True, add_mail='-join')
idx = 0
# inscription + validation
mailman_domain = data_mm['mailman_domain'].replace('.', '=2E')
while True:
try:
check_mail(f'=?utf-8?q?Votre_validation_est_n=C3=A9cessaire_pour_vous_abonner_de_la_liste_de_diffusion_test=40{mailman_domain}?=', family=True, reply=True)
break
except:
idx += 1
if idx == 10:
raise Exception('mail not arrived...')
sleep(1)
idx = 0
while True:
try:
check_mail(f'=?utf-8?q?Bienvenue_sur_la_liste_de_diffusion_=22{list_name.capitalize()}=22?=', family=True)
break
except:
idx += 1
if idx == 10:
raise Exception('confirmation not arrived...')
sleep(1)
# send mail and check
subject = 'TEST MAILMAN'
send_mail(subject, data, data_mm)
idx = 0
while True:
try:
check_mail("=?utf-8?b?W1Rlc3Rd?= TEST MAILMAN")
break
except:
idx += 1
if idx == 10:
raise Exception('mail not arrived')
sleep(1)
while True:
try:
check_mail("=?utf-8?b?W1Rlc3Rd?= TEST MAILMAN", family=True)
break
except:
idx += 1
if idx == 10:
raise Exception('mail not arrived')
sleep(1)
# unsubscribe
send_mail('unsubscribe', data, data_mm, family=True, add_mail="-leave")
idx = 0
while True:
try:
check_mail(f'=?utf-8?q?Votre_validation_est_n=C3=A9cessaire_pour_vous_d=C3=A9sabonner_de_la_liste_de_diffusion_test=40{mailman_domain}?=', family=True, reply=True)
break
except:
idx += 1
if idx == 10:
raise Exception('unsubscribe not arrived...')
sleep(1)
idx = 0
while True:
try:
check_mail(f'=?utf-8?q?Vous_avez_=C3=A9t=C3=A9_d=C3=A9sinscrits_de_la_liste_de_diffusion_Test?=', family=True)
break
except:
idx += 1
if idx == 10:
raise Exception('unsubscribe confirmation not arrived...')
sleep(1)

View file

@ -62,7 +62,8 @@ class Authentication:
authorize_url = f'{portal_url}authorize' authorize_url = f'{portal_url}authorize'
ret = req.get(authorize_url) ret = req.get(authorize_url)
assert ret.status_code == 200 assert ret.status_code == 200
assert title in ret.content.decode() content = ret.content.decode()
assert title in content, f'cannot find {title} in {content}'
def get(self, def get(self,
url, url,
@ -78,7 +79,8 @@ class Authentication:
def post(self, def post(self,
url, url,
data, data,
headers=None,
): ):
with MookDns(self.ip): with MookDns(self.ip):
ret = post(url, cookies=self.cookies, data=data) ret = post(url, cookies=self.cookies, data=data, headers=headers)
assert ret.status_code == 200, f'return code is {ret.status_code}' assert ret.status_code == 200, f'return code is {ret.status_code}'