dataset/seed/forgejo/tests/test_forgejo.py

308 lines
11 KiB
Python
Raw Normal View History

2023-01-17 21:43:32 +01:00
import datetime
2022-07-16 22:16:24 +02:00
from yaml import load, SafeLoader
2023-01-03 11:36:37 +01:00
from os import environ, makedirs, unlink
2022-07-16 22:16:24 +02:00
from os.path import expandvars, isfile, isdir, dirname, join
from re import search
2023-01-03 11:36:37 +01:00
from shutil import move
from glob import glob
2022-07-16 22:16:24 +02:00
from tempfile import TemporaryDirectory
from subprocess import run
2023-01-03 11:36:37 +01:00
from dulwich.porcelain import init, clone, add, commit, push
2022-07-16 22:16:24 +02:00
from revprox import Authentication
from mookdns import MookDnsSystem
PORT = '3000'
2023-01-17 21:43:32 +01:00
FORGEJO_USERNAME = 'forgejo'
FORGEJO_PORT = '2222'
2023-01-03 11:36:37 +01:00
KEY_FILE = '/var/lib/risotto/srv/hosts/forgejo'
# transition between gitea and forgejo
GITEA_KEY_FILE = '/var/lib/risotto/srv/hosts/gitea'
CONFIG_SSH = expandvars('$HOME/.ssh/config')
CONFIG_GIT = expandvars('$HOME/.gitconfig')
2022-07-16 22:16:24 +02:00
AUTHENTICATION = None
DATA = None
def get_data():
global DATA
if not DATA:
2023-01-03 11:36:37 +01:00
conf_file = f'{environ["MACHINE_TEST_DIR"]}/forgejo.yml'
2022-07-16 22:16:24 +02:00
with open(conf_file) as yaml:
DATA = load(yaml, Loader=SafeLoader)
return DATA
def get_authentication(data):
global AUTHENTICATION
if not AUTHENTICATION:
AUTHENTICATION = Authentication(data['auth_url'],
data['auth_server'],
data['revprox_ip'],
data['username'],
data['password'],
2023-01-03 11:36:37 +01:00
# f'<title>{data["username"]} - Tableau de bord - {data["forgejo_title"]}</title>',
f'<title>{data["username"]} - Dashboard - {data["forgejo_title"]}</title>',
2022-07-16 22:16:24 +02:00
)
return AUTHENTICATION
2023-01-03 11:36:37 +01:00
class SSHConfig:
def __enter__(self):
self.old_file = '{CONFIG_SSH}.old'
if isfile(CONFIG_SSH) and not isfile(self.old_file):
move(CONFIG_SSH, self.old_file)
with open(CONFIG_SSH, 'w') as fh:
fh.write(f"""Host *
User forgejo
PubkeyAcceptedKeyTypes +ssh-rsa
StrictHostKeyChecking no
IdentityFile {KEY_FILE}
""")
def __exit__(self, *args):
if isfile(self.old_file):
move(self.old_file, CONFIG_SSH)
else:
unlink(CONFIG_SSH)
class GITConfig:
def __enter__(self):
self.old_file = '{CONFIG_GIT}.old'
if isfile(CONFIG_GIT) and not isfile(self.old_file):
move(CONFIG_GIT, self.old_file)
with open(CONFIG_GIT, 'w') as fh:
conf_file = f'{environ["MACHINE_TEST_DIR"]}/reverse-proxy.yml'
with open(conf_file) as yaml:
data = load(yaml, Loader=SafeLoader)
path = join(environ["MACHINE_TEST_DIR"], data["ca_certificate"])
cert = glob(path)
fh.write(f"""[http]
sslCAInfo = {cert[0]}
""")
def __exit__(self, *args):
if isfile(self.old_file):
move(self.old_file, CONFIG_GIT)
else:
unlink(CONFIG_GIT)
2022-07-16 22:16:24 +02:00
def get_info(authentication,
url,
with_uid=False,
with_data_id=False,
found_string=None
):
pattern_csrf = r'name="_csrf" value="([a-zA-Z0-9\-\_=]+)"'
ret = authentication.get(url)
csrf = search(pattern_csrf, ret)[1]
ret_data = []
if with_uid:
pattern_uid = r'input type="hidden" id="uid" name="uid" value="(\d)+"'
uid = search(pattern_uid, ret)
if uid is None:
ret_data.append(uid)
else:
ret_data.append(uid[1])
if with_data_id:
pattern_uid = r'/user/settings/keys/delete?type=ssh" data-id="(\d)+"'
uid = search(pattern_uid, ret)
if uid is None:
ret_data.append(uid)
else:
ret_data.append(uid[1])
if found_string:
ret_data.append(found_string in ret)
ret_data.append(csrf)
if len(ret_data) == 1:
return ret_data[0]
return ret_data
def add_ssh_key(authentication, data):
2023-01-03 11:36:37 +01:00
# Send key to forgejo
2022-07-16 22:16:24 +02:00
url = f'{data["base_url"]}user/settings/keys'
is_already_key, csrf = get_info(authentication, url, found_string='test_key_risotto')
if is_already_key:
return
# Gen SSH key if needed
if not isfile(KEY_FILE):
key_dir = dirname(KEY_FILE)
if not isdir(key_dir):
makedirs(key_dir)
2023-01-03 11:36:37 +01:00
cmd = ['/usr/bin/ssh-keygen', '-t', 'rsa', '-N', '', '-f', KEY_FILE]
2022-07-16 22:16:24 +02:00
run(cmd)
with open(f'{KEY_FILE}.pub') as fh:
key = fh.read()
authentication.post(url, {'_csrf': csrf, 'title': 'test_key_risotto', 'content': key, 'type': 'ssh'})
def delete_ssh_key(authentication, data):
url = f'{data["base_url"]}user/settings/keys'
is_already_key, csrf = get_info(authentication, url, found_string='test_key_risotto')
if is_already_key:
uid, csrf = get_info(authentication, url, with_data_id=True)
url = f'{data["base_url"]}user/settings/keys/delete?type=ssh'
authentication.post(url, {'_csrf': csrf, 'id': uid})
is_already_key, csrf = get_info(authentication, url, found_string='test_key_risotto')
2023-01-03 11:36:37 +01:00
def test_forgejo():
2022-07-16 22:16:24 +02:00
data = get_data()
get_authentication(data)
2023-01-03 11:36:37 +01:00
def test_forgejo_repos():
2022-07-16 22:16:24 +02:00
data = get_data()
authentication = get_authentication(data)
if 'FIRST_RUN' in environ:
url = f'{data["base_url"]}repo/create'
uid, csrf = get_info(authentication, url, with_uid=True)
authentication.post(url, {'_csrf': csrf, 'uid': uid, 'repo_name': 'test_persistent'})
url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode='
json = authentication.get(url, json=True)
assert json['ok']
assert len(json['data']) == 1
username = data['username'].split('@', 1)[0]
assert json['data'][0]['full_name'] == f'{username}/test_persistent'
2023-01-03 11:36:37 +01:00
def test_forgejo_create_repo():
2022-07-16 22:16:24 +02:00
data = get_data()
authentication = get_authentication(data)
url = f'{data["base_url"]}repo/create'
uid, csrf = get_info(authentication, url, with_uid=True)
authentication.post(url, {'_csrf': csrf, 'uid': uid, 'repo_name': 'test', 'default_branch': 'main'})
url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode='
json = authentication.get(url, json=True)
assert json['ok']
assert len(json['data']) == 2
username = data['username'].split('@', 1)[0]
assert {dat['full_name'] for dat in json['data']} == set([f'{username}/test_persistent', f'{username}/test'])
def test_repo():
data = get_data()
authentication = get_authentication(data)
if 'FIRST_RUN' in environ:
# delete_ssh_key(authentication, data)
add_ssh_key(authentication, data)
2023-01-03 11:36:37 +01:00
if not isfile(KEY_FILE):
if isfile(GITEA_KEY_FILE):
move(GITEA_KEY_FILE, KEY_FILE)
move(GITEA_KEY_FILE + '.pub', KEY_FILE + '.pub')
else:
raise Exception(f'cannot find ssh key "{KEY_FILE}", do you run with FIRST_RUN?')
2022-07-16 22:16:24 +02:00
with TemporaryDirectory() as tmpdirname:
username = data['username'].split('@', 1)[0]
dns = data['base_url'].split('/', 3)[2]
2023-01-17 21:43:32 +01:00
ssh_url = f'ssh://{FORGEJO_USERNAME}@{dns}:{FORGEJO_PORT}/{username}/test.git'
2023-01-03 11:36:37 +01:00
with SSHConfig():
with MookDnsSystem(dns, data['ip']):
filename = join(tmpdirname, 'test.txt')
with open(filename, 'w') as fh:
fh.write('test')
repo = init(tmpdirname)
add(repo, filename)
commit(repo, message=b'test commit')
push(repo=repo,
remote_location=ssh_url,
refspecs='master',
)
lst = list(repo.get_walker())
assert len(lst) == 1
assert lst[0].commit.message == b'test commit'
2022-07-16 22:16:24 +02:00
def test_clone_http():
data = get_data()
authentication = get_authentication(data)
if 'FIRST_RUN' in environ:
# delete_ssh_key(authentication, data)
add_ssh_key(authentication, data)
with TemporaryDirectory() as tmpdirname:
username = data['username'].split('@', 1)[0]
dns = data['base_url'].split('/', 3)[2]
http_url = f'{data["base_url"]}{username}/test.git'
2023-01-03 11:36:37 +01:00
with SSHConfig():
with MookDnsSystem(dns, data['revprox_ip']):
try:
repo = clone(http_url, tmpdirname)
except:
with GITConfig():
repo = clone(http_url, tmpdirname)
lst = list(repo.get_walker())
assert len(lst) == 1
assert lst[0].commit.message == b'test commit'
2022-07-16 22:16:24 +02:00
2023-01-03 11:36:37 +01:00
def test_forgejo_delete_repo():
2022-07-16 22:16:24 +02:00
repo_name = 'test'
data = get_data()
authentication = get_authentication(data)
username = data['username'].split('@', 1)[0]
url = f'{data["base_url"]}{username}/{repo_name}/settings'
csrf = get_info(authentication, url)
authentication.post(url, {'_csrf': csrf, 'action': 'delete', 'repo_name': repo_name})
url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode='
json = authentication.get(url, json=True)
assert json['ok']
assert len(json['data']) == 1
username = data['username'].split('@', 1)[0]
assert json['data'][0]['full_name'] == f'{username}/test_persistent'
def test_repo_persistent():
data = get_data()
authentication = get_authentication(data)
if 'FIRST_RUN' in environ:
# delete_ssh_key(authentication, data)
add_ssh_key(authentication, data)
with TemporaryDirectory() as tmpdirname:
username = data['username'].split('@', 1)[0]
dns = data['base_url'].split('/', 3)[2]
2023-01-17 21:43:32 +01:00
ssh_url = f'ssh://{FORGEJO_USERNAME}@{dns}:{FORGEJO_PORT}/{username}/test_persistent.git'
2023-01-03 11:36:37 +01:00
with SSHConfig():
with MookDnsSystem(dns, data['ip']):
2023-01-17 21:43:32 +01:00
filename = join(tmpdirname, 'test.txt')
2023-01-03 11:36:37 +01:00
if 'FIRST_RUN' in environ:
with open(filename, 'w') as fh:
fh.write('test')
repo = init(tmpdirname)
add(repo, filename)
commit(repo, message=b'test commit')
push(repo=repo,
remote_location=ssh_url,
refspecs='master',
)
else:
repo = clone(ssh_url, tmpdirname)
2023-01-17 21:43:32 +01:00
with open(filename, 'r') as fh:
len_file = len(fh.readlines())
# get previous commit number
2023-01-03 11:36:37 +01:00
lst = list(repo.get_walker())
2023-01-17 21:43:32 +01:00
len_before_commit = len(lst)
assert len_before_commit == len_file
# add a new line in file and commit
with open(filename, 'a') as fh:
fh.write('\ntest')
add(repo, filename)
date = datetime.datetime.now()
commit_message = f'test commit {date}'.encode()
commit(repo, message=commit_message)
push(repo=repo,
remote_location=ssh_url,
refspecs='master',
)
# test if commit is added and last commit
lst = list(repo.get_walker())
len_after_commit = len(lst)
assert len_before_commit + 1 == len_after_commit
assert lst[-1].commit.message == commit_message