2022-07-04 22:28:44 +02:00
|
|
|
from yaml import load, SafeLoader
|
|
|
|
from os import environ
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
from imaplib2 import IMAP4_SSL
|
|
|
|
from smtplib import SMTP, SMTPNotSupportedError, SMTPAuthenticationError
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Settings for the mail server under test are read from ``imap.yml`` inside
# the directory named by the MACHINE_TEST_DIR environment variable.
conf_file = f'{environ["MACHINE_TEST_DIR"]}/imap.yml'
# Explicit encoding so the result does not depend on the locale of the test
# host; the handle is not called ``yaml`` to avoid confusion with the module.
with open(conf_file, encoding='utf-8') as conf_stream:
    data = load(conf_stream, Loader=SafeLoader)
# One entry per account: (label, username, candidate passwords).
# For the 'family' account the second candidate is the configured password
# with "2" appended; the tests try the candidates in order and keep the first
# one the server accepts.
parameters = (
    ('user', data['username'], [data['password']]),
    ('family', data['username_family'], [data['password_family'], data['password_family'] + "2"]),
)
|
|
|
|
|
|
|
|
|
|
|
|
def get_msg(username, msg='MESSAGE'):
    """Return the raw text of a test mail sent from *username* to itself.

    The lines are separated by CRLF and the text ends with a CRLF.

    NOTE(review): the ``Subject: TEST`` line comes after the empty line that
    ends the header section, so it is part of the body, not a header.
    The layout is kept as is because the IMAP tests compare the end of the
    fetched message with this exact text.

    :param username: address used for both ``From`` and ``To``
    :param msg: last line of the message
    :return: the message as a single string
    """
    lines = (
        f'From: {username}',
        f'To: {username}',
        '',
        'Subject: TEST',
        f'{msg}',
        '',  # yields the trailing CRLF once joined
    )
    return '\r\n'.join(lines)
|
|
|
|
|
|
|
|
|
2022-07-16 22:16:24 +02:00
|
|
|
@pytest.mark.parametrize('typ, username, passwords', parameters)
def test_imap_wrong_password(typ, username, passwords):
    """An IMAP login with a wrong password must be refused.

    The test logs in as *username* with the password ``'b'`` and fails with
    ``Exception('wrong login !')`` when the server accepts it.
    """
    imap = IMAP4_SSL(data['address'])
    try:
        imap.LOGIN(username, 'b')
    except Exception:
        # expected: the login is rejected.  ``Exception`` rather than a bare
        # ``except`` so KeyboardInterrupt / SystemExit still propagate.
        pass
    else:
        raise Exception('wrong login !')
    finally:
        # always end the session so the connection is not left open
        imap.LOGOUT()
|
|
|
|
|
|
|
|
|
2022-07-16 22:16:24 +02:00
|
|
|
@pytest.mark.parametrize('typ, username, passwords', parameters)
def test_imap_migration(typ, username, passwords):
    """Check that the 'MIGRATION' mail is the only message in the mailbox.

    When ``FIRST_RUN`` is set in the environment, the mail is first sent to
    *username* through SMTP (port 587, STARTTLS).  In every case the mailbox
    is then opened read-only through IMAP and must contain exactly one
    message, number 1, whose text ends with the 'MIGRATION' message.

    Each password in *passwords* is tried in order; the last authentication
    error is raised when none is accepted.
    """
    msg = get_msg(username, 'MIGRATION')
    if 'FIRST_RUN' in environ:
        # the context manager sends QUIT and closes the socket even when a
        # step below fails
        with SMTP(data['address'], '587') as smtp:
            smtp.starttls()
            error = None
            for password in passwords:
                try:
                    smtp.login(username, password)
                    break
                except SMTPAuthenticationError as err:
                    error = err
            else:
                # loop ended without ``break``: no password was accepted
                raise error
            smtp.sendmail(username, username, msg)
    imap = IMAP4_SSL(data['address'])
    try:
        error = None
        for password in passwords:
            try:
                imap.LOGIN(username, password)
                break
            except Exception as err:
                error = err
        else:
            # loop ended without ``break``: no password was accepted
            raise error
        imap.SELECT(readonly=True)
        # ``status`` instead of ``typ`` so the test parameter is not overwritten
        status, req = imap.SEARCH(None, 'ALL')
        assert status == 'OK'
        assert len(req) == 1
        assert req[0] == b'1'
        field = imap.FETCH('1', '(RFC822)')
        assert field[0] == 'OK'
        assert field[1][-2][-1].decode().endswith(msg)
        imap.CLOSE()
    finally:
        # end the session even when an assertion above failed
        imap.LOGOUT()
|
|
|
|
|
|
|
|
|
2022-07-16 22:16:24 +02:00
|
|
|
@pytest.mark.parametrize('typ, username, passwords', parameters)
def test_smtp_no_tls(typ, username, passwords):
    """An SMTP login attempted without STARTTLS must not be supported.

    The test connects to port 587, skips ``starttls()`` and expects
    ``login()`` to raise ``SMTPNotSupportedError``.
    """
    # the context manager closes the connection, which was otherwise left open
    with SMTP(data['address'], '587') as smtp:
        with pytest.raises(SMTPNotSupportedError):
            smtp.login(username, passwords[0])
|
2022-07-04 22:28:44 +02:00
|
|
|
|
|
|
|
|
2022-07-16 22:16:24 +02:00
|
|
|
@pytest.mark.parametrize('typ, username, passwords', parameters)
def test_smtp_wrong_passwd(typ, username, passwords):
    """An SMTP login with a wrong password must be refused.

    After STARTTLS on port 587, logging in as *username* with the password
    ``'a'`` has to raise ``SMTPAuthenticationError``.
    """
    # the context manager sends QUIT on exit, including when the expectation
    # below is not met
    with SMTP(data['address'], '587') as smtp:
        smtp.starttls()
        with pytest.raises(SMTPAuthenticationError):
            smtp.login(username, 'a')
|
|
|
|
|
|
|
|
|
2022-07-16 22:16:24 +02:00
|
|
|
@pytest.mark.parametrize('typ, username, passwords', parameters)
def test_smtp_login(typ, username, passwords):
    """An SMTP login over STARTTLS must succeed with one of *passwords*.

    The passwords are tried in order on the same connection; the last
    ``SMTPAuthenticationError`` is raised when none is accepted.
    """
    # the context manager sends QUIT and closes the socket on success and on
    # failure alike
    with SMTP(data['address'], '587') as smtp:
        smtp.starttls()
        error = None
        for password in passwords:
            try:
                smtp.login(username, password)
                break
            except SMTPAuthenticationError as err:
                error = err
        else:
            # loop ended without ``break``: no password was accepted
            raise error
|
|
|
|
|
|
|
|
|
2022-07-16 22:16:24 +02:00
|
|
|
@pytest.mark.parametrize('typ, username, passwords', parameters)
def test_smtp_sendmail(typ, username, passwords):
    """Send the default test mail from *username* to itself through SMTP.

    The connection uses port 587 with STARTTLS.  The passwords are tried in
    order; the last ``SMTPAuthenticationError`` is raised when none is
    accepted.
    """
    # the context manager sends QUIT and closes the socket on success and on
    # failure alike
    with SMTP(data['address'], '587') as smtp:
        smtp.starttls()
        error = None
        for password in passwords:
            try:
                smtp.login(username, password)
                break
            except SMTPAuthenticationError as err:
                error = err
        else:
            # loop ended without ``break``: no password was accepted
            raise error
        smtp.sendmail(username, username, get_msg(username))
|
|
|
|
|
|
|
|
|
2022-07-16 22:16:24 +02:00
|
|
|
@pytest.mark.parametrize('typ, username, passwords', parameters)
def test_imap_read_mail(typ, username, passwords):
    """Read the mailbox through IMAP and check the default test mail.

    The mailbox is opened read-only and must contain exactly two messages.
    Every message after the first one must end with the text returned by
    ``get_msg(username)``.
    (Presumably the first message is the one checked by
    ``test_imap_migration`` and the second the one sent by
    ``test_smtp_sendmail`` -- this relies on the tests running in file order.)

    The passwords are tried in order; the last login error is raised when
    none is accepted.
    """
    imap = IMAP4_SSL(data['address'])
    try:
        error = None
        for password in passwords:
            try:
                imap.LOGIN(username, password)
                break
            except Exception as err:
                error = err
        else:
            # loop ended without ``break``: no password was accepted
            raise error
        imap.SELECT(readonly=True)
        # ``status`` instead of ``typ`` so the test parameter is not overwritten
        status, req = imap.SEARCH(None, 'ALL')
        assert status == 'OK'
        assert len(req) == 1
        msg = get_msg(username)
        # SEARCH answers with one space-separated list of message numbers
        msg_no = req[0].split()
        assert len(msg_no) == 2
        for num in msg_no[1:]:
            field = imap.FETCH(num, '(RFC822)')
            assert field[0] == 'OK'
            assert field[1][-2][-1].decode().endswith(msg)
        imap.CLOSE()
    finally:
        # end the session even when an assertion above failed
        imap.LOGOUT()
|
|
|
|
|
|
|
|
|
2022-07-16 22:16:24 +02:00
|
|
|
@pytest.mark.parametrize('typ, username, passwords', parameters)
def test_imap_delete_mail(typ, username, passwords):
    """Delete every message but the first one from the mailbox through IMAP.

    Each message after the first is flagged ``\\Deleted``, then the mailbox
    is expunged.  The first message is left untouched.

    The passwords are tried in order; the last login error is raised when
    none is accepted.
    """
    imap = IMAP4_SSL(data['address'])
    try:
        error = None
        for password in passwords:
            try:
                imap.LOGIN(username, password)
                break
            except Exception as err:
                error = err
        else:
            # loop ended without ``break``: no password was accepted
            raise error
        imap.SELECT()
        # ``status`` instead of ``typ`` so the test parameter is not overwritten
        status, req = imap.SEARCH(None, 'ALL')
        # same check as in the other IMAP tests of this file
        assert status == 'OK'
        msg_no = req[0].split()
        for num in msg_no[1:]:
            ret = imap.store(num, '+FLAGS', '\\Deleted')
            assert ret[0] == 'OK', f'error when deleting mail: {ret}'
        imap.expunge()
        imap.CLOSE()
    finally:
        # end the session even when a step above failed
        imap.LOGOUT()
|