from yaml import load, SafeLoader
from os import environ, makedirs
from os.path import expandvars, isfile, isdir, dirname, join
from re import search
from dulwich.porcelain import init, clone, add, commit, push
from tempfile import TemporaryDirectory
from subprocess import run
from revprox import Authentication
from mookdns import MookDnsSystem
PORT = '3000'
GITEA_USERNAME = 'gitea'
KEY_FILE = expandvars("$HOME/tests/risotto")
AUTHENTICATION = None
DATA = None
def get_data():
global DATA
if not DATA:
conf_file = f'{environ["MACHINE_TEST_DIR"]}/gitea.yml'
with open(conf_file) as yaml:
DATA = load(yaml, Loader=SafeLoader)
return DATA
def get_authentication(data):
global AUTHENTICATION
if not AUTHENTICATION:
AUTHENTICATION = Authentication(data['auth_url'],
data['auth_server'],
data['revprox_ip'],
data['username'],
data['password'],
f'
{data["username"]} - Dashboard - {data["gitea_title"]}',
)
return AUTHENTICATION
def get_info(authentication,
url,
with_uid=False,
with_data_id=False,
found_string=None
):
#
pattern_csrf = r'name="_csrf" value="([a-zA-Z0-9\-\_=]+)"'
ret = authentication.get(url)
csrf = search(pattern_csrf, ret)[1]
ret_data = []
if with_uid:
pattern_uid = r'input type="hidden" id="uid" name="uid" value="(\d)+"'
uid = search(pattern_uid, ret)
if uid is None:
ret_data.append(uid)
else:
ret_data.append(uid[1])
if with_data_id:
pattern_uid = r'/user/settings/keys/delete?type=ssh" data-id="(\d)+"'
uid = search(pattern_uid, ret)
if uid is None:
ret_data.append(uid)
else:
ret_data.append(uid[1])
if found_string:
ret_data.append(found_string in ret)
ret_data.append(csrf)
if len(ret_data) == 1:
return ret_data[0]
return ret_data
def add_ssh_key(authentication, data):
# Send key to gitea
url = f'{data["base_url"]}user/settings/keys'
is_already_key, csrf = get_info(authentication, url, found_string='test_key_risotto')
if is_already_key:
return
# Gen SSH key if needed
if not isfile(KEY_FILE):
key_dir = dirname(KEY_FILE)
if not isdir(key_dir):
makedirs(key_dir)
cmd = ['/usr/bin/ssh-keygen', '-N', '', '-f', KEY_FILE]
run(cmd)
with open(f'{KEY_FILE}.pub') as fh:
key = fh.read()
authentication.post(url, {'_csrf': csrf, 'title': 'test_key_risotto', 'content': key, 'type': 'ssh'})
def delete_ssh_key(authentication, data):
url = f'{data["base_url"]}user/settings/keys'
is_already_key, csrf = get_info(authentication, url, found_string='test_key_risotto')
if is_already_key:
uid, csrf = get_info(authentication, url, with_data_id=True)
url = f'{data["base_url"]}user/settings/keys/delete?type=ssh'
authentication.post(url, {'_csrf': csrf, 'id': uid})
is_already_key, csrf = get_info(authentication, url, found_string='test_key_risotto')
def test_gitea():
data = get_data()
get_authentication(data)
def test_gitea_repos():
data = get_data()
authentication = get_authentication(data)
if 'FIRST_RUN' in environ:
url = f'{data["base_url"]}repo/create'
uid, csrf = get_info(authentication, url, with_uid=True)
authentication.post(url, {'_csrf': csrf, 'uid': uid, 'repo_name': 'test_persistent'})
url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode='
json = authentication.get(url, json=True)
assert json['ok']
assert len(json['data']) == 1
username = data['username'].split('@', 1)[0]
assert json['data'][0]['full_name'] == f'{username}/test_persistent'
def test_gitea_create_repo():
data = get_data()
authentication = get_authentication(data)
url = f'{data["base_url"]}repo/create'
uid, csrf = get_info(authentication, url, with_uid=True)
authentication.post(url, {'_csrf': csrf, 'uid': uid, 'repo_name': 'test', 'default_branch': 'main'})
url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode='
json = authentication.get(url, json=True)
assert json['ok']
assert len(json['data']) == 2
username = data['username'].split('@', 1)[0]
assert {dat['full_name'] for dat in json['data']} == set([f'{username}/test_persistent', f'{username}/test'])
def test_repo():
data = get_data()
authentication = get_authentication(data)
if 'FIRST_RUN' in environ:
# delete_ssh_key(authentication, data)
add_ssh_key(authentication, data)
with TemporaryDirectory() as tmpdirname:
username = data['username'].split('@', 1)[0]
dns = data['base_url'].split('/', 3)[2]
ssh_url = f'ssh://{GITEA_USERNAME}@{dns}:2222/{username}/test.git'
with MookDnsSystem(dns, data['ip']):
filename = join(tmpdirname, 'test.txt')
with open(filename, 'w') as fh:
fh.write('test')
repo = init(tmpdirname)
add(repo, filename)
commit(repo, message=b'test commit')
push(repo=repo,
remote_location=ssh_url,
refspecs='master',
)
lst = list(repo.get_walker())
assert len(lst) == 1
assert lst[0].commit.message == b'test commit'
def test_clone_http():
data = get_data()
authentication = get_authentication(data)
if 'FIRST_RUN' in environ:
# delete_ssh_key(authentication, data)
add_ssh_key(authentication, data)
with TemporaryDirectory() as tmpdirname:
username = data['username'].split('@', 1)[0]
dns = data['base_url'].split('/', 3)[2]
http_url = f'{data["base_url"]}{username}/test.git'
with MookDnsSystem(dns, data['revprox_ip']):
repo = clone(http_url, tmpdirname)
lst = list(repo.get_walker())
assert len(lst) == 1
assert lst[0].commit.message == b'test commit'
def test_gitea_delete_repo():
repo_name = 'test'
data = get_data()
authentication = get_authentication(data)
username = data['username'].split('@', 1)[0]
url = f'{data["base_url"]}{username}/{repo_name}/settings'
csrf = get_info(authentication, url)
authentication.post(url, {'_csrf': csrf, 'action': 'delete', 'repo_name': repo_name})
url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode='
json = authentication.get(url, json=True)
assert json['ok']
assert len(json['data']) == 1
username = data['username'].split('@', 1)[0]
assert json['data'][0]['full_name'] == f'{username}/test_persistent'
def test_repo_persistent():
data = get_data()
authentication = get_authentication(data)
if 'FIRST_RUN' in environ:
# delete_ssh_key(authentication, data)
add_ssh_key(authentication, data)
with TemporaryDirectory() as tmpdirname:
username = data['username'].split('@', 1)[0]
dns = data['base_url'].split('/', 3)[2]
ssh_url = f'ssh://{GITEA_USERNAME}@{dns}:2222/{username}/test_persistent.git'
with MookDnsSystem(dns, data['ip']):
if 'FIRST_RUN' in environ:
filename = join(tmpdirname, 'test.txt')
with open(filename, 'w') as fh:
fh.write('test')
repo = init(tmpdirname)
add(repo, filename)
commit(repo, message=b'test commit')
push(repo=repo,
remote_location=ssh_url,
refspecs='master',
)
else:
repo = clone(ssh_url, tmpdirname)
lst = list(repo.get_walker())
assert len(lst) == 1
assert lst[0].commit.message == b'test commit'