diff --git a/seed/base-machine/tests/execute.py b/seed/base-machine/tests/execute.py index 7b9cb08..76747d7 100644 --- a/seed/base-machine/tests/execute.py +++ b/seed/base-machine/tests/execute.py @@ -2,12 +2,14 @@ from os import fdopen from dbus import SystemBus, Array -def run(host, cmd): +def run(host, cmd, user=None): bus = SystemBus() remote_object = bus.get_object('org.freedesktop.machine1', '/org/freedesktop/machine1', False, ) + if user is not None: + cmd = ['/bin/su', '-', user, '-s', '/bin/bash', '-c', ' '.join(cmd)] res = remote_object.OpenMachineShell(host, '', cmd[0], diff --git a/seed/mailman/dictionaries/31_mailman.xml b/seed/mailman/dictionaries/31_mailman.xml index 3bf09da..575e133 100644 --- a/seed/mailman/dictionaries/31_mailman.xml +++ b/seed/mailman/dictionaries/31_mailman.xml @@ -13,6 +13,7 @@ /sysusers.d/0postorius.conf /etc/nginx/default.d/postorius.conf /etc/mailman3.d/postorius.py + /tests/mailman.yml /etc/pki/tls/private/postgresql_postorius.key diff --git a/seed/mailman/extras/mailman/20_mailman.xml b/seed/mailman/extras/mailman/20_mailman.xml index 97066c7..37e46c9 100644 --- a/seed/mailman/extras/mailman/20_mailman.xml +++ b/seed/mailman/extras/mailman/20_mailman.xml @@ -3,7 +3,7 @@ - diff --git a/seed/mailman/funcs/mailman.py b/seed/mailman/funcs/mailman.py index 07372bc..66b633d 100644 --- a/seed/mailman/funcs/mailman.py +++ b/seed/mailman/funcs/mailman.py @@ -1,20 +1,18 @@ from risotto.utils import multi_function as _multi_function -from itertools import chain -@_multi_function def mailman_emails(lists, domain): - ret = [] - for lst in lists: - for suffix in [None, 'bounces(\+.*)?', 'confirm(\+.*)?', 'join', 'leave', 'owner', 'request', 'subscribe', 'unsubscribe']: - if suffix: - lst_name = lst + '-' + suffix - else: - lst_name = lst - ret.append(lst_name + '@' + domain) - return ret + return '.*@' + domain +# ret = [] +# for lst in lists: +# for suffix in [None, 'bounces(\+.*)?', 'confirm(\+.*)?', 'join', 'leave', 'owner', 'request', 'subscribe', 'unsubscribe']: +# if suffix: +# lst_name = lst + '-' + suffix +# else: +# lst_name = lst +# ret.append(lst_name + '@' + domain) +# return ret @_multi_function def mailman_concat(lists): - # list of lists to a single list - return list(chain(*lists)) + return lists diff --git a/seed/mailman/templates/mailman.yml b/seed/mailman/templates/mailman.yml new file mode 100644 index 0000000..2c96daf --- /dev/null +++ b/seed/mailman/templates/mailman.yml @@ -0,0 +1,12 @@ +%set %%username="rougail_test@silique.fr" +ip: %%ip_eth0 +revprox_ip: %%revprox_client_server_ip +%set %%domain = %%revprox_client_external_domainnames[0] +domain_name: %%domain +base_url: https://%%domain%%domain.revprox_client_location +auth_url: %%oauth2_client_external[0] +auth_server: %%oauth2_server_domainname +username: %%username +password: %%get_password(server_name='test', username=%%username, description='test', type="cleartext", hide=%%hide_secret, temporary=True) +mailman_domain: %%mailman_domains[0] +internal_address: %%domain_name_eth0 diff --git a/seed/mailman/tests/test_mailman.py b/seed/mailman/tests/test_mailman.py new file mode 100644 index 0000000..2b7e899 --- /dev/null +++ b/seed/mailman/tests/test_mailman.py @@ -0,0 +1,396 @@ +from yaml import load, SafeLoader +from os import environ +from os.path import join, isdir +from revprox import Authentication +from execute import run +from re import search +from time import sleep +from imaplib2 import IMAP4_SSL +from smtplib import SMTP +from email import message_from_bytes +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + + +DATA = None +DATA_IMAP = None +IMAP = None +IMAP_FAMILY = None + + +def get_data(): + global DATA + if not DATA: + conf_file = f'{environ["MACHINE_TEST_DIR"]}/mailman.yml' + with open(conf_file) as yaml: + DATA = load(yaml, Loader=SafeLoader) + return DATA + + +def get_imap_data(): + global DATA_IMAP + if not DATA_IMAP: + machine_test_dir = environ['MACHINE_TEST_DIR'] + print("FIXME") + machine_test_dir = machine_test_dir.replace('mailman', 'dovecot') + if not isdir(machine_test_dir): + print('!!! No local IMAP server found !!!') + return + conf_file = f'{machine_test_dir}/imap.yml' + with open(conf_file) as yaml: + DATA_IMAP = load(yaml, Loader=SafeLoader) + return DATA_IMAP + + +def get_authentication(data, family=False): + if family: + username = data['username_family'] + password = data['password_family'] + else: + username = data['username'] + password = data['password'] + return Authentication(data['auth_url'], + data['auth_server'], + data['revprox_ip'], + username, + password, + f""" +Listes - {data["domain_name"]} +""", + ) + + +def check_mail(subject, family=False, reply=False): + global IMAP, IMAP_FAMILY + data = get_imap_data() + if data is None: + return + if family: + if IMAP_FAMILY is None: + IMAP_FAMILY = IMAP4_SSL(data['address']) + IMAP_FAMILY.LOGIN(data['username_family'], data['password_family'] + '2') + imap = IMAP_FAMILY + else: + if IMAP is None: + IMAP = IMAP4_SSL(data['address']) + IMAP.LOGIN(data['username'], data['password']) + imap = IMAP + imap.SELECT(readonly=False) + typ, req = imap.SEARCH(None, 'ALL') + assert typ == 'OK' + if not req[0].decode(): + raise Exception('pas de mail') + num = req[0].decode().split()[-1] + field = imap.FETCH(num, '(RFC822)') + assert field[0] == 'OK' + msg = message_from_bytes(field[1][-2][-1]) + #if msg.is_multipart(): + # for part in msg.walk(): + # # extract content type of email + # try: + # print(part.get_payload(decode=True).decode()) + # except: + # pass + #else: + # print(msg.get_payload(decode=True).decode()) + if subject is not None: + assert subject == msg['Subject'] + if reply: + reply_message(msg) + ret = imap.store(num, '+FLAGS', '\\Deleted') + assert ret[0] == 'OK', f'error when deleting mail: {ret}' + imap.expunge() +# imap.CLOSE() +# imap.LOGOUT() + + +def reply_message(msg): + data = get_imap_data() + body = MIMEText('resend') + message = MIMEMultipart() + message["to"] = msg["From"] + message['from'] = msg["To"] + message['subject'] = f'Re: {msg["Subject"]}' + message.add_header('reply-to', msg["From"]) + message.attach(body) + # + smtp = SMTP(data['address'], '587') + smtp.starttls() + smtp.login(data['username_family'], data['password_family'] + '2') + smtp.sendmail(msg["To"], + msg["From"], + message.as_string(), + ) + smtp.quit() + + +def send_mail(subject, data, data_mm, family=False, add_mail=''): + list_name = 'test' + smtp = SMTP(data['address'], '587') + smtp.starttls() + if not family: + sender = data['username'] + smtp.login(sender, data['password']) + else: + sender = data['username_family'] + smtp.login(sender, data['password_family'] + '2') + list_addr = f'{list_name}{add_mail}@{data_mm["mailman_domain"]}' + msg = f"""From: {sender}\r\n\ +To: {list_addr}\r\n\ +Subject: {subject}\r\n\ +\r\n\ +MESSAGE""" + smtp.sendmail(sender, + list_addr, + msg, + ) + smtp.quit() + + +def test_mailman(): + data = get_data() + get_authentication(data) + + +def test_mailman_login(): + data = get_data() + authentication = get_authentication(data) + content = authentication.get(data['auth_url']) + login = data['username'].split('@')[0] + assert f'Successfully signed in as {login}.' in content + + +def test_mailman_list(): + data = get_data() + authentication = get_authentication(data) + list_name = 'test' + search_list = f'href="/mailman/postorius/lists/{list_name}.{data["mailman_domain"]}/"' + if 'FIRST_RUN' in environ: + content = authentication.get(data['auth_url']) + assert search_list not in content + result = run(data['internal_address'], + ['/usr/bin/mailman3', 'create', '--language', 'fr', '-o', data['username'], '-N', '-d', f'{list_name}@{data["mailman_domain"]}'], + 'mailman', + ) + assert list(result) == [f'Liste de diffusion créée : {list_name}@{data["mailman_domain"]}'] + content = authentication.get(data['auth_url']) + assert search_list in content + + +def test_mailman_inscription(): + data = get_data() + authentication = get_authentication(data) + list_name = 'test' + search_inscription = f'Adresse principale ({data["username"]})' + url = join(data['base_url'], f'postorius/lists/{list_name}.{data["mailman_domain"]}/') + if 'FIRST_RUN' in environ: + content = authentication.get(url) + assert search_inscription not in content + pattern_csrf = r'name="csrfmiddlewaretoken" value="([a-zA-Z0-9\-\_=]+)"' + pattern_sub = r'