from yaml import load, SafeLoader from os import environ, makedirs from os.path import expandvars, isfile, isdir, dirname, join from re import search from dulwich.porcelain import init, clone, add, commit, push from tempfile import TemporaryDirectory from subprocess import run from revprox import Authentication from mookdns import MookDnsSystem PORT = '3000' GITEA_USERNAME = 'gitea' KEY_FILE = expandvars("$HOME/tests/risotto") AUTHENTICATION = None DATA = None def get_data(): global DATA if not DATA: conf_file = f'{environ["MACHINE_TEST_DIR"]}/gitea.yml' with open(conf_file) as yaml: DATA = load(yaml, Loader=SafeLoader) return DATA def get_authentication(data): global AUTHENTICATION if not AUTHENTICATION: AUTHENTICATION = Authentication(data['auth_url'], data['auth_server'], data['revprox_ip'], data['username'], data['password'], f'{data["username"]} - Dashboard - {data["gitea_title"]}', ) return AUTHENTICATION def get_info(authentication, url, with_uid=False, with_data_id=False, found_string=None ): # pattern_csrf = r'name="_csrf" value="([a-zA-Z0-9\-\_=]+)"' ret = authentication.get(url) csrf = search(pattern_csrf, ret)[1] ret_data = [] if with_uid: pattern_uid = r'input type="hidden" id="uid" name="uid" value="(\d)+"' uid = search(pattern_uid, ret) if uid is None: ret_data.append(uid) else: ret_data.append(uid[1]) if with_data_id: pattern_uid = r'/user/settings/keys/delete?type=ssh" data-id="(\d)+"' uid = search(pattern_uid, ret) if uid is None: ret_data.append(uid) else: ret_data.append(uid[1]) if found_string: ret_data.append(found_string in ret) ret_data.append(csrf) if len(ret_data) == 1: return ret_data[0] return ret_data def add_ssh_key(authentication, data): # Send key to gitea url = f'{data["base_url"]}user/settings/keys' is_already_key, csrf = get_info(authentication, url, found_string='test_key_risotto') if is_already_key: return # Gen SSH key if needed if not isfile(KEY_FILE): key_dir = dirname(KEY_FILE) if not isdir(key_dir): makedirs(key_dir) cmd = ['/usr/bin/ssh-keygen', '-N', '', '-f', KEY_FILE] run(cmd) with open(f'{KEY_FILE}.pub') as fh: key =, {'_csrf': csrf, 'title': 'test_key_risotto', 'content': key, 'type': 'ssh'}) def delete_ssh_key(authentication, data): url = f'{data["base_url"]}user/settings/keys' is_already_key, csrf = get_info(authentication, url, found_string='test_key_risotto') if is_already_key: uid, csrf = get_info(authentication, url, with_data_id=True) url = f'{data["base_url"]}user/settings/keys/delete?type=ssh', {'_csrf': csrf, 'id': uid}) is_already_key, csrf = get_info(authentication, url, found_string='test_key_risotto') def test_gitea(): data = get_data() get_authentication(data) def test_gitea_repos(): data = get_data() authentication = get_authentication(data) if 'FIRST_RUN' in environ: url = f'{data["base_url"]}repo/create' uid, csrf = get_info(authentication, url, with_uid=True), {'_csrf': csrf, 'uid': uid, 'repo_name': 'test_persistent'}) url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode=' json = authentication.get(url, json=True) assert json['ok'] assert len(json['data']) == 1 username = data['username'].split('@', 1)[0] assert json['data'][0]['full_name'] == f'{username}/test_persistent' def test_gitea_create_repo(): data = get_data() authentication = get_authentication(data) url = f'{data["base_url"]}repo/create' uid, csrf = get_info(authentication, url, with_uid=True), {'_csrf': csrf, 'uid': uid, 'repo_name': 'test', 'default_branch': 'main'}) url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode=' json = authentication.get(url, json=True) assert json['ok'] assert len(json['data']) == 2 username = data['username'].split('@', 1)[0] assert {dat['full_name'] for dat in json['data']} == set([f'{username}/test_persistent', f'{username}/test']) def test_repo(): data = get_data() authentication = get_authentication(data) if 'FIRST_RUN' in environ: # delete_ssh_key(authentication, data) add_ssh_key(authentication, data) with TemporaryDirectory() as tmpdirname: username = data['username'].split('@', 1)[0] dns = data['base_url'].split('/', 3)[2] ssh_url = f'ssh://{GITEA_USERNAME}@{dns}:2222/{username}/test.git' with MookDnsSystem(dns, data['ip']): filename = join(tmpdirname, 'test.txt') with open(filename, 'w') as fh: fh.write('test') repo = init(tmpdirname) add(repo, filename) commit(repo, message=b'test commit') push(repo=repo, remote_location=ssh_url, refspecs='master', ) lst = list(repo.get_walker()) assert len(lst) == 1 assert lst[0].commit.message == b'test commit' def test_clone_http(): data = get_data() authentication = get_authentication(data) if 'FIRST_RUN' in environ: # delete_ssh_key(authentication, data) add_ssh_key(authentication, data) with TemporaryDirectory() as tmpdirname: username = data['username'].split('@', 1)[0] dns = data['base_url'].split('/', 3)[2] http_url = f'{data["base_url"]}{username}/test.git' with MookDnsSystem(dns, data['revprox_ip']): repo = clone(http_url, tmpdirname) lst = list(repo.get_walker()) assert len(lst) == 1 assert lst[0].commit.message == b'test commit' def test_gitea_delete_repo(): repo_name = 'test' data = get_data() authentication = get_authentication(data) username = data['username'].split('@', 1)[0] url = f'{data["base_url"]}{username}/{repo_name}/settings' csrf = get_info(authentication, url), {'_csrf': csrf, 'action': 'delete', 'repo_name': repo_name}) url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode=' json = authentication.get(url, json=True) assert json['ok'] assert len(json['data']) == 1 username = data['username'].split('@', 1)[0] assert json['data'][0]['full_name'] == f'{username}/test_persistent' def test_repo_persistent(): data = get_data() authentication = get_authentication(data) if 'FIRST_RUN' in environ: # delete_ssh_key(authentication, data) add_ssh_key(authentication, data) with TemporaryDirectory() as tmpdirname: username = data['username'].split('@', 1)[0] dns = data['base_url'].split('/', 3)[2] ssh_url = f'ssh://{GITEA_USERNAME}@{dns}:2222/{username}/test_persistent.git' with MookDnsSystem(dns, data['ip']): if 'FIRST_RUN' in environ: filename = join(tmpdirname, 'test.txt') with open(filename, 'w') as fh: fh.write('test') repo = init(tmpdirname) add(repo, filename) commit(repo, message=b'test commit') push(repo=repo, remote_location=ssh_url, refspecs='master', ) else: repo = clone(ssh_url, tmpdirname) lst = list(repo.get_walker()) assert len(lst) == 1 assert lst[0].commit.message == b'test commit'