# Forked from stove/dataset (106 lines, 3.9 KiB, Python).
from requests import get, post, session
|
|
from requests.exceptions import SSLError
|
|
from mookdns import MookDns
|
|
from os import environ
|
|
from os.path import join
|
|
from yaml import load, SafeLoader
|
|
from glob import glob
|
|
|
|
|
|
# Value passed as ``verify=`` to every requests call in this module.
# It starts as True; Authentication.is_lemonldap rebinds it to the path of a
# CA certificate file when its first request fails with an SSLError, and that
# path is then used by all later requests.
VERIFY = True
|
|
|
|
|
|
class Authentication:
    """Logged-in LemonLDAP session used to query protected URLs.

    Creating an instance checks that the portal is reachable, logs in with
    the given credentials and keeps a copy of the resulting cookies.
    :meth:`get` and :meth:`post` then send those cookies with plain
    ``requests`` calls. Every network call is made inside a
    ``MookDns(ip)`` context (presumably this makes the tested host names
    resolve to ``ip`` -- confirm against the ``mookdns`` package).

    Attributes:
        ip: Value handed to ``MookDns`` around each request.
        cookies: ``dict`` copy of the cookies held by the login session.
    """

    def __init__(self,
                 auth_url,
                 portal_server,
                 ip,
                 username,
                 password,
                 title,
                 ):
        """Log in through the portal and capture the session cookies.

        Args:
            auth_url: URL expected to serve the LemonLDAP portal page.
            portal_server: Host of the portal; requests are sent to
                ``https://{portal_server}/oauth2/``.
            ip: Argument given to ``MookDns`` for every request.
            username: Login sent in the ``user`` form field.
            password: Password sent in the ``password`` form field.
            title: Text that must appear in the page returned by the
                ``authorize`` endpoint once logged in.

        Raises:
            AssertionError: If one of the portal checks fails.
            Exception: If the CA certificate pattern used by
                :meth:`is_lemonldap` does not match exactly one file.
        """
        self.ip = ip
        with session() as req:
            with MookDns(self.ip):
                self.is_lemonldap(req,
                                  auth_url,
                                  )
                self.auth_lemonldap(req,
                                    portal_server,
                                    username,
                                    password,
                                    title,
                                    )
                # Keep a plain dict so the cookies can be replayed by get()
                # and post() without the session object.
                self.cookies = dict(req.cookies)

    def is_lemonldap(self,
                     req,
                     url,
                     ):
        """Check that ``url`` serves the LemonLDAP authentication portal.

        The first request uses the current module-level ``VERIFY``. If it
        fails with ``SSLError``, the ``ca_certificate`` entry of
        ``$MACHINE_TEST_DIR/reverse-proxy.yml`` is used as a glob pattern
        (relative to ``MACHINE_TEST_DIR``) to locate a CA certificate; its
        path is stored in ``VERIFY`` for all later requests and the request
        is retried once.

        Args:
            req: Object exposing ``get(url, verify=...)`` (a requests
                session in :meth:`__init__`).
            url: URL of the portal page.

        Raises:
            AssertionError: If the status code is not 200 or the portal
                title is missing from the page.
            Exception: If the certificate pattern does not match exactly
                one file.
            KeyError: If ``MACHINE_TEST_DIR`` is not set or the
                configuration has no ``ca_certificate`` entry.
        """
        global VERIFY
        try:
            ret = req.get(url, verify=VERIFY)
        except SSLError:
            test_dir = environ["MACHINE_TEST_DIR"]
            conf_file = f'{test_dir}/reverse-proxy.yml'
            # Read as UTF-8 explicitly so parsing does not depend on the
            # locale of the machine running the tests.
            with open(conf_file, encoding='utf-8') as conf:
                data = load(conf, Loader=SafeLoader)
            pattern = join(test_dir, data["ca_certificate"])
            matches = glob(pattern)
            if len(matches) != 1:
                raise Exception(
                    f'{pattern} should find one and only one certificate '
                    f'but found: {matches}'
                )
            # Remember the CA file for every later request of the module.
            VERIFY = matches[0]
            ret = req.get(url, verify=VERIFY)
        code = ret.status_code
        content = ret.content
        portal_title = b'<title trspan="authPortal">Authentication portal</title>'
        assert code == 200, f"cannot access to lemonldap; {content}"
        assert portal_title in content, f'cannot find LemonLdap title: {content}'

    def auth_lemonldap(self,
                       req,
                       portal_server,
                       username,
                       password,
                       title,
                       ):
        """Log in on the portal, then check the ``authorize`` page.

        Posts the credentials to ``https://{portal_server}/oauth2/`` asking
        for a JSON answer, validates that answer against the ``lemonldap``
        cookie, then fetches ``.../oauth2/authorize`` with the same session
        and checks that ``title`` appears in the returned page.

        Args:
            req: Object exposing ``post`` and ``get`` like a requests
                session; it keeps the cookies between the two calls.
            portal_server: Host of the LemonLDAP portal.
            username: Login sent in the ``user`` form field.
            password: Password sent in the ``password`` form field.
            title: Text expected in the ``authorize`` page.

        Raises:
            AssertionError: If the login answer or the ``authorize`` page
                is not the expected one.
            KeyError: If the JSON answer lacks ``error``, ``result`` or
                ``id``.
        """
        # authentication
        # Manual equivalent:
        # curl -X POST -d user=dwho -d password=dwho -H 'Accept: application/json' 'https://oidctest.wsweet.org/oauth2/'
        credentials = {'user': username,
                       'password': password,
                       }
        headers = {"Content-Type": "application/x-www-form-urlencoded",
                   "Accept": "application/json",
                   }
        portal_url = f'https://{portal_server}/oauth2/'
        ret = req.post(portal_url, data=credentials, headers=headers, verify=VERIFY)
        reply = ret.json()
        # NOTE(review): this only checks that the field is present and truthy;
        # presumably a successful login reports the string "0" -- confirm
        # before tightening the comparison.
        assert reply['error'], f'unexpected LemonLDAP answer: {reply}'
        assert reply['result'] == 1, f'LemonLDAP authentication failed: {reply}'
        assert reply['id'] == ret.cookies.get('lemonldap'), \
            'session id of the answer does not match the lemonldap cookie'
        # authorization code
        # Manual equivalent (note that the curl example passes query
        # parameters which this method does not send):
        # curl -s -D - -o /dev/null -b lemonldap=0640f95827111f00ba7ad5863ba819fe46cfbcecdb18ce525836369fb4c8350b 'https://oidctest.wsweet.org/oauth2/authorize?response_type=code&client_id=private&scope=openid+profile+email&redirect_uri=http://localhost' | grep '^location'
        authorize_url = f'{portal_url}authorize'
        ret = req.get(authorize_url, verify=VERIFY)
        assert ret.status_code == 200, f'return code is {ret.status_code}'
        content = ret.content.decode()
        assert title in content, f'cannot find {title} in {content}'

    def get(self,
            url,
            json=False,
            ):
        """Fetch ``url`` with the stored login cookies.

        Args:
            url: URL to request.
            json: When true, return the parsed JSON body instead of text.

        Returns:
            The parsed JSON body when ``json`` is true, otherwise the
            response body decoded with the default codec (UTF-8).

        Raises:
            AssertionError: If the status code is not 200.
        """
        with MookDns(self.ip):
            ret = get(url, cookies=self.cookies, verify=VERIFY)
            assert ret.status_code == 200, f'return code is {ret.status_code}'
            if json:
                return ret.json()
            return ret.content.decode()

    def post(self,
             url,
             data,
             headers=None,
             ):
        """Post ``data`` to ``url`` with the stored login cookies.

        Only the status code is checked; the response body is discarded and
        the method returns ``None``.

        Args:
            url: URL to request.
            data: Payload forwarded as ``data=`` to ``requests.post``.
            headers: Optional headers forwarded to ``requests.post``.

        Raises:
            AssertionError: If the status code is not 200.
        """
        with MookDns(self.ip):
            ret = post(url, cookies=self.cookies, data=data, headers=headers, verify=VERIFY)
            assert ret.status_code == 200, f'return code is {ret.status_code}'
|