import datetime
from yaml import load, SafeLoader
from os import environ, makedirs, unlink
from os.path import expandvars, isfile, isdir, dirname, join
from re import search
from shutil import move
from glob import glob
from tempfile import TemporaryDirectory
from subprocess import run
from dulwich.porcelain import init, clone, add, commit, push
from revprox import Authentication
from mookdns import MookDnsSystem
# NOTE(review): PORT is not referenced anywhere in the visible code;
# presumably the Forgejo HTTP port - confirm before relying on it.
PORT = '3000'
# User name and port used to build the ssh:// remote URLs of the test repositories.
FORGEJO_USERNAME = 'forgejo'
FORGEJO_PORT = '2222'
# Private SSH key used for git over SSH; the public half is read from KEY_FILE + '.pub'.
KEY_FILE = '/var/lib/risotto/srv/hosts/forgejo'
# Key location from before the gitea -> forgejo transition; test_repo() moves
# it to KEY_FILE when only the old key exists.
GITEA_KEY_FILE = '/var/lib/risotto/srv/hosts/gitea'
# Per-user SSH and git configuration files that SSHConfig / GITConfig
# temporarily overwrite.
CONFIG_SSH = expandvars('$HOME/.ssh/config')
CONFIG_GIT = expandvars('$HOME/.gitconfig')
# Module-level caches filled on first use by get_authentication() and get_data().
AUTHENTICATION = None
DATA = None
def get_data():
    """Return the parsed ``forgejo.yml`` test configuration.

    The file is read from the directory named by the ``MACHINE_TEST_DIR``
    environment variable the first time it is needed; the parsed result is
    kept in the module-level ``DATA`` cache and returned as-is afterwards.
    """
    global DATA
    if DATA:
        # Already loaded (and non-empty): reuse the cached value.
        return DATA
    config_path = f'{environ["MACHINE_TEST_DIR"]}/forgejo.yml'
    with open(config_path) as stream:
        DATA = load(stream, Loader=SafeLoader)
    return DATA
def get_authentication(data):
    """Return the shared ``Authentication`` object, creating it on first use.

    :param data: configuration mapping; the keys ``auth_url``,
        ``auth_server``, ``revprox_ip``, ``username``, ``password`` and
        ``forgejo_title`` are read when the object has to be created.
    :return: the ``Authentication`` instance cached in the module-level
        ``AUTHENTICATION`` variable.
    """
    global AUTHENTICATION
    if AUTHENTICATION is None:
        # The last argument is built from the user name and the Forgejo title;
        # presumably it is the page title expected after login - confirm
        # against revprox.Authentication.
        # French-locale variant, kept for reference:
        # f'{data["username"]} - Tableau de bord - {data["forgejo_title"]}'
        AUTHENTICATION = Authentication(
            data['auth_url'],
            data['auth_server'],
            data['revprox_ip'],
            data['username'],
            data['password'],
            f'{data["username"]} - Dashboard - {data["forgejo_title"]}',
        )
    return AUTHENTICATION
class SSHConfig:
    """Context manager that temporarily replaces the user's SSH client config.

    On entry an existing ``CONFIG_SSH`` file is moved aside to
    ``CONFIG_SSH + '.old'`` (unless such a backup already exists) and a
    config pointing at ``KEY_FILE`` is written in its place.  On exit the
    backup is moved back if there is one, otherwise the generated file is
    removed.
    """

    def __enter__(self):
        # The backup path must be interpolated; without the ``f`` prefix the
        # backup went to a file literally named '{CONFIG_SSH}.old' in the
        # current working directory.
        self.old_file = f'{CONFIG_SSH}.old'
        if isfile(CONFIG_SSH) and not isfile(self.old_file):
            move(CONFIG_SSH, self.old_file)
        with open(CONFIG_SSH, 'w') as fh:
            fh.write(f"""Host *
User forgejo
PubkeyAcceptedKeyTypes +ssh-rsa
StrictHostKeyChecking no
IdentityFile {KEY_FILE}
""")
        return self

    def __exit__(self, *args):
        if isfile(self.old_file):
            # Restore the user's original configuration.
            move(self.old_file, CONFIG_SSH)
        else:
            # Nothing to restore: drop the file written on entry.
            unlink(CONFIG_SSH)
class GITConfig:
    """Context manager that temporarily replaces the user's git config.

    The generated file only sets ``http.sslCAInfo`` to the first file
    matching the ``ca_certificate`` glob from ``reverse-proxy.yml`` (both
    resolved relative to the ``MACHINE_TEST_DIR`` environment variable).
    On entry an existing ``CONFIG_GIT`` is moved aside to
    ``CONFIG_GIT + '.old'`` (unless that backup already exists); on exit the
    backup is moved back if there is one, otherwise the generated file is
    removed.

    :raises FileNotFoundError: when no file matches the certificate pattern.
    """

    def __enter__(self):
        # The backup path must be interpolated; without the ``f`` prefix the
        # backup went to a file literally named '{CONFIG_GIT}.old'.
        self.old_file = f'{CONFIG_GIT}.old'
        # Resolve the certificate before touching the user's configuration:
        # if __enter__ raises, __exit__ is never called, so a failure after
        # the move would leave the original config displaced.
        conf_file = f'{environ["MACHINE_TEST_DIR"]}/reverse-proxy.yml'
        with open(conf_file) as stream:
            data = load(stream, Loader=SafeLoader)
        pattern = join(environ["MACHINE_TEST_DIR"], data["ca_certificate"])
        certs = glob(pattern)
        if not certs:
            raise FileNotFoundError(f'no CA certificate matches "{pattern}"')
        if isfile(CONFIG_GIT) and not isfile(self.old_file):
            move(CONFIG_GIT, self.old_file)
        with open(CONFIG_GIT, 'w') as fh:
            fh.write(f"""[http]
sslCAInfo = {certs[0]}
""")
        return self

    def __exit__(self, *args):
        if isfile(self.old_file):
            # Restore the user's original configuration.
            move(self.old_file, CONFIG_GIT)
        else:
            # Nothing to restore: drop the file written on entry.
            unlink(CONFIG_GIT)
def get_info(authentication,
             url,
             with_uid=False,
             with_data_id=False,
             found_string=None
             ):
    """Fetch *url* and extract form values from the returned page.

    :param authentication: object whose ``get(url)`` returns the page text.
    :param url: page to fetch.
    :param with_uid: also extract the value of the hidden ``uid`` input.
    :param with_data_id: also extract the ``data-id`` attribute that follows
        the SSH key deletion link.
    :param found_string: when non-empty, also report whether this string
        occurs in the page.
    :return: the CSRF token alone when nothing else was requested; otherwise
        a list holding, in this order, the requested ``uid``, ``data-id``
        and ``found_string`` results followed by the CSRF token.  A
        requested ``uid`` / ``data-id`` that is not found is ``None``.
    :raises ValueError: when the page contains no CSRF token.
    """
    page = authentication.get(url)
    csrf_match = search(r'name="_csrf" value="([a-zA-Z0-9\-\_=]+)"', page)
    if csrf_match is None:
        raise ValueError(f'cannot find CSRF token in page "{url}"')
    optional_patterns = (
        # ``(\d+)`` captures the whole number; ``(\d)+`` only kept its last digit.
        (with_uid, r'input type="hidden" id="uid" name="uid" value="(\d+)"'),
        # The ``?`` of the query string is escaped so it matches literally
        # instead of making the preceding ``e`` optional.
        (with_data_id, r'/user/settings/keys/delete\?type=ssh" data-id="(\d+)"'),
    )
    ret_data = []
    for requested, pattern in optional_patterns:
        if not requested:
            continue
        match = search(pattern, page)
        ret_data.append(None if match is None else match[1])
    if found_string:
        ret_data.append(found_string in page)
    ret_data.append(csrf_match[1])
    if len(ret_data) == 1:
        return ret_data[0]
    return ret_data
def add_ssh_key(authentication, data):
    """Register the test SSH public key in the user's Forgejo settings.

    Does nothing when a key titled ``test_key_risotto`` is already listed on
    the settings page.  Otherwise the key pair at ``KEY_FILE`` is generated
    with ``ssh-keygen`` if the private key does not exist yet, and the
    public key is posted to the keys page.

    :param authentication: session object providing ``get`` and ``post``.
    :param data: configuration mapping; ``base_url`` is read.
    :raises subprocess.CalledProcessError: when ``ssh-keygen`` fails.
    """
    url = f'{data["base_url"]}user/settings/keys'
    is_already_key, csrf = get_info(authentication, url, found_string='test_key_risotto')
    if is_already_key:
        return
    # Generate the SSH key pair if needed
    if not isfile(KEY_FILE):
        makedirs(dirname(KEY_FILE), exist_ok=True)
        cmd = ['/usr/bin/ssh-keygen', '-t', 'rsa', '-N', '', '-f', KEY_FILE]
        # check=True: stop here on failure rather than on the less explicit
        # error raised when the .pub file is opened below.
        run(cmd, check=True)
    with open(f'{KEY_FILE}.pub') as fh:
        key = fh.read()
    # Send the public key to forgejo
    authentication.post(url, {'_csrf': csrf, 'title': 'test_key_risotto', 'content': key, 'type': 'ssh'})
def delete_ssh_key(authentication, data):
    """Remove the ``test_key_risotto`` SSH key from the user's settings.

    Does nothing when the key title is not present on the settings page.

    :param authentication: session object providing ``get`` and ``post``.
    :param data: configuration mapping; ``base_url`` is read.
    :raises RuntimeError: when the key is listed but its ``data-id`` cannot
        be extracted from the page.
    """
    keys_url = f'{data["base_url"]}user/settings/keys'
    # One fetch yields the key id, the presence flag and the CSRF token.
    key_id, is_already_key, csrf = get_info(authentication,
                                            keys_url,
                                            with_data_id=True,
                                            found_string='test_key_risotto',
                                            )
    if not is_already_key:
        return
    if key_id is None:
        raise RuntimeError('SSH key "test_key_risotto" is listed but its id cannot be found')
    # The delete endpoint is only posted to.  The previous code also fetched
    # it afterwards through get_info() and discarded the result.
    authentication.post(f'{keys_url}/delete?type=ssh', {'_csrf': csrf, 'id': key_id})
def test_forgejo():
    """Build the shared authentication object from the test configuration."""
    get_authentication(get_data())
def test_forgejo_repos():
    """Check that ``test_persistent`` is the only repository listed.

    When ``FIRST_RUN`` is set in the environment the repository is created
    first through the ``repo/create`` form.
    """
    data = get_data()
    authentication = get_authentication(data)
    if 'FIRST_RUN' in environ:
        create_url = f'{data["base_url"]}repo/create'
        uid, csrf = get_info(authentication, create_url, with_uid=True)
        form = {'_csrf': csrf, 'uid': uid, 'repo_name': 'test_persistent'}
        authentication.post(create_url, form)
    search_url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode='
    result = authentication.get(search_url, json=True)
    assert result['ok']
    repos = result['data']
    assert len(repos) == 1
    # Repository names are prefixed with the part of the user name before '@'.
    owner = data['username'].split('@', 1)[0]
    assert repos[0]['full_name'] == f'{owner}/test_persistent'
def test_forgejo_create_repo():
    """Create the ``test`` repository and check that both repositories are listed."""
    data = get_data()
    authentication = get_authentication(data)
    create_url = f'{data["base_url"]}repo/create'
    uid, csrf = get_info(authentication, create_url, with_uid=True)
    form = {'_csrf': csrf, 'uid': uid, 'repo_name': 'test', 'default_branch': 'main'}
    authentication.post(create_url, form)
    search_url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode='
    result = authentication.get(search_url, json=True)
    assert result['ok']
    repos = result['data']
    assert len(repos) == 2
    # Repository names are prefixed with the part of the user name before '@'.
    owner = data['username'].split('@', 1)[0]
    listed = {repo['full_name'] for repo in repos}
    assert listed == {f'{owner}/test_persistent', f'{owner}/test'}
def test_repo():
    """Push a one-commit repository to ``test`` over SSH.

    On ``FIRST_RUN`` the SSH key is registered first.  If the key file is
    missing afterwards, the key pair from the former gitea location is
    moved into place; without either key the test aborts.
    """
    data = get_data()
    authentication = get_authentication(data)
    if 'FIRST_RUN' in environ:
        add_ssh_key(authentication, data)
    if not isfile(KEY_FILE):
        if not isfile(GITEA_KEY_FILE):
            raise Exception(f'cannot find ssh key "{KEY_FILE}", do you run with FIRST_RUN?')
        # Migrate the key pair left over from the gitea installation.
        move(GITEA_KEY_FILE, KEY_FILE)
        move(GITEA_KEY_FILE + '.pub', KEY_FILE + '.pub')
    with TemporaryDirectory() as workdir:
        owner = data['username'].split('@', 1)[0]
        # Host part of the base URL, i.e. what follows '<scheme>://'.
        host = data['base_url'].split('/', 3)[2]
        ssh_url = f'ssh://{FORGEJO_USERNAME}@{host}:{FORGEJO_PORT}/{owner}/test.git'
        with SSHConfig(), MookDnsSystem(host, data['ip']):
            path = join(workdir, 'test.txt')
            with open(path, 'w') as fh:
                fh.write('test')
            repo = init(workdir)
            add(repo, path)
            commit(repo, message=b'test commit')
            push(repo=repo, remote_location=ssh_url, refspecs='master')
            history = list(repo.get_walker())
            assert len(history) == 1
            assert history[0].commit.message == b'test commit'
def test_clone_http():
    """Clone the ``test`` repository over HTTP and check its single commit.

    The clone is first attempted with the default git configuration; if it
    fails it is retried with ``GITConfig`` active, which sets
    ``http.sslCAInfo`` (presumably the first failure is a TLS verification
    error - confirm).
    """
    data = get_data()
    authentication = get_authentication(data)
    if 'FIRST_RUN' in environ:
        add_ssh_key(authentication, data)
    with TemporaryDirectory() as tmpdirname:
        username = data['username'].split('@', 1)[0]
        # Host part of the base URL, i.e. what follows '<scheme>://'.
        dns = data['base_url'].split('/', 3)[2]
        http_url = f'{data["base_url"]}{username}/test.git'
        with SSHConfig():
            with MookDnsSystem(dns, data['revprox_ip']):
                # Each attempt gets its own target directory so that whatever
                # a failed clone left behind cannot break the retry.
                try:
                    repo = clone(http_url, join(tmpdirname, 'clone'))
                except Exception:
                    # ``Exception`` rather than a bare ``except`` so that
                    # KeyboardInterrupt / SystemExit are not swallowed.
                    with GITConfig():
                        repo = clone(http_url, join(tmpdirname, 'clone_with_ca'))
                lst = list(repo.get_walker())
                assert len(lst) == 1
                assert lst[0].commit.message == b'test commit'
def test_forgejo_delete_repo():
    """Delete the ``test`` repository and check only ``test_persistent`` remains."""
    repo_name = 'test'
    data = get_data()
    authentication = get_authentication(data)
    # Repository URLs and names use the part of the user name before '@'.
    owner = data['username'].split('@', 1)[0]
    settings_url = f'{data["base_url"]}{owner}/{repo_name}/settings'
    csrf = get_info(authentication, settings_url)
    form = {'_csrf': csrf, 'action': 'delete', 'repo_name': repo_name}
    authentication.post(settings_url, form)
    search_url = f'{data["base_url"]}api/v1/repos/search?sort=updated&order=desc&uid=1&team_id=0&q=&page=1&mode='
    result = authentication.get(search_url, json=True)
    assert result['ok']
    repos = result['data']
    assert len(repos) == 1
    assert repos[0]['full_name'] == f'{owner}/test_persistent'
def test_repo_persistent():
    """Add one commit to ``test_persistent`` over SSH and check the history.

    On ``FIRST_RUN`` the SSH key is registered and the repository is created
    locally with a first commit and pushed; otherwise it is cloned.  The
    test relies on ``test.txt`` holding one line per commit: it checks that
    invariant, appends a line, commits, pushes, and verifies that the
    history grew by exactly one commit and that the new commit is the most
    recent one.
    """
    data = get_data()
    authentication = get_authentication(data)
    first_run = 'FIRST_RUN' in environ
    if first_run:
        add_ssh_key(authentication, data)
    with TemporaryDirectory() as tmpdirname:
        username = data['username'].split('@', 1)[0]
        # Host part of the base URL, i.e. what follows '<scheme>://'.
        dns = data['base_url'].split('/', 3)[2]
        ssh_url = f'ssh://{FORGEJO_USERNAME}@{dns}:{FORGEJO_PORT}/{username}/test_persistent.git'
        with SSHConfig():
            with MookDnsSystem(dns, data['ip']):
                filename = join(tmpdirname, 'test.txt')
                if first_run:
                    with open(filename, 'w') as fh:
                        fh.write('test')
                    repo = init(tmpdirname)
                    add(repo, filename)
                    commit(repo, message=b'test commit')
                    push(repo=repo,
                         remote_location=ssh_url,
                         refspecs='master',
                         )
                else:
                    repo = clone(ssh_url, tmpdirname)
                with open(filename, 'r') as fh:
                    len_file = len(fh.readlines())
                # number of commits before the new one
                len_before_commit = len(list(repo.get_walker()))
                assert len_before_commit == len_file
                # add a new line in file and commit
                with open(filename, 'a') as fh:
                    fh.write('\ntest')
                add(repo, filename)
                date = datetime.datetime.now()
                commit_message = f'test commit {date}'.encode()
                commit(repo, message=commit_message)
                push(repo=repo,
                     remote_location=ssh_url,
                     refspecs='master',
                     )
                # test if commit is added and is the last commit
                lst = list(repo.get_walker())
                len_after_commit = len(lst)
                assert len_before_commit + 1 == len_after_commit
                # The walker starts at HEAD and yields newest first, so the
                # commit just created is the first entry, not the last.
                assert lst[0].commit.message == commit_message