from requests import get, post, session
from requests.exceptions import SSLError
from mookdns import MookDns
from os import environ
from os.path import join
from yaml import load, SafeLoader
from glob import glob
# TLS verification setting passed as `verify=` to every requests call in this
# module. It starts as True; Authentication.is_lemonldap() rebinds it to the
# path of a single CA certificate file after an SSLError, so certificate
# checking stays enabled in both cases.
VERIFY = True
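class Authentication:
    """Log in to a LemonLDAP portal and replay the session cookies on later calls.

    The constructor performs the whole login sequence (portal reachability
    check, credential POST, authorize request) and stores the resulting
    cookies in ``self.cookies``.

    Every check raises ``AssertionError`` explicitly instead of using the
    ``assert`` statement, so the authentication checks still run when Python
    is started with ``-O``.

    NOTE(review): the cookies are flattened into a plain dict, which drops
    their domain/path scoping; ``get()`` and ``post()`` therefore attach the
    portal session cookie to whatever URL the caller passes. Only pass URLs
    that belong to the system under test.
    """

    def __init__(self,
                 auth_url,
                 portal_server,
                 ip,
                 username,
                 password,
                 title,
                 ):
        """Authenticate against the portal and keep the session cookies.

        Args:
            auth_url: URL expected to serve the LemonLDAP authentication page.
            portal_server: host name used to build ``https://<host>/oauth2/``.
            ip: address handed to ``MookDns`` (presumably the address the
                host names are resolved to; confirm against mookdns).
            username: login sent to the portal.
            password: password sent to the portal.
            title: text that must appear in the authorize response.
        """
        self.ip = ip
        with session() as req:
            with MookDns(self.ip):
                self.is_lemonldap(req, auth_url)
                self.auth_lemonldap(req,
                                    portal_server,
                                    username,
                                    password,
                                    title,
                                    )
            self.cookies = dict(req.cookies)

    # @staticmethod
    def is_lemonldap(self,
                     req,
                     url,
                     ):
        """Check that ``url`` serves the LemonLDAP authentication portal.

        On an ``SSLError`` the request is retried with the CA certificate
        named in ``$MACHINE_TEST_DIR/reverse-proxy-client.yml``; the module
        level ``VERIFY`` is rebound to that file so later requests use it too.
        Verification is never switched off.

        Raises:
            Exception: the ``ca_certificate`` pattern does not match exactly
                one file.
            AssertionError: the page is not reachable or is not the portal.
        """
        global VERIFY
        try:
            ret = req.get(url, verify=VERIFY)
        except SSLError:
            test_dir = environ["MACHINE_TEST_DIR"]
            conf_file = f'{test_dir}/reverse-proxy-client.yml'
            with open(conf_file) as yaml:
                data = load(yaml, Loader=SafeLoader)
            # NOTE(review): join() discards test_dir when ca_certificate is an
            # absolute path, so the config file decides which CA gets trusted.
            path = join(test_dir, data["ca_certificate"])
            cert = glob(path)
            # Refuse to guess between several candidate trust anchors.
            if len(cert) != 1:
                raise Exception(f'{path} should find one and one certificate but found: {cert}')
            VERIFY = cert[0]
            ret = req.get(url, verify=VERIFY)
        code = ret.status_code
        content = ret.content
        if code != 200:
            raise AssertionError(f"cannot access to lemonldap; {content}")
        # NOTE(review): this bytes literal was split across two lines in the
        # reviewed copy; if a markup prefix (e.g. a title tag) was lost there,
        # restore it to make the check as strict as intended.
        if b'Authentication portal' not in content:
            raise AssertionError(f'cannot find LemonLdap title: {content}')

    def auth_lemonldap(self,
                       req,
                       portal_server,
                       username,
                       password,
                       title,
                       ):
        """Post the credentials to the portal, then request the authorize page.

        Raises:
            AssertionError: the login reply or the authorize response is not
                the expected one.
        """
        # authentication
        credentials = {'user': username,
                       'password': password,
                       }
        headers = {"Content-Type": "application/x-www-form-urlencoded",
                   "Accept": "application/json",
                   }
        portal_url = f'https://{portal_server}/oauth2/'
        ret = req.post(portal_url, data=credentials, headers=headers, verify=VERIFY)
        reply = ret.json()
        # The reply carries the session id, so it is kept out of the messages.
        # NOTE(review): this only tests truthiness, so any non-empty "error"
        # value passes; confirm the value the portal returns on success and
        # compare against it explicitly.
        if not reply['error']:
            raise AssertionError('login reply has an empty "error" field')
        if reply['result'] != 1:
            raise AssertionError('login reply "result" is not 1')
        # The id in the body must be the session the portal set as a cookie.
        if reply['id'] != ret.cookies.get('lemonldap'):
            raise AssertionError('login reply "id" does not match the lemonldap cookie')
        # authorization code
        # curl -X POST -d user=dwho -d password=dwho -H 'Accept: application/json' 'https://oidctest.wsweet.org/oauth2/'
        # curl -s -D - -o /dev/null -b lemonldap=<session-id> 'https://oidctest.wsweet.org/oauth2/authorize?response_type=code&client_id=private&scope=openid+profile+email&redirect_uri=http://localhost' | grep '^location'
        authorize_url = f'{portal_url}authorize'
        ret = req.get(authorize_url, verify=VERIFY)
        if ret.status_code != 200:
            raise AssertionError(f'return code is {ret.status_code}')
        content = ret.content.decode()
        if title not in content:
            raise AssertionError(f'cannot find {title} in {content}')

    def get(self,
            url,
            json=False,
            ):
        """GET ``url`` with the stored session cookies.

        Returns the parsed JSON body when ``json`` is true, otherwise the
        decoded body text.

        Raises:
            AssertionError: the status code is not 200.
        """
        with MookDns(self.ip):
            ret = get(url, cookies=self.cookies, verify=VERIFY)
        if ret.status_code != 200:
            raise AssertionError(f'return code is {ret.status_code}')
        if json:
            return ret.json()
        return ret.content.decode()

    def post(self,
             url,
             data,
             headers=None,
             ):
        """POST ``data`` to ``url`` with the stored session cookies.

        Returns nothing; only the status code is checked.

        Raises:
            AssertionError: the status code is not 200.
        """
        with MookDns(self.ip):
            ret = post(url, cookies=self.cookies, data=data, headers=headers, verify=VERIFY)
        if ret.status_code != 200:
            raise AssertionError(f'return code is {ret.status_code}')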