289 lines
10 KiB
Python
289 lines
10 KiB
Python
from yaml import load, SafeLoader
|
|
from os import environ, makedirs, unlink
|
|
from os.path import expandvars, isfile, isdir, dirname, join
|
|
from re import search
|
|
from shutil import move
|
|
from glob import glob
|
|
from tempfile import TemporaryDirectory
|
|
from subprocess import run
|
|
from dulwich.porcelain import init, clone, add, commit, push
|
|
|
|
|
|
from revprox import Authentication
|
|
from mookdns import MookDnsSystem
|
|
|
|
|
|
# --- Forgejo endpoints ------------------------------------------------------
# Ports are kept as strings because they are only interpolated into URLs.
PORT = '3000'
# Account name and port used to build the ssh:// remote URLs in the tests.
GITEA_USERNAME = 'forgejo'
GITEA_PORT = '2222'

# --- SSH key material -------------------------------------------------------
# Private key used by the tests; the public half lives next to it as "<KEY_FILE>.pub".
KEY_FILE = '/var/lib/risotto/srv/hosts/forgejo'
# Transition between gitea and forgejo: a key found at the former location
# is moved to KEY_FILE by test_repo().
GITEA_KEY_FILE = '/var/lib/risotto/srv/hosts/gitea'

# --- Per-user configuration files -------------------------------------------
KNOWN_KEY = expandvars('$HOME/.ssh/known_hosts')
CONFIG_SSH = expandvars('$HOME/.ssh/config')
CONFIG_GIT = expandvars('$HOME/.gitconfig')

# --- Lazily filled caches (see get_authentication() and get_data()) ----------
AUTHENTICATION = None
DATA = None
|
|
|
|
|
|
def get_data():
    """Return the parsed ``forgejo.yml`` settings, loading them on first use.

    The file is read from the directory named by the ``MACHINE_TEST_DIR``
    environment variable. The parsed result is stored in the module-level
    ``DATA`` so that later calls reuse it instead of re-reading the file.
    """
    global DATA
    if DATA:
        return DATA
    settings_path = f'{environ["MACHINE_TEST_DIR"]}/forgejo.yml'
    with open(settings_path) as stream:
        DATA = load(stream, Loader=SafeLoader)
    return DATA
|
|
|
|
|
|
def get_authentication(data):
    """Return the shared ``Authentication`` object, creating it on first use.

    ``data`` is the mapping returned by ``get_data()``; the keys read here are
    ``auth_url``, ``auth_server``, ``revprox_ip``, ``username``, ``password``
    and ``forgejo_title``. The instance is stored in the module-level
    ``AUTHENTICATION`` and reused by later calls.
    """
    global AUTHENTICATION
    if AUTHENTICATION:
        return AUTHENTICATION
    AUTHENTICATION = Authentication(
        data['auth_url'],
        data['auth_server'],
        data['revprox_ip'],
        data['username'],
        data['password'],
        # Page <title> passed as the last argument (presumably what the
        # landing page must contain after login - confirm in revprox).
        # With a French UI the title reads "Tableau de bord" instead of "Dashboard".
        f'<title>{data["username"]} - Dashboard - {data["forgejo_title"]}</title>',
    )
    return AUTHENTICATION
|
|
|
|
|
|
class SSHConfig:
    """Context manager that temporarily replaces the user's SSH client config.

    On entry, an existing ``CONFIG_SSH`` is moved aside to ``CONFIG_SSH + '.old'``
    (unless such a backup already exists) and a minimal configuration pointing
    at ``KEY_FILE`` is written in its place. On exit, the backup is moved back,
    or the generated file is removed when there was nothing to back up.
    """

    def __enter__(self):
        # The f-prefix matters: without it the backup path is the literal
        # string "{CONFIG_SSH}.old", i.e. a file in the current directory.
        self.old_file = f'{CONFIG_SSH}.old'
        # The directory may not exist yet on a fresh account.
        makedirs(dirname(CONFIG_SSH), mode=0o700, exist_ok=True)
        if isfile(CONFIG_SSH) and not isfile(self.old_file):
            move(CONFIG_SSH, self.old_file)
        with open(CONFIG_SSH, 'w') as fh:
            fh.write(f"""Host *
    User forgejo
    PubkeyAcceptedKeyTypes +ssh-rsa
    StrictHostKeyChecking no
    IdentityFile {KEY_FILE}
""")
        return self

    def __exit__(self, *args):
        # Restore the user's own file when one was saved, otherwise just
        # drop the file written by __enter__.
        if isfile(self.old_file):
            move(self.old_file, CONFIG_SSH)
        else:
            unlink(CONFIG_SSH)
|
|
|
|
|
|
class GITConfig:
    """Context manager that temporarily replaces the user's ``.gitconfig``.

    The generated file contains only an ``[http] sslCAInfo`` entry pointing at
    the CA certificate named by ``ca_certificate`` in
    ``$MACHINE_TEST_DIR/reverse-proxy.yml`` (the value is used as a glob
    pattern relative to ``MACHINE_TEST_DIR``; the first match wins).

    On entry, an existing ``CONFIG_GIT`` is moved aside to
    ``CONFIG_GIT + '.old'`` unless such a backup already exists. On exit, the
    backup is moved back, or the generated file is removed when there was
    nothing to back up.

    Raises:
        FileNotFoundError: when no file matches the certificate pattern.
    """

    def __enter__(self):
        # The f-prefix matters: without it the backup path is the literal
        # string "{CONFIG_GIT}.old", i.e. a file in the current directory.
        self.old_file = f'{CONFIG_GIT}.old'
        # Resolve the certificate before touching the user's file, so that a
        # failure here cannot leave a truncated or displaced .gitconfig behind.
        conf_file = f'{environ["MACHINE_TEST_DIR"]}/reverse-proxy.yml'
        with open(conf_file) as stream:
            data = load(stream, Loader=SafeLoader)
        pattern = join(environ["MACHINE_TEST_DIR"], data["ca_certificate"])
        certs = glob(pattern)
        if not certs:
            raise FileNotFoundError(f'no CA certificate matches "{pattern}"')
        if isfile(CONFIG_GIT) and not isfile(self.old_file):
            move(CONFIG_GIT, self.old_file)
        with open(CONFIG_GIT, 'w') as fh:
            fh.write(f"""[http]
    sslCAInfo = {certs[0]}
""")
        return self

    def __exit__(self, *args):
        # Restore the user's own file when one was saved, otherwise just
        # drop the file written by __enter__.
        if isfile(self.old_file):
            move(self.old_file, CONFIG_GIT)
        else:
            unlink(CONFIG_GIT)
|
|
|
|
|
|
def _first_group(pattern, text):
    """Return group 1 of the first match of ``pattern`` in ``text``, or None."""
    match = search(pattern, text)
    return None if match is None else match[1]


def get_info(authentication,
             url,
             with_uid=False,
             with_data_id=False,
             found_string=None
             ):
    """Fetch ``url`` and extract the CSRF token plus optional details.

    Args:
        authentication: object whose ``get(url)`` returns the page as text.
        url: page to fetch.
        with_uid: also extract the value of the hidden ``uid`` input.
        with_data_id: also extract the ``data-id`` attribute that follows the
            SSH key deletion link.
        found_string: when set to a non-empty string, also report whether it
            occurs in the page.

    Returns:
        The CSRF token alone when no optional item is requested. Otherwise a
        list holding, in this order, each requested item (``uid``, ``data-id``,
        ``found_string`` presence) followed by the CSRF token. ``uid`` and
        ``data-id`` are strings, or ``None`` when the page does not contain
        them.

    Raises:
        ValueError: when the page contains no CSRF token.
    """
    # Example of what is matched:
    # <input type="hidden" name="_csrf" value="YQbVgdYHX_3VQ-KuZ5cKtr9RzXE6MTY1NzgxMzUzNTA0OTYwODQ0NQ">
    pattern_csrf = r'name="_csrf" value="([a-zA-Z0-9\-\_=]+)"'
    page = authentication.get(url)
    csrf = _first_group(pattern_csrf, page)
    if csrf is None:
        raise ValueError(f'no CSRF token found in page "{url}"')
    ret_data = []
    if with_uid:
        # The quantifier belongs inside the group: "(\d)+" would keep only
        # the last digit of a multi-digit id.
        pattern_uid = r'input type="hidden" id="uid" name="uid" value="(\d+)"'
        ret_data.append(_first_group(pattern_uid, page))
    if with_data_id:
        # "?" must be escaped to match the literal query-string separator.
        pattern_data_id = r'/user/settings/keys/delete\?type=ssh" data-id="(\d+)"'
        ret_data.append(_first_group(pattern_data_id, page))
    if found_string:
        ret_data.append(found_string in page)
    ret_data.append(csrf)
    if len(ret_data) == 1:
        return ret_data[0]
    return ret_data
|
|
|
|
|
|
def add_ssh_key(authentication, data):
    """Register the test SSH public key in the user's settings.

    Nothing is done when the key settings page already mentions
    ``test_key_risotto``. Otherwise the key pair at ``KEY_FILE`` is generated
    if missing and its public half is posted under the title
    ``test_key_risotto``.

    Args:
        authentication: session object providing ``get``/``post``.
        data: settings mapping; ``base_url`` is read here.

    Raises:
        subprocess.CalledProcessError: when ``ssh-keygen`` fails.
    """
    # Send key to forgejo
    url = f'{data["base_url"]}user/settings/keys'
    is_already_key, csrf = get_info(authentication, url, found_string='test_key_risotto')
    if is_already_key:
        return
    # Gen SSH key if needed
    if not isfile(KEY_FILE):
        makedirs(dirname(KEY_FILE), exist_ok=True)
        cmd = ['/usr/bin/ssh-keygen', '-t', 'rsa', '-N', '', '-f', KEY_FILE]
        # check=True: fail here rather than later on a missing ".pub" file.
        run(cmd, check=True)
    with open(f'{KEY_FILE}.pub') as fh:
        key = fh.read()
    authentication.post(url, {'_csrf': csrf, 'title': 'test_key_risotto', 'content': key, 'type': 'ssh'})
|
|
|
|
|
|
def delete_ssh_key(authentication, data):
    """Remove the test SSH key from the user's settings, if present.

    The key settings page is searched for ``test_key_risotto``. When found,
    the ``data-id`` returned by ``get_info`` is posted to the deletion
    endpoint and the settings page is fetched again to confirm the key is
    gone.

    NOTE(review): ``get_info`` returns the first ``data-id`` on the page; this
    presumably relies on the test account holding a single key - confirm.

    Args:
        authentication: session object providing ``get``/``post``.
        data: settings mapping; ``base_url`` is read here.
    """
    keys_url = f'{data["base_url"]}user/settings/keys'
    is_already_key, csrf = get_info(authentication, keys_url, found_string='test_key_risotto')
    if not is_already_key:
        return
    key_id, csrf = get_info(authentication, keys_url, with_data_id=True)
    # Keep the deletion endpoint in its own variable: the verification below
    # must read the settings page again, not the deletion endpoint.
    delete_url = f'{data["base_url"]}user/settings/keys/delete?type=ssh'
    authentication.post(delete_url, {'_csrf': csrf, 'id': key_id})
    still_present, _ = get_info(authentication, keys_url, found_string='test_key_risotto')
    assert not still_present, 'test_key_risotto is still listed after deletion'
|
|
|
|
|
|
def test_forgejo():
    """Load the settings and build the ``Authentication`` object from them."""
    get_authentication(get_data())
|
|
|
|
|
|
def test_forgejo_repos():
    """Check that ``test_persistent`` is the only repository listed.

    When ``FIRST_RUN`` is set in the environment the repository is created
    first through the ``repo/create`` form.
    """
    data = get_data()
    authentication = get_authentication(data)
    if 'FIRST_RUN' in environ:
        create_url = f'{data["base_url"]}repo/create'
        uid, csrf = get_info(authentication, create_url, with_uid=True)
        form = {'_csrf': csrf, 'uid': uid, 'repo_name': 'test_persistent'}
        authentication.post(create_url, form)
    search_url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode='
    reply = authentication.get(search_url, json=True)
    assert reply['ok']
    repos = reply['data']
    assert len(repos) == 1
    # Only the part before "@" is used as the repository owner.
    owner = data['username'].partition('@')[0]
    assert repos[0]['full_name'] == f'{owner}/test_persistent'
|
|
|
|
|
|
def test_forgejo_create_repo():
    """Create the ``test`` repository and check it is listed with ``test_persistent``."""
    data = get_data()
    authentication = get_authentication(data)
    create_url = f'{data["base_url"]}repo/create'
    uid, csrf = get_info(authentication, create_url, with_uid=True)
    form = {'_csrf': csrf, 'uid': uid, 'repo_name': 'test', 'default_branch': 'main'}
    authentication.post(create_url, form)
    search_url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode='
    reply = authentication.get(search_url, json=True)
    assert reply['ok']
    repos = reply['data']
    assert len(repos) == 2
    # Only the part before "@" is used as the repository owner.
    owner = data['username'].partition('@')[0]
    listed = {repo['full_name'] for repo in repos}
    assert listed == {f'{owner}/test_persistent', f'{owner}/test'}
|
|
|
|
|
|
def test_repo():
    """Push a one-commit repository to ``test.git`` over SSH.

    With ``FIRST_RUN`` set, the SSH key is registered first. When ``KEY_FILE``
    is missing, a key pair found at ``GITEA_KEY_FILE`` is moved into place;
    if neither exists the test aborts.
    """
    data = get_data()
    authentication = get_authentication(data)
    if 'FIRST_RUN' in environ:
        add_ssh_key(authentication, data)
    if not isfile(KEY_FILE):
        if not isfile(GITEA_KEY_FILE):
            raise Exception(f'cannot find ssh key "{KEY_FILE}", do you run with FIRST_RUN?')
        # Transition between gitea and forgejo: reuse the former key pair.
        move(GITEA_KEY_FILE, KEY_FILE)
        move(GITEA_KEY_FILE + '.pub', KEY_FILE + '.pub')
    with TemporaryDirectory() as workdir:
        owner = data['username'].partition('@')[0]
        # Third "/"-separated field of base_url, i.e. the host part.
        host = data['base_url'].split('/', 3)[2]
        ssh_url = f'ssh://{GITEA_USERNAME}@{host}:{GITEA_PORT}/{owner}/test.git'
        with SSHConfig(), MookDnsSystem(host, data['ip']):
            sample = join(workdir, 'test.txt')
            with open(sample, 'w') as fh:
                fh.write('test')
            repo = init(workdir)
            add(repo, sample)
            commit(repo, message=b'test commit')
            push(repo=repo, remote_location=ssh_url, refspecs='master')
        history = list(repo.get_walker())
        assert len(history) == 1
        assert history[0].commit.message == b'test commit'
|
|
|
|
|
|
def test_clone_http():
    """Clone ``test.git`` over HTTP(S) and check its single commit.

    A first clone is attempted with the ambient git configuration. If it
    fails, the clone is retried inside ``GITConfig`` so that the reverse
    proxy's CA certificate is configured.
    """
    data = get_data()
    authentication = get_authentication(data)
    if 'FIRST_RUN' in environ:
        add_ssh_key(authentication, data)
    with TemporaryDirectory() as tmpdirname:
        username = data['username'].split('@', 1)[0]
        # Third "/"-separated field of base_url, i.e. the host part.
        dns = data['base_url'].split('/', 3)[2]
        http_url = f'{data["base_url"]}{username}/test.git'
        with SSHConfig():
            with MookDnsSystem(dns, data['revprox_ip']):
                # Each attempt gets its own, not yet existing, target: a
                # failed clone may leave files behind (presumably a partly
                # initialised repository - not verified here) which would
                # get in the way of the retry.
                try:
                    repo = clone(http_url, join(tmpdirname, 'first'))
                except Exception:
                    # "except Exception" rather than a bare "except" so that
                    # KeyboardInterrupt/SystemExit are not turned into a retry.
                    with GITConfig():
                        repo = clone(http_url, join(tmpdirname, 'retry'))
        lst = list(repo.get_walker())
        assert len(lst) == 1
        assert lst[0].commit.message == b'test commit'
|
|
|
|
|
|
def test_forgejo_delete_repo():
    """Delete the ``test`` repository and check only ``test_persistent`` remains."""
    repo_name = 'test'
    data = get_data()
    authentication = get_authentication(data)
    # Only the part before "@" is used as the repository owner.
    owner = data['username'].partition('@')[0]
    settings_url = f'{data["base_url"]}{owner}/{repo_name}/settings'
    csrf = get_info(authentication, settings_url)
    form = {'_csrf': csrf, 'action': 'delete', 'repo_name': repo_name}
    authentication.post(settings_url, form)
    search_url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode='
    reply = authentication.get(search_url, json=True)
    assert reply['ok']
    repos = reply['data']
    assert len(repos) == 1
    assert repos[0]['full_name'] == f'{owner}/test_persistent'
|
|
|
|
|
|
def test_repo_persistent():
    """Check that ``test_persistent.git`` holds exactly the test commit.

    With ``FIRST_RUN`` set, the SSH key is registered and a one-commit
    repository is pushed over SSH; otherwise the existing repository is
    cloned over SSH. In both cases the history must consist of the single
    ``test commit``.
    """
    data = get_data()
    authentication = get_authentication(data)
    first_run = 'FIRST_RUN' in environ
    if first_run:
        add_ssh_key(authentication, data)
    with TemporaryDirectory() as workdir:
        owner = data['username'].partition('@')[0]
        # Third "/"-separated field of base_url, i.e. the host part.
        host = data['base_url'].split('/', 3)[2]
        ssh_url = f'ssh://{GITEA_USERNAME}@{host}:{GITEA_PORT}/{owner}/test_persistent.git'
        with SSHConfig(), MookDnsSystem(host, data['ip']):
            if first_run:
                sample = join(workdir, 'test.txt')
                with open(sample, 'w') as fh:
                    fh.write('test')
                repo = init(workdir)
                add(repo, sample)
                commit(repo, message=b'test commit')
                push(repo=repo, remote_location=ssh_url, refspecs='master')
            else:
                repo = clone(ssh_url, workdir)
        history = list(repo.get_walker())
        assert len(history) == 1
        assert history[0].commit.message == b'test commit'
|