forked from stove/dataset
227 lines
8.1 KiB
Python
227 lines
8.1 KiB
Python
|
from yaml import load, SafeLoader
|
||
|
from os import environ, makedirs
|
||
|
from os.path import expandvars, isfile, isdir, dirname, join
|
||
|
from re import search
|
||
|
from dulwich.porcelain import init, clone, add, commit, push
|
||
|
|
||
|
from tempfile import TemporaryDirectory
|
||
|
from subprocess import run
|
||
|
|
||
|
|
||
|
from revprox import Authentication
|
||
|
from mookdns import MookDnsSystem
|
||
|
|
||
|
|
||
|
PORT = '3000'
|
||
|
GITEA_USERNAME = 'gitea'
|
||
|
KEY_FILE = expandvars("$HOME/tests/risotto")
|
||
|
|
||
|
|
||
|
AUTHENTICATION = None
|
||
|
DATA = None
|
||
|
|
||
|
|
||
|
def get_data():
|
||
|
global DATA
|
||
|
if not DATA:
|
||
|
conf_file = f'{environ["MACHINE_TEST_DIR"]}/gitea.yml'
|
||
|
with open(conf_file) as yaml:
|
||
|
DATA = load(yaml, Loader=SafeLoader)
|
||
|
return DATA
|
||
|
|
||
|
|
||
|
def get_authentication(data):
|
||
|
global AUTHENTICATION
|
||
|
if not AUTHENTICATION:
|
||
|
AUTHENTICATION = Authentication(data['auth_url'],
|
||
|
data['auth_server'],
|
||
|
data['revprox_ip'],
|
||
|
data['username'],
|
||
|
data['password'],
|
||
|
f'<title>{data["username"]} - Dashboard - {data["gitea_title"]}</title>',
|
||
|
)
|
||
|
return AUTHENTICATION
|
||
|
|
||
|
|
||
|
def get_info(authentication,
|
||
|
url,
|
||
|
with_uid=False,
|
||
|
with_data_id=False,
|
||
|
found_string=None
|
||
|
):
|
||
|
# <input type="hidden" name="_csrf" value="YQbVgdYHX_3VQ-KuZ5cKtr9RzXE6MTY1NzgxMzUzNTA0OTYwODQ0NQ">
|
||
|
pattern_csrf = r'name="_csrf" value="([a-zA-Z0-9\-\_=]+)"'
|
||
|
ret = authentication.get(url)
|
||
|
csrf = search(pattern_csrf, ret)[1]
|
||
|
ret_data = []
|
||
|
if with_uid:
|
||
|
pattern_uid = r'input type="hidden" id="uid" name="uid" value="(\d)+"'
|
||
|
uid = search(pattern_uid, ret)
|
||
|
if uid is None:
|
||
|
ret_data.append(uid)
|
||
|
else:
|
||
|
ret_data.append(uid[1])
|
||
|
if with_data_id:
|
||
|
pattern_uid = r'/user/settings/keys/delete?type=ssh" data-id="(\d)+"'
|
||
|
uid = search(pattern_uid, ret)
|
||
|
if uid is None:
|
||
|
ret_data.append(uid)
|
||
|
else:
|
||
|
ret_data.append(uid[1])
|
||
|
if found_string:
|
||
|
ret_data.append(found_string in ret)
|
||
|
ret_data.append(csrf)
|
||
|
if len(ret_data) == 1:
|
||
|
return ret_data[0]
|
||
|
return ret_data
|
||
|
|
||
|
|
||
|
def add_ssh_key(authentication, data):
|
||
|
# Send key to gitea
|
||
|
url = f'{data["base_url"]}user/settings/keys'
|
||
|
is_already_key, csrf = get_info(authentication, url, found_string='test_key_risotto')
|
||
|
if is_already_key:
|
||
|
return
|
||
|
# Gen SSH key if needed
|
||
|
if not isfile(KEY_FILE):
|
||
|
key_dir = dirname(KEY_FILE)
|
||
|
if not isdir(key_dir):
|
||
|
makedirs(key_dir)
|
||
|
cmd = ['/usr/bin/ssh-keygen', '-N', '', '-f', KEY_FILE]
|
||
|
run(cmd)
|
||
|
with open(f'{KEY_FILE}.pub') as fh:
|
||
|
key = fh.read()
|
||
|
authentication.post(url, {'_csrf': csrf, 'title': 'test_key_risotto', 'content': key, 'type': 'ssh'})
|
||
|
|
||
|
|
||
|
def delete_ssh_key(authentication, data):
|
||
|
url = f'{data["base_url"]}user/settings/keys'
|
||
|
is_already_key, csrf = get_info(authentication, url, found_string='test_key_risotto')
|
||
|
if is_already_key:
|
||
|
uid, csrf = get_info(authentication, url, with_data_id=True)
|
||
|
url = f'{data["base_url"]}user/settings/keys/delete?type=ssh'
|
||
|
authentication.post(url, {'_csrf': csrf, 'id': uid})
|
||
|
is_already_key, csrf = get_info(authentication, url, found_string='test_key_risotto')
|
||
|
|
||
|
|
||
|
def test_gitea():
|
||
|
data = get_data()
|
||
|
get_authentication(data)
|
||
|
|
||
|
|
||
|
def test_gitea_repos():
|
||
|
data = get_data()
|
||
|
authentication = get_authentication(data)
|
||
|
if 'FIRST_RUN' in environ:
|
||
|
url = f'{data["base_url"]}repo/create'
|
||
|
uid, csrf = get_info(authentication, url, with_uid=True)
|
||
|
authentication.post(url, {'_csrf': csrf, 'uid': uid, 'repo_name': 'test_persistent'})
|
||
|
url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode='
|
||
|
json = authentication.get(url, json=True)
|
||
|
assert json['ok']
|
||
|
assert len(json['data']) == 1
|
||
|
username = data['username'].split('@', 1)[0]
|
||
|
assert json['data'][0]['full_name'] == f'{username}/test_persistent'
|
||
|
|
||
|
|
||
|
def test_gitea_create_repo():
|
||
|
data = get_data()
|
||
|
authentication = get_authentication(data)
|
||
|
url = f'{data["base_url"]}repo/create'
|
||
|
uid, csrf = get_info(authentication, url, with_uid=True)
|
||
|
authentication.post(url, {'_csrf': csrf, 'uid': uid, 'repo_name': 'test', 'default_branch': 'main'})
|
||
|
url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode='
|
||
|
json = authentication.get(url, json=True)
|
||
|
assert json['ok']
|
||
|
assert len(json['data']) == 2
|
||
|
username = data['username'].split('@', 1)[0]
|
||
|
assert {dat['full_name'] for dat in json['data']} == set([f'{username}/test_persistent', f'{username}/test'])
|
||
|
|
||
|
|
||
|
def test_repo():
|
||
|
data = get_data()
|
||
|
authentication = get_authentication(data)
|
||
|
if 'FIRST_RUN' in environ:
|
||
|
# delete_ssh_key(authentication, data)
|
||
|
add_ssh_key(authentication, data)
|
||
|
with TemporaryDirectory() as tmpdirname:
|
||
|
username = data['username'].split('@', 1)[0]
|
||
|
dns = data['base_url'].split('/', 3)[2]
|
||
|
ssh_url = f'ssh://{GITEA_USERNAME}@{dns}:2222/{username}/test.git'
|
||
|
with MookDnsSystem(dns, data['ip']):
|
||
|
filename = join(tmpdirname, 'test.txt')
|
||
|
with open(filename, 'w') as fh:
|
||
|
fh.write('test')
|
||
|
repo = init(tmpdirname)
|
||
|
add(repo, filename)
|
||
|
commit(repo, message=b'test commit')
|
||
|
push(repo=repo,
|
||
|
remote_location=ssh_url,
|
||
|
refspecs='master',
|
||
|
)
|
||
|
lst = list(repo.get_walker())
|
||
|
assert len(lst) == 1
|
||
|
assert lst[0].commit.message == b'test commit'
|
||
|
|
||
|
|
||
|
def test_clone_http():
|
||
|
data = get_data()
|
||
|
authentication = get_authentication(data)
|
||
|
if 'FIRST_RUN' in environ:
|
||
|
# delete_ssh_key(authentication, data)
|
||
|
add_ssh_key(authentication, data)
|
||
|
with TemporaryDirectory() as tmpdirname:
|
||
|
username = data['username'].split('@', 1)[0]
|
||
|
dns = data['base_url'].split('/', 3)[2]
|
||
|
http_url = f'{data["base_url"]}{username}/test.git'
|
||
|
with MookDnsSystem(dns, data['revprox_ip']):
|
||
|
repo = clone(http_url, tmpdirname)
|
||
|
lst = list(repo.get_walker())
|
||
|
assert len(lst) == 1
|
||
|
assert lst[0].commit.message == b'test commit'
|
||
|
|
||
|
|
||
|
def test_gitea_delete_repo():
|
||
|
repo_name = 'test'
|
||
|
data = get_data()
|
||
|
authentication = get_authentication(data)
|
||
|
username = data['username'].split('@', 1)[0]
|
||
|
url = f'{data["base_url"]}{username}/{repo_name}/settings'
|
||
|
csrf = get_info(authentication, url)
|
||
|
authentication.post(url, {'_csrf': csrf, 'action': 'delete', 'repo_name': repo_name})
|
||
|
url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode='
|
||
|
json = authentication.get(url, json=True)
|
||
|
assert json['ok']
|
||
|
assert len(json['data']) == 1
|
||
|
username = data['username'].split('@', 1)[0]
|
||
|
assert json['data'][0]['full_name'] == f'{username}/test_persistent'
|
||
|
|
||
|
|
||
|
def test_repo_persistent():
|
||
|
data = get_data()
|
||
|
authentication = get_authentication(data)
|
||
|
if 'FIRST_RUN' in environ:
|
||
|
# delete_ssh_key(authentication, data)
|
||
|
add_ssh_key(authentication, data)
|
||
|
with TemporaryDirectory() as tmpdirname:
|
||
|
username = data['username'].split('@', 1)[0]
|
||
|
dns = data['base_url'].split('/', 3)[2]
|
||
|
ssh_url = f'ssh://{GITEA_USERNAME}@{dns}:2222/{username}/test_persistent.git'
|
||
|
with MookDnsSystem(dns, data['ip']):
|
||
|
if 'FIRST_RUN' in environ:
|
||
|
filename = join(tmpdirname, 'test.txt')
|
||
|
with open(filename, 'w') as fh:
|
||
|
fh.write('test')
|
||
|
repo = init(tmpdirname)
|
||
|
add(repo, filename)
|
||
|
commit(repo, message=b'test commit')
|
||
|
push(repo=repo,
|
||
|
remote_location=ssh_url,
|
||
|
refspecs='master',
|
||
|
)
|
||
|
else:
|
||
|
repo = clone(ssh_url, tmpdirname)
|
||
|
lst = list(repo.get_walker())
|
||
|
assert len(lst) == 1
|
||
|
assert lst[0].commit.message == b'test commit'
|